Skip to content

Commit 5a56d26

Browse files
committed
WIP
1 parent a7b97f6 commit 5a56d26

File tree

5 files changed

+513
-94
lines changed

5 files changed

+513
-94
lines changed

Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,43 +21,49 @@ extension AsyncSequence where Self: Sendable {
2121
/// - Parameter policy: A policy that drives the behaviour of the ``AsyncBufferSequence``
2222
/// - Returns: An asynchronous sequence that buffers elements up to a given limit.
2323
public func buffer(
24-
policy: AsyncBufferSequencePolicy
24+
policy: AsyncBufferSequencePolicy<Self>
2525
) -> AsyncBufferSequence<Self> {
26-
AsyncBufferSequence(base: self, policy: policy)
26+
AsyncBufferSequence(base: self, storage: policy.storage)
2727
}
2828
}
2929

3030
/// A policy dictating the buffering behaviour of an ``AsyncBufferSequence``
31-
public struct AsyncBufferSequencePolicy {
32-
let policy: _AsyncBufferSequencePolicy
31+
public struct AsyncBufferSequencePolicy<Base: AsyncSequence> {
32+
let storage: _AsyncBufferSequenceStorage<Base>
3333

3434
/// A policy for buffering elements until the limit is reached. Then it will suspend the upstream async sequence.
35-
public static func bounded(_ limit: Int) -> Self { Self(policy: .bounded(limit)) }
35+
public static func bounded(_ limit: Int) -> Self {
36+
precondition(limit > 0, "The limit should be positive for the buffer operator to be efficient.")
37+
return Self(storage: .bounded(storage: BoundedBufferStorage<Base>(limit: limit)))
38+
}
3639

3740
/// A policy for buffering elements without limit.
38-
public static let unbounded = Self(policy: .unbounded)
41+
public static var unbounded: Self {
42+
Self(storage: .unbounded(storage: UnboundedBufferStorage(policy: .unlimited)))
43+
}
3944

4045
/// A policy for buffering elements until the limit is reached. Then it will discard the oldest buffered elements.
41-
public static func bufferingNewest(_ limit: Int) -> Self { Self(policy: .bufferingNewest(limit)) }
46+
public static func bufferingNewest(_ limit: Int) -> Self {
47+
precondition(limit > 0, "The limit should be positive for the buffer operator to be efficient.")
48+
return Self(storage: .unbounded(storage: UnboundedBufferStorage(policy: .bufferingNewest(limit))))
49+
}
4250

4351
/// A policy for buffering elements until the limit is reached. Then it will discard the newest elements.
44-
public static func bufferingOldest(_ limit: Int) -> Self { Self(policy: .bufferingOldest(limit)) }
52+
public static func bufferingOldest(_ limit: Int) -> Self {
53+
precondition(limit > 0, "The limit should be positive for the buffer operator to be efficient.")
54+
return Self(storage: .unbounded(storage: UnboundedBufferStorage(policy: .bufferingOldest(limit))))
55+
}
4556
}
4657

47-
enum _AsyncBufferSequencePolicy {
48-
case bounded(Int)
49-
case unbounded
58+
enum _UnboundedBufferPolicy {
59+
case unlimited
5060
case bufferingNewest(Int)
5161
case bufferingOldest(Int)
62+
}
5263

53-
var isPositiveLimit: Bool {
54-
switch self {
55-
case .unbounded:
56-
return true
57-
case .bounded(let limit), .bufferingNewest(let limit), .bufferingOldest(let limit):
58-
return limit > 0
59-
}
60-
}
64+
enum _AsyncBufferSequenceStorage<Base: AsyncSequence> {
65+
case bounded(storage: BoundedBufferStorage<Base>)
66+
case unbounded(storage: UnboundedBufferStorage<Base>)
6167
}
6268

6369
/// An `AsyncSequence` that buffers elements in regard to a policy.
@@ -66,14 +72,14 @@ public struct AsyncBufferSequence<Base: AsyncSequence & Sendable>: AsyncSequence
6672
public typealias AsyncIterator = Iterator
6773

6874
let base: Base
69-
let storage: BufferStorage<Base>
75+
let storage: _AsyncBufferSequenceStorage<Base>
7076

71-
public init(
77+
init(
7278
base: Base,
73-
policy: AsyncBufferSequencePolicy
79+
storage: _AsyncBufferSequenceStorage<Base>
7480
) {
7581
self.base = base
76-
self.storage = BufferStorage<Base>(policy: policy.policy)
82+
self.storage = storage
7783
}
7884

7985
public func makeAsyncIterator() -> Iterator {
@@ -87,7 +93,7 @@ public struct AsyncBufferSequence<Base: AsyncSequence & Sendable>: AsyncSequence
8793
var task: Task<Void, Never>? = nil
8894

8995
let base: Base
90-
let storage: BufferStorage<Base>
96+
let storage: _AsyncBufferSequenceStorage<Base>
9197

9298
public mutating func next() async rethrows -> Element? {
9399
if self.task == nil {
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
@_implementationOnly import DequeModule
13+
14+
struct BoundedBufferStateMachine<Base: AsyncSequence> {
15+
typealias Element = Base.Element
16+
typealias SuspendedProducer = UnsafeContinuation<Void, Never>
17+
typealias SuspendedConsumer = UnsafeContinuation<Element?, Error>
18+
19+
private enum State {
20+
case initial(base: Base)
21+
case buffering(
22+
task: Task<Void, Never>,
23+
buffer: Deque<Result<Element, Error>>,
24+
suspendedProducer: SuspendedProducer?,
25+
suspendedConsumer: SuspendedConsumer?
26+
)
27+
case modifying
28+
case finished(buffer: Deque<Result<Element, Error>>)
29+
}
30+
31+
private var state: State
32+
private let limit: Int
33+
34+
init(base: Base, limit: Int) {
35+
self.state = .initial(base: base)
36+
self.limit = limit
37+
}
38+
39+
var task: Task<Void, Never>? {
40+
switch self.state {
41+
case .buffering(let task, _, _, _):
42+
return task
43+
default:
44+
return nil
45+
}
46+
}
47+
48+
mutating func taskStarted(
49+
task: Task<Void, Never>,
50+
continuation: UnsafeContinuation<Element?, Error>
51+
) {
52+
switch self.state {
53+
case .initial:
54+
self.state = .buffering(task: task, buffer: [], suspendedProducer: nil, suspendedConsumer: continuation)
55+
case .buffering:
56+
preconditionFailure("Invalid state.")
57+
case .modifying:
58+
preconditionFailure("Invalid state.")
59+
case .finished:
60+
preconditionFailure("Invalid state.")
61+
}
62+
}
63+
64+
enum ProducerSuspendedAction {
65+
case none
66+
case resumeProducer
67+
}
68+
69+
mutating func producerSuspended(
70+
continuation: UnsafeContinuation<Void, Never>
71+
) -> ProducerSuspendedAction {
72+
switch self.state {
73+
case .initial:
74+
preconditionFailure("Invalid state. The task should already by started.")
75+
76+
case .buffering(let task, var buffer, .none, .none):
77+
// we are either idle or the buffer is already in use (no awaiting consumer)
78+
// if the buffer is available we resume the producer so it can we can request the next element
79+
// otherwise we confirm the suspension
80+
if buffer.count < limit {
81+
self.state = .modifying
82+
return .resumeProducer
83+
} else {
84+
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: continuation, suspendedConsumer: nil)
85+
return .none
86+
}
87+
88+
case .buffering(_, let buffer, .none, .some):
89+
// we have an awaiting consumer, we can resume the producer so the next element can be requested
90+
precondition(buffer.isEmpty, "Invalid state. The buffer should be empty as we have an awaiting consumer already.")
91+
return .resumeProducer
92+
93+
case .buffering(_, _, .some, _):
94+
preconditionFailure("Invalid state. There is already a suspended producer.")
95+
96+
case .modifying:
97+
preconditionFailure("Invalid state.")
98+
99+
case .finished:
100+
return .resumeProducer
101+
}
102+
}
103+
104+
enum ElementProducedAction {
105+
case none
106+
case resumeConsumer(continuation: SuspendedConsumer, result: Result<Element, Error>)
107+
}
108+
109+
mutating func elementProduced(
110+
element: Element
111+
) -> ElementProducedAction {
112+
switch self.state {
113+
case .initial:
114+
preconditionFailure("Invalid state. The task should already by started.")
115+
116+
case .buffering(let task, var buffer, .none, .none):
117+
// we are either idle or the buffer is already in use (no awaiting consumer)
118+
// we have to stack the new element or suspend the producer if the buffer is full
119+
precondition(buffer.count < limit, "Invalid state. The buffer should be available for stacking a new element.")
120+
self.state = .modifying
121+
buffer.append(.success(element))
122+
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
123+
return .none
124+
125+
case .buffering(let task, let buffer, .none, .some(let suspendedConsumer)):
126+
// we have an awaiting consumer, we can resume it with the element and exit
127+
precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.")
128+
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
129+
return .resumeConsumer(continuation: suspendedConsumer, result: .success(element))
130+
131+
case .buffering(_, _, .some, _):
132+
preconditionFailure("Invalid state. There should not be a suspended producer.")
133+
134+
case .modifying:
135+
preconditionFailure("Invalid state.")
136+
137+
case .finished:
138+
return .none
139+
}
140+
}
141+
142+
enum ProducerCancelledAction {
143+
case none
144+
case resumeProducerAndConsumer(
145+
producerContinuation: UnsafeContinuation<Void, Never>?,
146+
consumerContinuation: UnsafeContinuation<Element?, Error>?
147+
)
148+
}
149+
150+
mutating func producerCancelled() -> ProducerCancelledAction {
151+
switch self.state {
152+
case .initial:
153+
preconditionFailure("Invalid state. The task should already by started.")
154+
155+
case .buffering(_, _, let suspendedProducer, let suspendedConsumer):
156+
// the state machine cannot receive elements anymore
157+
self.state = .finished(buffer: [])
158+
return .resumeProducerAndConsumer(producerContinuation: suspendedProducer, consumerContinuation: suspendedConsumer)
159+
160+
case .modifying:
161+
preconditionFailure("Invalid state.")
162+
163+
case .finished:
164+
return .none
165+
}
166+
}
167+
168+
enum FinishAction {
169+
case none
170+
case resumeConsumer(
171+
continuation: UnsafeContinuation<Element?, Error>
172+
)
173+
}
174+
175+
mutating func finish(error: Error?) -> FinishAction {
176+
switch self.state {
177+
case .initial:
178+
preconditionFailure("Invalid state. The task should already by started.")
179+
180+
case .buffering(_, var buffer, .none, .none):
181+
// we are either idle or the buffer is already in use (no awaiting consumer)
182+
// if we have an error we stack it in the buffer so it can be consumed later
183+
if let error {
184+
buffer.append(.failure(error))
185+
}
186+
self.state = .finished(buffer: buffer)
187+
return .none
188+
189+
case .buffering(_, let buffer, .none, .some(let suspendedConsumer)):
190+
// we have an awaiting consumer, we can resume it
191+
precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.")
192+
self.state = .finished(buffer: [])
193+
return .resumeConsumer(continuation: suspendedConsumer)
194+
195+
case .buffering(_, _, .some, _):
196+
preconditionFailure("Invalid state. There should not be a suspended producer.")
197+
198+
case .modifying:
199+
preconditionFailure("Invalid state.")
200+
201+
case .finished:
202+
return .none
203+
}
204+
}
205+
206+
enum NextAction {
207+
case none
208+
case resumeConsumer(result: Result<Element, Error>?)
209+
case resumeProducerAndConsumer(continuation: UnsafeContinuation<Void, Never>, result: Result<Element, Error>)
210+
case startTask(base: Base)
211+
}
212+
213+
mutating func next(
214+
continuation: UnsafeContinuation<Element?, Error>
215+
) -> NextAction {
216+
switch self.state {
217+
case .initial(let base):
218+
return .startTask(base: base)
219+
220+
case .buffering(let task, let buffer, .none, .none) where buffer.isEmpty:
221+
// we are idle, we have to suspend the consumer
222+
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: continuation)
223+
return .none
224+
225+
case .buffering(let task, var buffer, let suspendedProducer, .none):
226+
// we have values in the buffer, we unstack the oldest one and resume a potential suspended producer
227+
self.state = .modifying
228+
let result = buffer.popFirst()!
229+
if let suspendedProducer {
230+
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
231+
return .resumeProducerAndConsumer(continuation: suspendedProducer, result: result)
232+
}
233+
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
234+
return .resumeConsumer(result: result)
235+
236+
case .buffering(_, _, _, .some):
237+
preconditionFailure("Invalid states. There is already a suspended consumer.")
238+
239+
case .modifying:
240+
preconditionFailure("Invalid state.")
241+
242+
case .finished(let buffer) where buffer.isEmpty:
243+
return .resumeConsumer(result: nil)
244+
245+
case .finished(var buffer):
246+
let result = buffer.popFirst()!
247+
self.state = .finished(buffer: buffer)
248+
return .resumeConsumer(result: result)
249+
}
250+
}
251+
252+
enum NextCancelledAction {
253+
case none
254+
case resumeProducerAndConsumer(
255+
producerContinuation: UnsafeContinuation<Void, Never>?,
256+
consumerContinuation: UnsafeContinuation<Element?, Error>?
257+
)
258+
}
259+
260+
mutating func nextCancelled() -> NextCancelledAction {
261+
switch self.state {
262+
case .initial:
263+
self.state = .finished(buffer: [])
264+
return .none
265+
266+
case .buffering(_, _, let suspendedProducer, let suspendedConsumer):
267+
self.state = .finished(buffer: [])
268+
return .resumeProducerAndConsumer(producerContinuation: suspendedProducer, consumerContinuation: suspendedConsumer)
269+
270+
case .modifying:
271+
preconditionFailure("Invalid state.")
272+
273+
case .finished:
274+
self.state = .finished(buffer: [])
275+
return .none
276+
}
277+
}
278+
}

0 commit comments

Comments
 (0)