Skip to content

Commit 314b10c

Browse files
authored
Remove AsyncIterator: Sendable requirement from merge (#185)
* Remove `AsyncIterator: Sendable` requirement from merge # Motivation Currently a lot of the operator implementations in here that consume other `AsyncSequence`s require the `AsyncIterator` to be `Sendable`. This is mostly due to the fact that we are calling `makeAsyncIterator` on the upstream `AsyncSequence` and then pass that iterator around to various newly spawned `Task`s. This has two downsides: 1. It only allows users to use operators like `merge` if their `AsyncSequence.AsyncIterator` is `Sendable` 2. In merge we are creating new `Task`s for every new demand. Creating `Task`s is not cheap. My main goal of this PR was to remove the `Sendable` constraint from `merge`. # Modification This PR overhauls the complete inner workings of the `AsyncMerge2Sequence`. It does a couple of things: 1. The main change is that instead of creating new `Task`s for every demand, we are creating one `Task` when the `AsyncIterator` is created. This task has as child task for every upstream sequence. 2. When calling `next` we are signalling the child tasks to demand from the upstream 3. A new state machine that is synchronizing the various concurrent operations that can happen 4. Handling cancellation since we are creating a bunch of continuations. # Result In the end, this PR swaps the implementation of `AsyncMerge2Sequence` and drops the `Sendable` constraint and passes all tests. Furthermore, on my local performance testing I saw up 50% speed increase in throughput. # Open points 1. I need to make this sequence re-throwing but before going down that rabbit whole I wanna get buy-in on the implementation. 2. We should discuss and document if `merge` and other operators are hot or cold, i.e. if they only request if they got downstream demand 3. I need to switch `AsyncMerge3Sequence` over to the same iplementation * Split logic into multiple files, adapt merge3 and incorporate PR feedback * Add more tests and fix upstream throw behaviour * Remove internal class from the sequence * Setup task after first next * Remove unnecessary tests
1 parent 4feeb83 commit 314b10c

File tree

9 files changed

+1351
-331
lines changed

9 files changed

+1351
-331
lines changed

Sources/AsyncAlgorithms/AsyncMerge3Sequence.swift

Lines changed: 0 additions & 284 deletions
This file was deleted.

Sources/AsyncAlgorithms/Locking.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,27 @@ internal struct Lock {
8787
func unlock() {
8888
Lock.unlock(platformLock)
8989
}
90+
91+
/// Acquire the lock for the duration of the given block.
92+
///
93+
/// This convenience method should be preferred to `lock` and `unlock` in
94+
/// most situations, as it ensures that the lock will be released regardless
95+
/// of how `body` exits.
96+
///
97+
/// - Parameter body: The block to execute while holding the lock.
98+
/// - Returns: The value returned by the block.
99+
func withLock<T>(_ body: () throws -> T) rethrows -> T {
100+
self.lock()
101+
defer {
102+
self.unlock()
103+
}
104+
return try body()
105+
}
106+
107+
// specialise Void return (for performance)
108+
func withLockVoid(_ body: () throws -> Void) rethrows -> Void {
109+
try self.withLock(body)
110+
}
90111
}
91112

92113
struct ManagedCriticalState<State> {

0 commit comments

Comments
 (0)