Skip to content

Commit a973b06

Browse files
authored
Fix async channel finishes all (#152)
* asyncChannel: enforce termination for all when finished * asyncThrowingChannel: enforce termination for all when finished * asyncChannel: improve unit tests readability * asyncChannel: adapt the Guide to the new finish behaviour * asyncChannel: adapt the Guide to the new finish behaviour
1 parent 6b59cfb commit a973b06

File tree

4 files changed

+213
-75
lines changed

4 files changed

+213
-75
lines changed

Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
## Proposed Solution
1616

17-
To achieve a system that supports back pressure and allows for the communication of more than one value from one task to another we are introducing a new type, the _channel_. The channel will be a reference-type asynchronous sequence with an asynchronous sending capability that awaits the consumption of iteration. Each value sent by the channel, or finish transmitted, will await the consumption of that value or event by iteration. That awaiting behavior will allow for the affordance of back pressure applied from the consumption site to be transmitted to the production site. This means that the rate of production cannot exceed the rate of consumption, and that the rate of consumption cannot exceed the rate of production.
17+
To achieve a system that supports back pressure and allows for the communication of more than one value from one task to another we are introducing a new type, the _channel_. The channel will be a reference-type asynchronous sequence with an asynchronous sending capability that awaits the consumption of iteration. Each value sent by the channel will await the consumption of that value by iteration. That awaiting behavior will allow for the affordance of back pressure applied from the consumption site to be transmitted to the production site. This means that the rate of production cannot exceed the rate of consumption, and that the rate of consumption cannot exceed the rate of production. Sending a terminal event to the channel will instantly resume all pending operations for every producers and consumers.
1818

1919
## Detailed Design
2020

@@ -31,7 +31,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
3131
public init(element elementType: Element.Type = Element.self)
3232

3333
public func send(_ element: Element) async
34-
public func finish() async
34+
public func finish()
3535

3636
public func makeAsyncIterator() -> Iterator
3737
}
@@ -45,21 +45,21 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
4545

4646
public func send(_ element: Element) async
4747
public func fail(_ error: Error) async where Failure == Error
48-
public func finish() async
48+
public func finish()
4949

5050
public func makeAsyncIterator() -> Iterator
5151
}
5252
```
5353

54-
Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. The back pressure applied by `send(_:)`, `fail(_:)` and `finish()` via the suspension/resume ensure that the production of values does not exceed the consumption of values from iteration. Each of these methods suspend after enqueuing the event and are resumed when the next call to `next()` on the `Iterator` is made.
54+
Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` and `fail(_:)` via the suspension/resume ensure that the production of values does not exceed the consumption of values from iteration. Each of these methods suspend after enqueuing the event and are resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume.
5555

5656
```swift
5757
let channel = AsyncChannel<String>()
5858
Task {
5959
while let resultOfLongCalculation = doLongCalculations() {
6060
await channel.send(resultOfLongCalculation)
6161
}
62-
await channel.finish()
62+
channel.finish()
6363
}
6464

6565
for await calculationResult in channel {

Sources/AsyncAlgorithms/AsyncChannel.swift

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,12 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
131131
func next(_ generation: Int) async -> Element? {
132132
return await withUnsafeContinuation { continuation in
133133
var cancelled = false
134+
var terminal = false
134135
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Never>?, Never>? in
136+
if state.terminal {
137+
terminal = true
138+
return nil
139+
}
135140
switch state.emission {
136141
case .idle:
137142
state.emission = .awaiting([Awaiting(generation: generation, continuation: continuation)])
@@ -157,13 +162,13 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
157162
return nil
158163
}
159164
}?.resume()
160-
if cancelled {
165+
if cancelled || terminal {
161166
continuation.resume(returning: nil)
162167
}
163168
}
164169
}
165170

166-
func cancelSend() {
171+
func finishAll() {
167172
let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation<UnsafeContinuation<Element?, Never>?, Never>], Set<Awaiting>) in
168173
if state.terminal {
169174
return ([], [])
@@ -188,23 +193,15 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
188193
}
189194
}
190195

191-
func _send(_ result: Result<Element?, Never>) async {
196+
func _send(_ element: Element) async {
192197
await withTaskCancellationHandler {
193-
cancelSend()
198+
finishAll()
194199
} operation: {
195200
let continuation: UnsafeContinuation<Element?, Never>? = await withUnsafeContinuation { continuation in
196201
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Never>?, Never>? in
197202
if state.terminal {
198203
return UnsafeResumption(continuation: continuation, success: nil)
199204
}
200-
switch result {
201-
case .success(let value):
202-
if value == nil {
203-
state.terminal = true
204-
}
205-
case .failure:
206-
state.terminal = true
207-
}
208205
switch state.emission {
209206
case .idle:
210207
state.emission = .pending([continuation])
@@ -224,20 +221,19 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
224221
}
225222
}?.resume()
226223
}
227-
continuation?.resume(with: result)
224+
continuation?.resume(returning: element)
228225
}
229226
}
230227

231228
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made.
232229
/// If the channel is already finished then this returns immediately
233230
public func send(_ element: Element) async {
234-
await _send(.success(element))
231+
await _send(element)
235232
}
236233

237-
/// Send a finish to an awaiting iteration. This function will resume when the next call to `next()` is made.
238-
/// If the channel is already finished then this returns immediately
239-
public func finish() async {
240-
await _send(.success(nil))
234+
/// Send a finish to all awaiting iterations.
235+
public func finish() {
236+
finishAll()
241237
}
242238

243239
/// Create an `Iterator` for iteration of an `AsyncChannel`

Sources/AsyncAlgorithms/AsyncThrowingChannel.swift

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,12 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
129129
func next(_ generation: Int) async throws -> Element? {
130130
return try await withUnsafeThrowingContinuation { continuation in
131131
var cancelled = false
132+
var terminal = false
132133
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Error>?, Never>? in
134+
if state.terminal {
135+
terminal = true
136+
return nil
137+
}
133138
switch state.emission {
134139
case .idle:
135140
state.emission = .awaiting([Awaiting(generation: generation, continuation: continuation)])
@@ -155,13 +160,13 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
155160
return nil
156161
}
157162
}?.resume()
158-
if cancelled {
163+
if cancelled || terminal {
159164
continuation.resume(returning: nil)
160165
}
161166
}
162167
}
163168

164-
func cancelSend() {
169+
func finishAll() {
165170
let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation<UnsafeContinuation<Element?, Error>?, Never>], Set<Awaiting>) in
166171
if state.terminal {
167172
return ([], [])
@@ -186,23 +191,20 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
186191
}
187192
}
188193

189-
func _send(_ result: Result<Element?, Error>) async {
194+
func _send(_ result: Result<Element, Error>) async {
190195
await withTaskCancellationHandler {
191-
cancelSend()
196+
finishAll()
192197
} operation: {
193198
let continuation: UnsafeContinuation<Element?, Error>? = await withUnsafeContinuation { continuation in
194199
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Error>?, Never>? in
195200
if state.terminal {
196201
return UnsafeResumption(continuation: continuation, success: nil)
197202
}
198-
switch result {
199-
case .success(let value):
200-
if value == nil {
201-
state.terminal = true
202-
}
203-
case .failure:
203+
204+
if case .failure = result {
204205
state.terminal = true
205206
}
207+
206208
switch state.emission {
207209
case .idle:
208210
state.emission = .pending([continuation])
@@ -222,7 +224,7 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
222224
}
223225
}?.resume()
224226
}
225-
continuation?.resume(with: result)
227+
continuation?.resume(with: result.map { $0 as Element? })
226228
}
227229
}
228230

@@ -238,10 +240,9 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
238240
await _send(.failure(error))
239241
}
240242

241-
/// Send a finish to an awaiting iteration. This function will resume when the next call to `next()` is made.
242-
/// If the channel is already finished then this returns immediately
243-
public func finish() async {
244-
await _send(.success(nil))
243+
/// Send a finish to all awaiting iterations.
244+
public func finish() {
245+
finishAll()
245246
}
246247

247248
public func makeAsyncIterator() -> Iterator {

0 commit comments

Comments
 (0)