diff --git a/Sources/AsyncAlgorithms/AsyncBufferSequence.swift b/Sources/AsyncAlgorithms/AsyncBufferSequence.swift deleted file mode 100644 index c079289c..00000000 --- a/Sources/AsyncAlgorithms/AsyncBufferSequence.swift +++ /dev/null @@ -1,308 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Async Algorithms open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -actor AsyncBufferState { - enum TerminationState: Sendable, CustomStringConvertible { - case running - case baseFailure(Error) // An error from the base sequence has occurred. We need to process any buffered items before throwing the error. We can rely on it not emitting any more items. - case baseTermination - case terminal - - var description: String { - switch self { - case .running: return "running" - case .baseFailure: return "base failure" - case .baseTermination: return "base termination" - case .terminal: return "terminal" - } - } - } - - var pending = [UnsafeContinuation, Never>]() - var terminationState = TerminationState.running - - init() { } - - func drain(buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output { - guard pending.count > 0 else { - return - } - - do { - if let value = try await buffer.pop() { - pending.removeFirst().resume(returning: .success(value)) - } else { - switch terminationState { - case .running: - // There's no value to report, because it was probably grabbed by next() before we could grab it. The pending continuation was either resumed by next() directly, or will be by a future enqueued value or base termination/failure. - break - case .baseFailure(let error): - // Now that there are no more items in the buffer, we can finally report the base sequence's error and enter terminal state. - pending.removeFirst().resume(returning: .failure(error)) - self.terminate() - case .terminal, .baseTermination: - self.terminate() - } - } - } catch { - // Errors thrown by the buffer immediately terminate the sequence. - pending.removeFirst().resume(returning: .failure(error)) - self.terminate() - } - } - - func enqueue(_ item: Input, buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output { - await buffer.push(item) - await drain(buffer: buffer) - } - - func fail(_ error: Error, buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output { - terminationState = .baseFailure(error) - await drain(buffer: buffer) - } - - func finish(buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output { - if case .running = terminationState { - terminationState = .baseTermination - } - await drain(buffer: buffer) - } - - func terminate() { - terminationState = .terminal - let oldPending = pending - pending = [] - for continuation in oldPending { - continuation.resume(returning: .success(nil)) - } - } - - func next(buffer: Buffer) async throws -> Buffer.Output? where Buffer.Input == Input, Buffer.Output == Output { - if case .terminal = terminationState { - return nil - } - - do { - while let value = try await buffer.pop() { - if let continuation = pending.first { - pending.removeFirst() - continuation.resume(returning: .success(value)) - } else { - return value - } - } - } catch { - // Errors thrown by the buffer immediately terminate the sequence. - self.terminate() - throw error - } - - switch terminationState { - case .running: - break - case .baseFailure(let error): - self.terminate() - throw error - case .baseTermination, .terminal: - self.terminate() - return nil - } - - let result: Result = await withUnsafeContinuation { continuation in - pending.append(continuation) - } - return try result._rethrowGet() - } -} - -/// An asynchronous buffer storage actor protocol used for buffering -/// elements to an `AsyncBufferSequence`. -@rethrows -public protocol AsyncBuffer: Actor { - associatedtype Input: Sendable - associatedtype Output: Sendable - - /// Push an element to enqueue to the buffer - func push(_ element: Input) async - - /// Pop an element from the buffer. - /// - /// Implementors of `pop()` may throw. In cases where types - /// throw from this function, that throwing behavior contributes to - /// the rethrowing characteristics of `AsyncBufferSequence`. - func pop() async throws -> Output? -} - -/// A buffer that limits pushed items by a certain count. -public actor AsyncLimitBuffer: AsyncBuffer { - /// A policy for buffering elements to an `AsyncLimitBuffer` - public enum Policy: Sendable { - /// A policy for no bounding limit of pushed elements. - case unbounded - /// A policy for limiting to a specific number of oldest values. - case bufferingOldest(Int) - /// A policy for limiting to a specific number of newest values. - case bufferingNewest(Int) - } - - var buffer = [Element]() - let policy: Policy - - init(policy: Policy) { - // limits should always be greater than 0 items - switch policy { - case .bufferingNewest(let limit): - precondition(limit > 0) - case .bufferingOldest(let limit): - precondition(limit > 0) - default: break - } - self.policy = policy - } - - /// Push an element to enqueue to the buffer. - public func push(_ element: Element) async { - switch policy { - case .unbounded: - buffer.append(element) - case .bufferingOldest(let limit): - if buffer.count < limit { - buffer.append(element) - } - case .bufferingNewest(let limit): - if buffer.count < limit { - // there is space available - buffer.append(element) - } else { - // no space is available and this should make some room - buffer.removeFirst() - buffer.append(element) - } - } - } - - /// Pop an element from the buffer. - public func pop() async -> Element? { - guard buffer.count > 0 else { - return nil - } - return buffer.removeFirst() - } -} - -extension AsyncSequence where Element: Sendable, Self: Sendable { - /// Creates an asynchronous sequence that buffers elements using a buffer created from a supplied closure. - /// - /// Use the `buffer(_:)` method to account for `AsyncSequence` types that may produce elements faster - /// than they are iterated. The `createBuffer` closure returns a backing buffer for storing elements and dealing with - /// behavioral characteristics of the `buffer(_:)` algorithm. - /// - /// - Parameter createBuffer: A closure that constructs a new `AsyncBuffer` actor to store buffered values. - /// - Returns: An asynchronous sequence that buffers elements using the specified `AsyncBuffer`. - public func buffer(_ createBuffer: @Sendable @escaping () -> Buffer) -> AsyncBufferSequence where Buffer.Input == Element { - AsyncBufferSequence(self, createBuffer: createBuffer) - } - - /// Creates an asynchronous sequence that buffers elements using a specific policy to limit the number of - /// elements that are buffered. - /// - /// - Parameter policy: A limiting policy behavior on the buffering behavior of the `AsyncBufferSequence` - /// - Returns: An asynchronous sequence that buffers elements up to a given limit. - public func buffer(policy limit: AsyncLimitBuffer.Policy) -> AsyncBufferSequence> { - buffer { - AsyncLimitBuffer(policy: limit) - } - } -} - -/// An `AsyncSequence` that buffers elements utilizing an `AsyncBuffer`. -public struct AsyncBufferSequence where Base.Element == Buffer.Input { - let base: Base - let createBuffer: @Sendable () -> Buffer - - init(_ base: Base, createBuffer: @Sendable @escaping () -> Buffer) { - self.base = base - self.createBuffer = createBuffer - } -} - -extension AsyncBufferSequence: Sendable where Base: Sendable { } - -extension AsyncBufferSequence: AsyncSequence { - public typealias Element = Buffer.Output - - /// The iterator for a `AsyncBufferSequence` instance. - public struct Iterator: AsyncIteratorProtocol { - struct Active { - var task: Task? - let buffer: Buffer - let state: AsyncBufferState - - init(_ base: Base, buffer: Buffer, state: AsyncBufferState) { - self.buffer = buffer - self.state = state - task = Task { - var iter = base.makeAsyncIterator() - do { - while let item = try await iter.next() { - await state.enqueue(item, buffer: buffer) - } - await state.finish(buffer: buffer) - } catch { - await state.fail(error, buffer: buffer) - } - } - } - - func next() async rethrows -> Element? { - let result: Result = await withTaskCancellationHandler { - do { - let value = try await state.next(buffer: buffer) - return .success(value) - } catch { - task?.cancel() - return .failure(error) - } - } onCancel: { - task?.cancel() - } - return try result._rethrowGet() - } - } - - enum State { - case idle(Base, @Sendable () -> Buffer) - case active(Active) - } - - var state: State - - init(_ base: Base, createBuffer: @Sendable @escaping () -> Buffer) { - state = .idle(base, createBuffer) - } - - public mutating func next() async rethrows -> Element? { - switch state { - case .idle(let base, let createBuffer): - let bufferState = AsyncBufferState() - let buffer = Active(base, buffer: createBuffer(), state: bufferState) - state = .active(buffer) - return try await buffer.next() - case .active(let buffer): - return try await buffer.next() - } - } - } - - public func makeAsyncIterator() -> Iterator { - Iterator(base, createBuffer: createBuffer) - } -} diff --git a/Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift b/Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift new file mode 100644 index 00000000..8049a1a0 --- /dev/null +++ b/Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift @@ -0,0 +1,131 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +extension AsyncSequence where Self: Sendable { + /// Creates an asynchronous sequence that buffers elements. + /// + /// The buffering behaviour is dictated by the policy: + /// - bounded: will buffer elements until the limit is reached. Then it will suspend the upstream async sequence. + /// - unbounded: will buffer elements without limit. + /// - bufferingNewest: will buffer elements until the limit is reached. Then it will discard the oldest elements. + /// - bufferingOldest: will buffer elements until the limit is reached. Then it will discard the newest elements. + /// + /// - Parameter policy: A policy that drives the behaviour of the ``AsyncBufferSequence`` + /// - Returns: An asynchronous sequence that buffers elements up to a given limit. + public func buffer( + policy: AsyncBufferSequencePolicy + ) -> AsyncBufferSequence { + AsyncBufferSequence(base: self, policy: policy) + } +} + +/// A policy dictating the buffering behaviour of an ``AsyncBufferSequence`` +public struct AsyncBufferSequencePolicy: Sendable { + enum _Policy { + case bounded(Int) + case unbounded + case bufferingNewest(Int) + case bufferingOldest(Int) + } + + let policy: _Policy + + /// A policy for buffering elements until the limit is reached. + /// Then consumption of the upstream `AsyncSequence` will be paused until elements are consumed from the buffer. + /// If the limit is zero then no buffering policy is applied. + public static func bounded(_ limit: Int) -> Self { + precondition(limit >= 0, "The limit should be positive or equal to 0.") + return Self(policy: .bounded(limit)) + } + + /// A policy for buffering elements without limit. + public static var unbounded: Self { + return Self(policy: .unbounded) + } + + /// A policy for buffering elements until the limit is reached. + /// After the limit is reached and a new element is produced by the upstream, the oldest buffered element will be discarded. + /// If the limit is zero then no buffering policy is applied. + public static func bufferingLatest(_ limit: Int) -> Self { + precondition(limit >= 0, "The limit should be positive or equal to 0.") + return Self(policy: .bufferingNewest(limit)) + } + + /// A policy for buffering elements until the limit is reached. + /// After the limit is reached and a new element is produced by the upstream, the latest buffered element will be discarded. + /// If the limit is zero then no buffering policy is applied. + public static func bufferingOldest(_ limit: Int) -> Self { + precondition(limit >= 0, "The limit should be positive or equal to 0.") + return Self(policy: .bufferingOldest(limit)) + } +} + +/// An `AsyncSequence` that buffers elements in regard to a policy. +public struct AsyncBufferSequence: AsyncSequence { + enum StorageType { + case transparent(Base.AsyncIterator) + case bounded(storage: BoundedBufferStorage) + case unbounded(storage: UnboundedBufferStorage) + } + + public typealias Element = Base.Element + public typealias AsyncIterator = Iterator + + let base: Base + let policy: AsyncBufferSequencePolicy + + public init( + base: Base, + policy: AsyncBufferSequencePolicy + ) { + self.base = base + self.policy = policy + } + + public func makeAsyncIterator() -> Iterator { + let storageType: StorageType + switch self.policy.policy { + case .bounded(...0), .bufferingNewest(...0), .bufferingOldest(...0): + storageType = .transparent(self.base.makeAsyncIterator()) + case .bounded(let limit): + storageType = .bounded(storage: BoundedBufferStorage(base: self.base, limit: limit)) + case .unbounded: + storageType = .unbounded(storage: UnboundedBufferStorage(base: self.base, policy: .unlimited)) + case .bufferingNewest(let limit): + storageType = .unbounded(storage: UnboundedBufferStorage(base: self.base, policy: .bufferingNewest(limit))) + case .bufferingOldest(let limit): + storageType = .unbounded(storage: UnboundedBufferStorage(base: self.base, policy: .bufferingOldest(limit))) + } + return Iterator(storageType: storageType) + } + + public struct Iterator: AsyncIteratorProtocol { + var storageType: StorageType + + public mutating func next() async rethrows -> Element? { + switch self.storageType { + case .transparent(var iterator): + let element = try await iterator.next() + self.storageType = .transparent(iterator) + return element + case .bounded(let storage): + return try await storage.next()?._rethrowGet() + case .unbounded(let storage): + return try await storage.next()?._rethrowGet() + } + } + } +} + +extension AsyncBufferSequence: Sendable where Base: Sendable { } + +@available(*, unavailable) +extension AsyncBufferSequence.Iterator: Sendable { } diff --git a/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift new file mode 100644 index 00000000..a5c50a61 --- /dev/null +++ b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift @@ -0,0 +1,310 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@_implementationOnly import DequeModule + +struct BoundedBufferStateMachine { + typealias Element = Base.Element + typealias SuspendedProducer = UnsafeContinuation + typealias SuspendedConsumer = UnsafeContinuation?, Never> + + private enum State { + case initial(base: Base) + case buffering( + task: Task, + buffer: Deque>, + suspendedProducer: SuspendedProducer?, + suspendedConsumer: SuspendedConsumer? + ) + case modifying + case finished(buffer: Deque>) + } + + private var state: State + private let limit: Int + + init(base: Base, limit: Int) { + self.state = .initial(base: base) + self.limit = limit + } + + var task: Task? { + switch self.state { + case .buffering(let task, _, _, _): + return task + default: + return nil + } + } + + mutating func taskStarted(task: Task) { + switch self.state { + case .initial: + self.state = .buffering(task: task, buffer: [], suspendedProducer: nil, suspendedConsumer: nil) + + case .buffering: + preconditionFailure("Invalid state.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + preconditionFailure("Invalid state.") + } + } + + mutating func shouldSuspendProducer() -> Bool { + switch state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(_, let buffer, .none, .none): + // we are either idle or the buffer is already in use (no awaiting consumer) + // if there are free slots, we should directly request the next element + return buffer.count >= self.limit + + case .buffering(_, _, .none, .some): + // we have an awaiting consumer, we should not suspended the producer, we should + // directly request the next element + return false + + case .buffering(_, _, .some, _): + preconditionFailure("Invalid state. There is already a suspended producer.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + return false + } + } + + enum ProducerSuspendedAction { + case none + case resumeProducer + } + + mutating func producerSuspended(continuation: SuspendedProducer) -> ProducerSuspendedAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(let task, let buffer, .none, .none): + // we are either idle or the buffer is already in use (no awaiting consumer) + // if the buffer is available we resume the producer so it can we can request the next element + // otherwise we confirm the suspension + if buffer.count < limit { + return .resumeProducer + } else { + self.state = .buffering(task: task, buffer: buffer, suspendedProducer: continuation, suspendedConsumer: nil) + return .none + } + + case .buffering(_, let buffer, .none, .some): + // we have an awaiting consumer, we can resume the producer so the next element can be requested + precondition(buffer.isEmpty, "Invalid state. The buffer should be empty as we have an awaiting consumer already.") + return .resumeProducer + + case .buffering(_, _, .some, _): + preconditionFailure("Invalid state. There is already a suspended producer.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + return .resumeProducer + } + } + + enum ElementProducedAction { + case none + case resumeConsumer(continuation: SuspendedConsumer, result: Result) + } + + mutating func elementProduced(element: Element) -> ElementProducedAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(let task, var buffer, .none, .none): + // we are either idle or the buffer is already in use (no awaiting consumer) + // we have to stack the new element or suspend the producer if the buffer is full + precondition(buffer.count < limit, "Invalid state. The buffer should be available for stacking a new element.") + self.state = .modifying + buffer.append(.success(element)) + self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) + return .none + + case .buffering(let task, let buffer, .none, .some(let suspendedConsumer)): + // we have an awaiting consumer, we can resume it with the element and exit + precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.") + self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) + return .resumeConsumer(continuation: suspendedConsumer, result: .success(element)) + + case .buffering(_, _, .some, _): + preconditionFailure("Invalid state. There should not be a suspended producer.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + return .none + } + } + + enum FinishAction { + case none + case resumeConsumer( + continuation: UnsafeContinuation?, Never>? + ) + } + + mutating func finish(error: Error?) -> FinishAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(_, var buffer, .none, .none): + // we are either idle or the buffer is already in use (no awaiting consumer) + // if we have an error we stack it in the buffer so it can be consumed later + if let error { + buffer.append(.failure(error)) + } + self.state = .finished(buffer: buffer) + return .none + + case .buffering(_, let buffer, .none, .some(let suspendedConsumer)): + // we have an awaiting consumer, we can resume it + precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.") + self.state = .finished(buffer: []) + return .resumeConsumer(continuation: suspendedConsumer) + + case .buffering(_, _, .some, _): + preconditionFailure("Invalid state. There should not be a suspended producer.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + return .none + } + } + + enum NextAction { + case startTask(base: Base) + case suspend + case returnResult(producerContinuation: UnsafeContinuation?, result: Result?) + } + + mutating func next() -> NextAction { + switch state { + case .initial(let base): + return .startTask(base: base) + + case .buffering(_, let buffer, .none, .none) where buffer.isEmpty: + // we are idle, we must suspend the consumer + return .suspend + + case .buffering(let task, var buffer, let suspendedProducer, .none): + // we have values in the buffer, we unstack the oldest one and resume a potential suspended producer + self.state = .modifying + let result = buffer.popFirst()! + self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) + return .returnResult(producerContinuation: suspendedProducer, result: result) + + case .buffering(_, _, _, .some): + preconditionFailure("Invalid states. There is already a suspended consumer.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished(let buffer) where buffer.isEmpty: + return .returnResult(producerContinuation: nil, result: nil) + + case .finished(var buffer): + self.state = .modifying + let result = buffer.popFirst()! + self.state = .finished(buffer: buffer) + return .returnResult(producerContinuation: nil, result: result) + } + } + + enum NextSuspendedAction { + case none + case returnResult(producerContinuation: UnsafeContinuation?, result: Result?) + } + + mutating func nextSuspended(continuation: SuspendedConsumer) -> NextSuspendedAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(let task, let buffer, .none, .none) where buffer.isEmpty: + // we are idle, we confirm the suspension of the consumer + self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: continuation) + return .none + + case .buffering(let task, var buffer, let suspendedProducer, .none): + // we have values in the buffer, we unstack the oldest one and resume a potential suspended producer + self.state = .modifying + let result = buffer.popFirst()! + self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) + return .returnResult(producerContinuation: suspendedProducer, result: result) + + case .buffering(_, _, _, .some): + preconditionFailure("Invalid states. There is already a suspended consumer.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished(let buffer) where buffer.isEmpty: + return .returnResult(producerContinuation: nil, result: nil) + + case .finished(var buffer): + self.state = .modifying + let result = buffer.popFirst()! + self.state = .finished(buffer: buffer) + return .returnResult(producerContinuation: nil, result: result) + } + } + + enum InterruptedAction { + case none + case resumeProducerAndConsumer( + task: Task, + producerContinuation: UnsafeContinuation?, + consumerContinuation: UnsafeContinuation?, Never>? + ) + } + + mutating func interrupted() -> InterruptedAction { + switch self.state { + case .initial: + self.state = .finished(buffer: []) + return .none + + case .buffering(let task, _, let suspendedProducer, let suspendedConsumer): + self.state = .finished(buffer: []) + return .resumeProducerAndConsumer( + task: task, + producerContinuation: suspendedProducer, + consumerContinuation: suspendedConsumer + ) + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + self.state = .finished(buffer: []) + return .none + } + } +} diff --git a/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift new file mode 100644 index 00000000..c00360e1 --- /dev/null +++ b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift @@ -0,0 +1,142 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +final class BoundedBufferStorage: Sendable where Base: Sendable { + private let stateMachine: ManagedCriticalState> + + init(base: Base, limit: Int) { + self.stateMachine = ManagedCriticalState(BoundedBufferStateMachine(base: base, limit: limit)) + } + + func next() async -> Result? { + return await withTaskCancellationHandler { + let (shouldSuspend, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result?) in + let action = stateMachine.next() + switch action { + case .startTask(let base): + self.startTask(stateMachine: &stateMachine, base: base) + return (true, nil) + case .suspend: + return (true, nil) + case .returnResult(let producerContinuation, let result): + producerContinuation?.resume() + return (false, result) + } + } + + if !shouldSuspend { + return result + } + + return await withUnsafeContinuation { (continuation: UnsafeContinuation?, Never>) in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.nextSuspended(continuation: continuation) + switch action { + case .none: + break + case .returnResult(let producerContinuation, let result): + producerContinuation?.resume() + continuation.resume(returning: result) + } + } + } + } onCancel: { + self.interrupted() + } + } + + private func startTask( + stateMachine: inout BoundedBufferStateMachine, + base: Base + ) { + let task = Task { + do { + var iterator = base.makeAsyncIterator() + + loop: while true { + let shouldSuspend = self.stateMachine.withCriticalRegion { stateMachine -> Bool in + return stateMachine.shouldSuspendProducer() + } + + if shouldSuspend { + await withUnsafeContinuation { (continuation: UnsafeContinuation) in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.producerSuspended(continuation: continuation) + + switch action { + case .none: + break + case .resumeProducer: + continuation.resume() + } + } + } + } + + guard let element = try await iterator.next() else { + // the upstream is finished + break loop + } + + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.elementProduced(element: element) + switch action { + case .none: + break + case .resumeConsumer(let continuation, let result): + continuation.resume(returning: result) + } + } + } + + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.finish(error: nil) + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: nil) + } + } + } catch { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.finish(error: error) + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: .failure(error)) + } + } + } + } + + stateMachine.taskStarted(task: task) + } + + func interrupted() { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.interrupted() + switch action { + case .none: + break + case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation): + task.cancel() + producerContinuation?.resume() + consumerContinuation?.resume(returning: nil) + } + } + } + + deinit { + self.interrupted() + } +} diff --git a/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift new file mode 100644 index 00000000..b163619a --- /dev/null +++ b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift @@ -0,0 +1,250 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@_implementationOnly import DequeModule + +struct UnboundedBufferStateMachine { + typealias Element = Base.Element + typealias SuspendedConsumer = UnsafeContinuation?, Never> + + enum Policy { + case unlimited + case bufferingNewest(Int) + case bufferingOldest(Int) + } + + private enum State { + case initial(base: Base) + case buffering( + task: Task, + buffer: Deque>, + suspendedConsumer: SuspendedConsumer? + ) + case modifying + case finished(buffer: Deque>) + } + + private var state: State + private let policy: Policy + + init(base: Base, policy: Policy) { + self.state = .initial(base: base) + self.policy = policy + } + + var task: Task? { + switch self.state { + case .buffering(let task, _, _): + return task + default: + return nil + } + } + + mutating func taskStarted(task: Task) { + switch self.state { + case .initial: + self.state = .buffering(task: task, buffer: [], suspendedConsumer: nil) + + case .buffering: + preconditionFailure("Invalid state.") + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + preconditionFailure("Invalid state.") + } + } + + enum ElementProducedAction { + case none + case resumeConsumer( + continuation: SuspendedConsumer, + result: Result + ) + } + + mutating func elementProduced(element: Element) -> ElementProducedAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(let task, var buffer, .none): + // we are either idle or the buffer is already in use (no awaiting consumer) + // we have to apply the policy when stacking the new element + self.state = .modifying + switch self.policy { + case .unlimited: + buffer.append(.success(element)) + case .bufferingNewest(let limit): + if buffer.count >= limit { + _ = buffer.popFirst() + } + buffer.append(.success(element)) + case .bufferingOldest(let limit): + if buffer.count < limit { + buffer.append(.success(element)) + } + } + self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) + return .none + + case .buffering(let task, let buffer, .some(let suspendedConsumer)): + // we have an awaiting consumer, we can resume it with the element + precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.") + self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) + return .resumeConsumer( + continuation: suspendedConsumer, + result: .success(element) + ) + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + return .none + } + } + + enum FinishAction { + case none + case resumeConsumer(continuation: SuspendedConsumer?) + } + + mutating func finish(error: Error?) -> FinishAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(_, var buffer, .none): + // we are either idle or the buffer is already in use (no awaiting consumer) + // if we have an error we stack it in the buffer so it can be consumed later + if let error { + buffer.append(.failure(error)) + } + self.state = .finished(buffer: buffer) + return .none + + case .buffering(_, let buffer, let suspendedConsumer): + // we have an awaiting consumer, we can resume it with nil or the error + precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.") + self.state = .finished(buffer: []) + return .resumeConsumer(continuation: suspendedConsumer) + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + return .none + } + } + + enum NextAction { + case startTask(base: Base) + case suspend + case returnResult(Result?) + } + + mutating func next() -> NextAction { + switch self.state { + case .initial(let base): + return .startTask(base: base) + + case .buffering(_, let buffer, let suspendedConsumer) where buffer.isEmpty: + // we are idle, we have to suspend the consumer + precondition(suspendedConsumer == nil, "Invalid states. There is already a suspended consumer.") + return .suspend + + case .buffering(let task, var buffer, let suspendedConsumer): + // the buffer is already in use, we can unstack a value and directly resume the consumer + precondition(suspendedConsumer == nil, "Invalid states. There is already a suspended consumer.") + self.state = .modifying + let result = buffer.popFirst()! + self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) + return .returnResult(result) + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished(let buffer) where buffer.isEmpty: + return .returnResult(nil) + + case .finished(var buffer): + self.state = .modifying + let result = buffer.popFirst()! + self.state = .finished(buffer: buffer) + return .returnResult(result) + } + } + + enum NextSuspendedAction { + case none + case resumeConsumer(Result?) + } + + mutating func nextSuspended(continuation: SuspendedConsumer) -> NextSuspendedAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state. The task should already by started.") + + case .buffering(let task, let buffer, let suspendedConsumer) where buffer.isEmpty: + // we are idle, we confirm the suspension of the consumer + precondition(suspendedConsumer == nil, "Invalid states. There is already a suspended consumer.") + self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: continuation) + return .none + + case .buffering(let task, var buffer, let suspendedConsumer): + // the buffer is already in use, we can unstack a value and directly resume the consumer + precondition(suspendedConsumer == nil, "Invalid states. There is already a suspended consumer.") + self.state = .modifying + let result = buffer.popFirst()! + self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) + return .resumeConsumer(result) + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished(let buffer) where buffer.isEmpty: + return .resumeConsumer(nil) + + case .finished(var buffer): + self.state = .modifying + let result = buffer.popFirst()! + self.state = .finished(buffer: buffer) + return .resumeConsumer(result) + } + } + + enum InterruptedAction { + case none + case resumeConsumer(task: Task, continuation: SuspendedConsumer?) + } + + mutating func interrupted() -> InterruptedAction { + switch self.state { + case .initial: + state = .finished(buffer: []) + return .none + + case .buffering(let task, _, let suspendedConsumer): + self.state = .finished(buffer: []) + return .resumeConsumer(task: task, continuation: suspendedConsumer) + + case .modifying: + preconditionFailure("Invalid state.") + + case .finished: + self.state = .finished(buffer: []) + return .none + } + } +} diff --git a/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift new file mode 100644 index 00000000..59b02810 --- /dev/null +++ b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift @@ -0,0 +1,114 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +final class UnboundedBufferStorage: Sendable where Base: Sendable { + private let stateMachine: ManagedCriticalState> + + init(base: Base, policy: UnboundedBufferStateMachine.Policy) { + self.stateMachine = ManagedCriticalState(UnboundedBufferStateMachine(base: base, policy: policy)) + } + + func next() async -> Result? { + return await withTaskCancellationHandler { + + let (shouldSuspend, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result?) in + let action = stateMachine.next() + switch action { + case .startTask(let base): + self.startTask(stateMachine: &stateMachine, base: base) + return (true, nil) + case .suspend: + return (true, nil) + case .returnResult(let result): + return (false, result) + } + } + + if !shouldSuspend { + return result + } + + return await withUnsafeContinuation { (continuation: UnsafeContinuation?, Never>) in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.nextSuspended(continuation: continuation) + switch action { + case .none: + break + case .resumeConsumer(let result): + continuation.resume(returning: result) + } + } + } + } onCancel: { + self.interrupted() + } + } + + private func startTask( + stateMachine: inout UnboundedBufferStateMachine, + base: Base + ) { + let task = Task { + do { + for try await element in base { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.elementProduced(element: element) + switch action { + case .none: + break + case .resumeConsumer(let continuation, let result): + continuation.resume(returning: result) + } + } + } + + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.finish(error: nil) + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: nil) + } + } + } catch { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.finish(error: error) + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: .failure(error)) + } + } + } + } + + stateMachine.taskStarted(task: task) + } + + func interrupted() { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.interrupted() + switch action { + case .none: + break + case .resumeConsumer(let task, let continuation): + task.cancel() + continuation?.resume(returning: nil) + } + } + } + + deinit { + self.interrupted() + } +} diff --git a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift index 834b7303..d8003ca8 100644 --- a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift +++ b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift @@ -20,6 +20,26 @@ final class TestThroughput: XCTestCase { func test_throwingChannel() async { await measureThrowingChannelThroughput(output: 1) } + func test_buffer_bounded() async { + await measureSequenceThroughput(output: 1) { + $0.buffer(policy: .bounded(5)) + } + } + func test_buffer_unbounded() async { + await measureSequenceThroughput(output: 1) { + $0.buffer(policy: .unbounded) + } + } + func test_buffer_bufferingNewest() async { + await measureSequenceThroughput(output: 1) { + $0.buffer(policy: .bufferingLatest(5)) + } + } + func test_buffer_bufferingOldest() async { + await measureSequenceThroughput(output: 1) { + $0.buffer(policy: .bufferingOldest(5)) + } + } func test_chain2() async { await measureSequenceThroughput(firstOutput: 1, secondOutput: 2) { chain($0, $1) diff --git a/Tests/AsyncAlgorithmsTests/TestBuffer.swift b/Tests/AsyncAlgorithmsTests/TestBuffer.swift index cd989c41..f8ef8a3c 100644 --- a/Tests/AsyncAlgorithmsTests/TestBuffer.swift +++ b/Tests/AsyncAlgorithmsTests/TestBuffer.swift @@ -13,263 +13,317 @@ import XCTest import AsyncAlgorithms final class TestBuffer: XCTestCase { - func test_buffering() async { - var gated = GatedSequence([1, 2, 3, 4, 5]) - let sequence = gated.buffer(policy: .unbounded) - var iterator = sequence.makeAsyncIterator() - - gated.advance() + func test_given_a_base_sequence_when_buffering_with_unbounded_then_the_buffer_is_filled_in() async { + // Given + var base = GatedSequence([1, 2, 3, 4, 5]) + let buffered = base.buffer(policy: .unbounded) + var iterator = buffered.makeAsyncIterator() + + // When + base.advance() + + // Then var value = await iterator.next() XCTAssertEqual(value, 1) - gated.advance() - gated.advance() - gated.advance() + + // When + base.advance() + base.advance() + base.advance() + + // Then value = await iterator.next() XCTAssertEqual(value, 2) + value = await iterator.next() XCTAssertEqual(value, 3) + value = await iterator.next() XCTAssertEqual(value, 4) - gated.advance() - gated.advance() + + // When + base.advance() + base.advance() + + // Then value = await iterator.next() XCTAssertEqual(value, 5) value = await iterator.next() XCTAssertEqual(value, nil) - value = await iterator.next() - XCTAssertEqual(value, nil) + + let pastEnd = await iterator.next() + XCTAssertEqual(value, pastEnd) } - func test_buffering_withError() async { + func test_given_a_failable_base_sequence_when_buffering_with_unbounded_then_the_failure_is_forwarded() async { + // Given var gated = GatedSequence([1, 2, 3, 4, 5, 6, 7]) - let gated_map = gated.map { try throwOn(3, $0) } - let sequence = gated_map.buffer(policy: .unbounded) - var iterator = sequence.makeAsyncIterator() + let base = gated.map { try throwOn(3, $0) } + let buffered = base.buffer(policy: .unbounded) + var iterator = buffered.makeAsyncIterator() + // When gated.advance() + + // Then var value = try! await iterator.next() XCTAssertEqual(value, 1) + + // When gated.advance() gated.advance() gated.advance() + + // Then value = try! await iterator.next() XCTAssertEqual(value, 2) + // When gated.advance() gated.advance() gated.advance() gated.advance() + + // Then do { value = try await iterator.next() XCTFail("next() should have thrown.") - } catch { } + } catch { + XCTAssert(error is Failure) + } - value = try! await iterator.next() - XCTAssertNil(value) + var pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) } - - func test_buffer_delegation() async { - actor BufferDelegate: AsyncBuffer { - var buffer = [Int]() - var pushed = [Int]() - - func push(_ element: Int) async { - buffer.append(element) - pushed.append(element) - } - - func pop() async -> Int? { - if buffer.count > 0 { - return buffer.removeFirst() + + func test_given_a_base_sequence_when_bufferingOldest_then_the_policy_is_applied() async { + validate { + "X-12- 34- 5 |" + $0.inputs[0].buffer(policy: .bufferingOldest(2)) + "X,,,[1,],,[2,],[3,][5,]|" + } + } + + func test_given_a_base_sequence_when_bufferingOldest_with_0_limit_then_the_policy_is_transparent() async { + validate { + "X-12- 34- 5 |" + $0.inputs[0].buffer(policy: .bufferingOldest(0)) + "X-12- 34- 5 |" + } + } + + func test_given_a_base_sequence_when_bufferingOldest_at_slow_pace_then_no_element_is_dropped() async { + validate { + "X-12 3 4 5 |" + $0.inputs[0].buffer(policy: .bufferingOldest(2)) + "X,,[1,][2,][3,][45]|" + } + } + + func test_given_a_failable_base_sequence_when_bufferingOldest_then_the_failure_is_forwarded() async { + validate { + "X-12345^" + $0.inputs[0].buffer(policy: .bufferingOldest(2)) + "X,,,,,,[12^]" + } + } + + func test_given_a_base_sequence_when_bufferingNewest_then_the_policy_is_applied() async { + validate { + "X-12- 34 -5|" + $0.inputs[0].buffer(policy: .bufferingLatest(2)) + "X,,,[1,],,[3,],[4,][5,]|" + } + } + + func test_given_a_base_sequence_when_bufferingNewest_with_limit_0_then_the_policy_is_transparent() async { + validate { + "X-12- 34 -5|" + $0.inputs[0].buffer(policy: .bufferingLatest(0)) + "X-12- 34 -5|" + } + } + + func test_given_a_base_sequence_when_bufferingNewest_at_slow_pace_then_no_element_is_dropped() async { + validate { + "X-12 3 4 5 |" + $0.inputs[0].buffer(policy: .bufferingLatest(2)) + "X,,[1,][2,][3,][45]|" + } + } + + func test_given_a_failable_base_sequence_when_bufferingNewest_then_the_failure_is_forwarded() async { + validate { + "X-12345^" + $0.inputs[0].buffer(policy: .bufferingLatest(2)) + "X,,,,,,[45^]" + } + } + + func test_given_a_buffered_with_unbounded_sequence_when_cancelling_consumer_then_the_iteration_finishes_and_the_base_is_cancelled() async { + // Given + let buffered = Indefinite(value: 1).async.buffer(policy: .unbounded) + + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + + let task = Task { + var firstIteration = false + for await _ in buffered { + if !firstIteration { + firstIteration = true + iterated.fulfill() } - return nil } + finished.fulfill() } - let delegate = BufferDelegate() - var gated = GatedSequence([1, 2, 3, 4, 5]) - let sequence = gated.buffer { - delegate - } - var iterator = sequence.makeAsyncIterator() - - gated.advance() + // ensure the task actually starts + wait(for: [iterated], timeout: 1.0) + + // When + task.cancel() + + // Then + wait(for: [finished], timeout: 1.0) + } + + func test_given_a_base_sequence_when_buffering_with_bounded_then_the_buffer_is_filled_in_and_suspends() async { + // Gicen + var base = GatedSequence([1, 2, 3, 4, 5]) + let buffered = base.buffer(policy: .bounded(2)) + var iterator = buffered.makeAsyncIterator() + + // When + base.advance() + + // Then var value = await iterator.next() - var pushed = await delegate.pushed - XCTAssertEqual(pushed, [1]) XCTAssertEqual(value, 1) - gated.advance() - gated.advance() - gated.advance() + + // When + base.advance() + base.advance() + base.advance() + + // Then value = await iterator.next() XCTAssertEqual(value, 2) value = await iterator.next() - pushed = await delegate.pushed XCTAssertEqual(value, 3) value = await iterator.next() - pushed = await delegate.pushed - XCTAssertEqual(pushed, [1, 2, 3, 4]) XCTAssertEqual(value, 4) - gated.advance() - gated.advance() + + // When + base.advance() + base.advance() + + // Then value = await iterator.next() - pushed = await delegate.pushed - XCTAssertEqual(pushed, [1, 2, 3, 4, 5]) XCTAssertEqual(value, 5) value = await iterator.next() XCTAssertEqual(value, nil) - value = await iterator.next() - XCTAssertEqual(value, nil) - } - - func test_delegatedBuffer_withError() async { - actor BufferDelegate: AsyncBuffer { - var buffer = [Int]() - var pushed = [Int]() - - func push(_ element: Int) async { - buffer.append(element) - pushed.append(element) - } - func pop() async throws -> Int? { - if buffer.count > 0 { - let value = buffer.removeFirst() - if value == 3 { - throw Failure() - } - return value - } - return nil - } - } - let delegate = BufferDelegate() + let pastEnd = await iterator.next() + XCTAssertEqual(value, pastEnd) + } + func test_given_a_failable_base_sequence_when_buffering_with_bounded_then_the_failure_is_forwarded() async { + // Given var gated = GatedSequence([1, 2, 3, 4, 5, 6, 7]) - let sequence = gated.buffer { delegate } - var iterator = sequence.makeAsyncIterator() + let base = gated.map { try throwOn(3, $0) } + let buffered = base.buffer(policy: .bounded(5)) + var iterator = buffered.makeAsyncIterator() + // When gated.advance() + + // Then var value = try! await iterator.next() XCTAssertEqual(value, 1) + + // When gated.advance() gated.advance() gated.advance() + + // Then value = try! await iterator.next() XCTAssertEqual(value, 2) + // When gated.advance() gated.advance() gated.advance() gated.advance() + + // Then do { value = try await iterator.next() XCTFail("next() should have thrown.") - } catch { } + } catch { + XCTAssert(error is Failure) + } - value = try! await iterator.next() - XCTAssertNil(value) + var pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) + pastFailure = try! await iterator.next() + XCTAssertNil(pastFailure) - value = try! await iterator.next() - XCTAssertNil(value) - } - - func test_byteBuffer() async { - actor ByteBuffer: AsyncBuffer { - var buffer: [UInt8]? - - func push(_ element: UInt8) async { - if buffer == nil { - buffer = [UInt8]() - } - buffer?.append(element) - } - - func pop() async -> [UInt8]? { - defer { buffer = nil } - return buffer - } - } - - var data = Data() - for _ in 0..<4096 { - data.append(UInt8.random(in: 0..