Skip to content

Commit 7e45005

Browse files
authored
Switch from group.waitForAll() to group.next() (#254)
# Motivation Swift 5.8 is including a change to how `group.waitForAll()` is working. It now properly waits for all tasks to finish even if one of the tasks throws. We have used `group.waitForAll()` in multiple places and need to change this code accordingly. # Modification Switch code from `group.waitForAll()` to `group.next()`. # Result This fixes a few stuck tests that we have seen when running against development snapshots.
1 parent ffa6ee9 commit 7e45005

File tree

6 files changed

+118
-118
lines changed

6 files changed

+118
-118
lines changed

Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,8 @@ struct CombineLatestStateMachine<
530530
preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()")
531531

532532
case .upstreamsFinished:
533-
preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()")
533+
// We need to tolerate multiple upstreams failing
534+
return .none
534535

535536
case .waitingForDemand(let task, let upstreams, _):
536537
// An upstream threw. We can cancel everything now and transition to finished.

Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -319,35 +319,33 @@ final class CombineLatestStorage<
319319
}
320320
}
321321

322-
do {
323-
try await group.waitForAll()
324-
} catch {
325-
// One of the upstream sequences threw an error
326-
self.stateMachine.withCriticalRegion { stateMachine in
327-
let action = stateMachine.upstreamThrew(error)
328-
329-
switch action {
330-
case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations):
331-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
332-
task.cancel()
333-
334-
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
335-
let downstreamContinuation,
336-
let error,
337-
let task,
338-
let upstreamContinuations
339-
):
340-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
341-
task.cancel()
342-
343-
downstreamContinuation.resume(returning: .failure(error))
344-
345-
case .none:
346-
break
347-
}
348-
}
322+
while !group.isEmpty {
323+
do {
324+
try await group.next()
325+
} catch {
326+
// One of the upstream sequences threw an error
327+
self.stateMachine.withCriticalRegion { stateMachine in
328+
let action = stateMachine.upstreamThrew(error)
329+
switch action {
330+
case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations):
331+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
332+
task.cancel()
333+
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
334+
let downstreamContinuation,
335+
let error,
336+
let task,
337+
let upstreamContinuations
338+
):
339+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
340+
task.cancel()
341+
downstreamContinuation.resume(returning: .failure(error))
342+
case .none:
343+
break
344+
}
345+
}
349346

350-
group.cancelAll()
347+
group.cancelAll()
348+
}
351349
}
352350
}
353351
}

Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,8 +390,8 @@ struct DebounceStateMachine<Base: AsyncSequence, C: Clock> {
390390
preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()")
391391

392392
case .upstreamFailure:
393-
// The upstream already failed so it should never have throw again.
394-
preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()")
393+
// We need to tolerate multiple upstreams failing
394+
return .none
395395

396396
case .waitingForDemand(let task, .none, let clockContinuation, .none):
397397
// We don't have any buffered element so we can just go ahead

Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -254,39 +254,41 @@ final class DebounceStorage<Base: AsyncSequence, C: Clock>: @unchecked Sendable
254254
}
255255
}
256256

257-
do {
258-
try await group.waitForAll()
259-
} catch {
260-
// The upstream sequence threw an error
261-
let action = self.stateMachine.withCriticalRegion { $0.upstreamThrew(error) }
257+
while !group.isEmpty {
258+
do {
259+
try await group.next()
260+
} catch {
261+
// One of the upstream sequences threw an error
262+
self.stateMachine.withCriticalRegion { stateMachine in
263+
let action = stateMachine.upstreamThrew(error)
264+
switch action {
265+
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation(
266+
let downstreamContinuation,
267+
let error,
268+
let task,
269+
let upstreamContinuation,
270+
let clockContinuation
271+
):
272+
upstreamContinuation?.resume(throwing: CancellationError())
273+
clockContinuation?.resume(throwing: CancellationError())
262274

263-
switch action {
264-
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation(
265-
let downstreamContinuation,
266-
let error,
267-
let task,
268-
let upstreamContinuation,
269-
let clockContinuation
270-
):
271-
upstreamContinuation?.resume(throwing: CancellationError())
272-
clockContinuation?.resume(throwing: CancellationError())
273-
274-
task.cancel()
275-
276-
downstreamContinuation.resume(returning: .failure(error))
277-
278-
case .cancelTaskAndClockContinuation(
279-
let task,
280-
let clockContinuation
281-
):
282-
clockContinuation?.resume(throwing: CancellationError())
283-
task.cancel()
284-
285-
case .none:
286-
break
287-
}
275+
task.cancel()
276+
277+
downstreamContinuation.resume(returning: .failure(error))
288278

289-
group.cancelAll()
279+
case .cancelTaskAndClockContinuation(
280+
let task,
281+
let clockContinuation
282+
):
283+
clockContinuation?.resume(throwing: CancellationError())
284+
task.cancel()
285+
case .none:
286+
break
287+
}
288+
}
289+
290+
group.cancelAll()
291+
}
290292
}
291293
}
292294
}

Sources/AsyncAlgorithms/Merge/MergeStorage.swift

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -404,40 +404,39 @@ final class MergeStorage<
404404
}
405405
}
406406
}
407-
408-
do {
409-
try await group.waitForAll()
410-
} catch {
407+
408+
while !group.isEmpty {
409+
do {
410+
try await group.next()
411+
} catch {
411412
// One of the upstream sequences threw an error
412-
let action = self.lock.withLock {
413-
self.stateMachine.upstreamThrew(error)
414-
}
415-
416-
switch action {
417-
case let .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
418-
downstreamContinuation,
419-
error,
420-
task,
421-
upstreamContinuations
422-
):
423-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
424-
425-
task.cancel()
426-
427-
downstreamContinuation.resume(throwing: error)
428-
case let .cancelTaskAndUpstreamContinuations(
429-
task,
430-
upstreamContinuations
431-
):
432-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
433-
434-
task.cancel()
435-
436-
case .none:
437-
break
413+
let action = self.lock.withLock {
414+
self.stateMachine.upstreamThrew(error)
415+
}
416+
switch action {
417+
case let .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
418+
downstreamContinuation,
419+
error,
420+
task,
421+
upstreamContinuations
422+
):
423+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
424+
425+
task.cancel()
426+
427+
downstreamContinuation.resume(throwing: error)
428+
case let .cancelTaskAndUpstreamContinuations(
429+
task,
430+
upstreamContinuations
431+
):
432+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
433+
434+
task.cancel()
435+
case .none:
436+
break
437+
}
438+
group.cancelAll()
438439
}
439-
440-
group.cancelAll()
441440
}
442441
}
443442
}

Sources/AsyncAlgorithms/Zip/ZipStorage.swift

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -286,31 +286,31 @@ final class ZipStorage<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncS
286286
}
287287
}
288288

289-
do {
290-
try await group.waitForAll()
291-
} catch {
292-
// One of the upstream sequences threw an error
293-
self.stateMachine.withCriticalRegion { stateMachine in
294-
let action = stateMachine.upstreamThrew(error)
295-
296-
switch action {
297-
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
298-
let downstreamContinuation,
299-
let error,
300-
let task,
301-
let upstreamContinuations
302-
):
303-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
304-
task.cancel()
305-
306-
downstreamContinuation.resume(returning: .failure(error))
307-
308-
case .none:
309-
break
310-
}
311-
}
289+
while !group.isEmpty {
290+
do {
291+
try await group.next()
292+
} catch {
293+
// One of the upstream sequences threw an error
294+
self.stateMachine.withCriticalRegion { stateMachine in
295+
let action = stateMachine.upstreamThrew(error)
296+
switch action {
297+
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
298+
let downstreamContinuation,
299+
let error,
300+
let task,
301+
let upstreamContinuations
302+
):
303+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
304+
task.cancel()
305+
306+
downstreamContinuation.resume(returning: .failure(error))
307+
case .none:
308+
break
309+
}
310+
}
312311

313-
group.cancelAll()
312+
group.cancelAll()
313+
}
314314
}
315315
}
316316
}

0 commit comments

Comments
 (0)