From 4ba8eeb20f78d252dce799d1c1a271e0ccb2e14d Mon Sep 17 00:00:00 2001 From: Tristan Celder Date: Thu, 20 Oct 2022 17:29:27 +0100 Subject: [PATCH 1/6] Add Deferred algorithm --- Evolution/NNNN-deferred.md | 109 ++++++++++++++++++ .../AsyncDeferredSequence.swift | 77 +++++++++++++ Tests/AsyncAlgorithmsTests/TestDeferred.swift | 82 +++++++++++++ 3 files changed, 268 insertions(+) create mode 100644 Evolution/NNNN-deferred.md create mode 100644 Sources/AsyncAlgorithms/AsyncDeferredSequence.swift create mode 100644 Tests/AsyncAlgorithmsTests/TestDeferred.swift diff --git a/Evolution/NNNN-deferred.md b/Evolution/NNNN-deferred.md new file mode 100644 index 00000000..c06e7ee7 --- /dev/null +++ b/Evolution/NNNN-deferred.md @@ -0,0 +1,109 @@ +# Deferred + +* Proposal: [NNNN](NNNN-deferred.md) +* Authors: [Tristan Celder](https://github.com/tcldr) +* Review Manager: TBD +* Status: **Awaiting implementation** + +* Implementation: [[Source](https://github.com/tcldr/swift-async-algorithms/blob/pr/deferred/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift) | +[Tests](https://github.com/tcldr/swift-async-algorithms/blob/pr/deferred/Tests/AsyncAlgorithmsTests/TestDeferred.swift)] +* Decision Notes: [Additional Commentary](https://forums.swift.org/) +* Bugs: + +## Introduction + +`AsyncDeferredSequence` provides a convenient way to postpone the initialization of a sequence to the point where it is requested by a sequence consumer. + +## Motivation + +Some source sequences may perform expensive work on initialization. This could be network activity, sensor activity, or anything else that consumes system resources. While this can be mitigated in some simple situtations by only passing around a sequence at the point of use, often it is favorable to be able to pass a sequence to its eventual point of use without commencing its initialization process. This is especially true for sequences which are intended for multicast/broadcast for which a reliable startup and shutdown procedure is essential. + +A simple example of a seqeunce which may benefit from being deferred is provided in the documentation for AsyncStream: + +```swift +extension QuakeMonitor { + + static var quakes: AsyncStream { + AsyncStream { continuation in + let monitor = QuakeMonitor() + monitor.quakeHandler = { quake in + continuation.yield(quake) + } + continuation.onTermination = { @Sendable _ in + monitor.stopMonitoring() + } + monitor.startMonitoring() + } + } +} +``` + +In the supplied code sample, the closure provided to the AsyncStream initializer will be executed immediately upon initialization; `QuakeMonitor.startMonitoring()` will be called, and the stream will then begin buffering its contents waiting to be iterated. Whilst this behavior is sometimes desirable, on other occasions it can cause system resources to be consumed unnecessarily. + +```swift +let nonDeferredSequence = QuakeMonitor.quakes // `Quake.startMonitoring()` is called now! + +... +// at some arbitrary point, possibly hours later... +for await quake in nonDeferredSequence { + print("Quake: \(quake.date)") +} +// Prints out hours of previously buffered quake data before showing the latest +``` + +## Proposed solution + +`AsyncDeferredSequence` provides a way to postpone the initialization of an an arbitrary async sequence until the point of use: + +```swift +let deferredSequence = deferred(QuakeMonitor.quakes) // Now, initialization is postponed + +... +// at some arbitrary point, possibly hours later... +for await quake in deferredSequence { // `Quake.startMonitoring()` is now called + print("Quake: \(quake.date)") +} +// Prints out only the latest quake data +``` + +Now, potentially expensive system resources are consumed only at the point they're needed. + +## Detailed design + +`AsyncDeferredSequence` is a trivial algorithm supported by some convenience functions. + +### Functions + +```swift +public func deferred(_ createSequence: @escaping @Sendable () async -> Base) -> AsyncDeferredSequence +public func deferred(_ createSequence: @autoclosure @escaping @Sendable () -> Base) -> AsyncDeferredSequence +``` + +The synchronous function can be auto-escaped, simplifying the call-site. While the async variant allows a sequence to be initialized within a concurrency context other than that of the end consumer. + +```swift +public struct AsyncDeferredSequence: Sendable { + public typealias Element = Base.Element + public struct Iterator: AsyncIteratorProtocol { + public mutating func next() async rethrows -> Element? + } + public func makeAsyncIterator() -> Iterator +} +``` + +### Naming + +The `deferred(_:)` function takes its inspiration from the Combine publisher of the same name with similar functionality. However, `lazy(_:)` could be quite fitting, too. + +### Comparison with other libraries + +**ReactiveX** ReactiveX has an [API definition of Defer](https://reactivex.io/documentation/operators/defer.html) as a top level operator for generating observables. + +**Combine** Combine has an [API definition of Deferred](https://developer.apple.com/documentation/combine/deferred) as a top-level convenience publisher. + + +## Effect on API resilience + +Deferred has a trivial implementation and is marked as `@frozen` and `@inlinable`. This removes the ability of this type and functions to be ABI resilient boundaries at the benefit of being highly optimizable. + +## Alternatives considered diff --git a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift new file mode 100644 index 00000000..0b4a5694 --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift @@ -0,0 +1,77 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@inlinable +public func deferred(_ createSequence: @escaping @Sendable () async -> Base) -> AsyncDeferredSequence { + AsyncDeferredSequence(createSequence) +} + +@inlinable +public func deferred(_ createSequence: @autoclosure @escaping @Sendable () -> Base) -> AsyncDeferredSequence { + AsyncDeferredSequence(createSequence) +} + +public struct AsyncDeferredSequence { + + @usableFromInline + let createSequence: @Sendable () async -> Base + + @usableFromInline + init(_ createSequence: @escaping @Sendable () async -> Base) { + self.createSequence = createSequence + } +} + +extension AsyncDeferredSequence: AsyncSequence { + + public typealias Element = Base.Element + + public struct Iterator: AsyncIteratorProtocol { + + @usableFromInline + enum State { + case pending(@Sendable () async -> Base) + case active(Base.AsyncIterator) + } + + @usableFromInline + var state: State + + @usableFromInline + init(_ createSequence: @escaping @Sendable () async -> Base) { + self.state = .pending(createSequence) + } + + @inlinable + public mutating func next() async rethrows -> Element? { + switch state { + case .pending(let generator): + state = .active(await generator().makeAsyncIterator()) + return try await next() + case .active(var base): + if let value = try await base.next() { + state = .active(base) + return value + } + else { + return nil + } + } + } + } + + @inlinable + public func makeAsyncIterator() -> Iterator { + Iterator(createSequence) + } +} + +extension AsyncDeferredSequence: Sendable { } diff --git a/Tests/AsyncAlgorithmsTests/TestDeferred.swift b/Tests/AsyncAlgorithmsTests/TestDeferred.swift new file mode 100644 index 00000000..5c579673 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestDeferred.swift @@ -0,0 +1,82 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@preconcurrency import XCTest +import AsyncAlgorithms + +final class TestDeferred: XCTestCase { + func test_deferred() async { + let expected = [0,1,2,3,4] + let sequence = deferred { + return expected.async + } + var actual = [Int]() + for await item in sequence { + actual.append(item) + } + XCTAssertEqual(expected, actual) + } + + func test_deferred_remains_idle_pending_consumer() async { + let expectation = expectation(description: "pending") + expectation.isInverted = true + let _ = deferred { + AsyncStream { continuation in + expectation.fulfill() + continuation.yield(0) + continuation.finish() + } + } + wait(for: [expectation], timeout: 1.0) + } + + func test_deferred_generates_new_sequence_per_consumer() async { + let expectation = expectation(description: "started") + expectation.expectedFulfillmentCount = 3 + let sequence = deferred { + AsyncStream { continuation in + expectation.fulfill() + continuation.yield(0) + continuation.finish() + } + } + for await _ in sequence { } + for await _ in sequence { } + for await _ in sequence { } + wait(for: [expectation], timeout: 1.0) + } + + func test_deferred_throws() async { + let expectation = expectation(description: "throws") + let sequence = deferred { + AsyncThrowingStream { continuation in + continuation.finish(throwing: Failure()) + } + } + do { + for try await _ in sequence { } + } + catch { + expectation.fulfill() + } + wait(for: [expectation], timeout: 1.0) + } + + func test_deferred_autoclosure() async { + let expected = [0,1,2,3,4] + let sequence = deferred(expected.async) + var actual = [Int]() + for await item in sequence { + actual.append(item) + } + XCTAssertEqual(expected, actual) + } +} From fb735da60ee4bd6a6617a82689ede55005b7ee2b Mon Sep 17 00:00:00 2001 From: Tristan Celder Date: Thu, 20 Oct 2022 21:11:40 +0100 Subject: [PATCH 2/6] Update Deferred with review feedback --- Evolution/NNNN-deferred.md | 11 +++-- .../AsyncDeferredSequence.swift | 35 +++++++++++----- Tests/AsyncAlgorithmsTests/TestDeferred.swift | 40 +++++++++++++++++-- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/Evolution/NNNN-deferred.md b/Evolution/NNNN-deferred.md index c06e7ee7..d411efd5 100644 --- a/Evolution/NNNN-deferred.md +++ b/Evolution/NNNN-deferred.md @@ -75,14 +75,19 @@ Now, potentially expensive system resources are consumed only at the point they' ### Functions ```swift -public func deferred(_ createSequence: @escaping @Sendable () async -> Base) -> AsyncDeferredSequence -public func deferred(_ createSequence: @autoclosure @escaping @Sendable () -> Base) -> AsyncDeferredSequence +public func deferred( + _ createSequence: @escaping @Sendable () async -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable + +public func deferred( + _ createSequence: @autoclosure @escaping @Sendable () -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable ``` The synchronous function can be auto-escaped, simplifying the call-site. While the async variant allows a sequence to be initialized within a concurrency context other than that of the end consumer. ```swift -public struct AsyncDeferredSequence: Sendable { +public struct AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { public typealias Element = Base.Element public struct Iterator: AsyncIteratorProtocol { public mutating func next() async rethrows -> Element? diff --git a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift index 0b4a5694..ec786e82 100644 --- a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift @@ -10,21 +10,26 @@ //===----------------------------------------------------------------------===// @inlinable -public func deferred(_ createSequence: @escaping @Sendable () async -> Base) -> AsyncDeferredSequence { +public func deferred( + _ createSequence: @escaping @Sendable () async -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { AsyncDeferredSequence(createSequence) } @inlinable -public func deferred(_ createSequence: @autoclosure @escaping @Sendable () -> Base) -> AsyncDeferredSequence { +public func deferred( + _ createSequence: @autoclosure @escaping @Sendable () -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { AsyncDeferredSequence(createSequence) } -public struct AsyncDeferredSequence { +@frozen +public struct AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { @usableFromInline let createSequence: @Sendable () async -> Base - @usableFromInline + @inlinable init(_ createSequence: @escaping @Sendable () async -> Base) { self.createSequence = createSequence } @@ -40,12 +45,13 @@ extension AsyncDeferredSequence: AsyncSequence { enum State { case pending(@Sendable () async -> Base) case active(Base.AsyncIterator) + case terminal } @usableFromInline var state: State - @usableFromInline + @inlinable init(_ createSequence: @escaping @Sendable () async -> Base) { self.state = .pending(createSequence) } @@ -57,13 +63,22 @@ extension AsyncDeferredSequence: AsyncSequence { state = .active(await generator().makeAsyncIterator()) return try await next() case .active(var base): - if let value = try await base.next() { - state = .active(base) - return value + do { + if let value = try await base.next() { + state = .active(base) + return value + } + else { + state = .terminal + return nil + } } - else { - return nil + catch let error { + state = .terminal + throw error } + case .terminal: + return nil } } } diff --git a/Tests/AsyncAlgorithmsTests/TestDeferred.swift b/Tests/AsyncAlgorithmsTests/TestDeferred.swift index 5c579673..80b35e58 100644 --- a/Tests/AsyncAlgorithmsTests/TestDeferred.swift +++ b/Tests/AsyncAlgorithmsTests/TestDeferred.swift @@ -18,11 +18,14 @@ final class TestDeferred: XCTestCase { let sequence = deferred { return expected.async } + var iterator = sequence.makeAsyncIterator() var actual = [Int]() - for await item in sequence { + while let item = await iterator.next() { actual.append(item) } XCTAssertEqual(expected, actual) + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) } func test_deferred_remains_idle_pending_consumer() async { @@ -57,26 +60,55 @@ final class TestDeferred: XCTestCase { func test_deferred_throws() async { let expectation = expectation(description: "throws") let sequence = deferred { - AsyncThrowingStream { continuation in + AsyncThrowingStream { continuation in continuation.finish(throwing: Failure()) } } + var iterator = sequence.makeAsyncIterator() do { - for try await _ in sequence { } + while let _ = try await iterator.next() { } } catch { expectation.fulfill() } wait(for: [expectation], timeout: 1.0) + let pastEnd = try! await iterator.next() + XCTAssertNil(pastEnd) } func test_deferred_autoclosure() async { let expected = [0,1,2,3,4] let sequence = deferred(expected.async) + var iterator = sequence.makeAsyncIterator() var actual = [Int]() - for await item in sequence { + while let item = await iterator.next() { actual.append(item) } XCTAssertEqual(expected, actual) + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_deferred_cancellation() async { + let source = Indefinite(value: 0) + let sequence = deferred(source.async) + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + let task = Task { + var firstIteration = false + for await _ in sequence { + if !firstIteration { + firstIteration = true + iterated.fulfill() + } + } + finished.fulfill() + } + // ensure the other task actually starts + wait(for: [iterated], timeout: 1.0) + // cancellation should ensure the loop finishes + // without regards to the remaining underlying sequence + task.cancel() + wait(for: [finished], timeout: 1.0) } } From d603dfca17101081f42ffe18f86c47d531dadad8 Mon Sep 17 00:00:00 2001 From: Tristan Celder Date: Mon, 24 Oct 2022 14:17:56 +0100 Subject: [PATCH 3/6] Update Deferred with review feedback --- Evolution/NNNN-deferred.md | 2 +- Sources/AsyncAlgorithms/AsyncDeferredSequence.swift | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Evolution/NNNN-deferred.md b/Evolution/NNNN-deferred.md index d411efd5..75e98145 100644 --- a/Evolution/NNNN-deferred.md +++ b/Evolution/NNNN-deferred.md @@ -53,7 +53,7 @@ for await quake in nonDeferredSequence { ## Proposed solution -`AsyncDeferredSequence` provides a way to postpone the initialization of an an arbitrary async sequence until the point of use: +`AsyncDeferredSequence` uses a supplied closure to create a new asynchronous sequence each time it is iterated. This has the effect of postponing the initialization of an an arbitrary async sequence until the point of use: ```swift let deferredSequence = deferred(QuakeMonitor.quakes) // Now, initialization is postponed diff --git a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift index ec786e82..440f55bb 100644 --- a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift @@ -9,6 +9,8 @@ // //===----------------------------------------------------------------------===// +/// Creates an asynchronous sequence that uses the supplied closure to create a new asynchronous sequence each +/// time it is iterated @inlinable public func deferred( _ createSequence: @escaping @Sendable () async -> Base @@ -16,6 +18,8 @@ public func deferred( AsyncDeferredSequence(createSequence) } +/// Creates an asynchronous sequence that uses the supplied closure to create a new asynchronous sequence each +/// time it is iterated @inlinable public func deferred( _ createSequence: @autoclosure @escaping @Sendable () -> Base @@ -90,3 +94,6 @@ extension AsyncDeferredSequence: AsyncSequence { } extension AsyncDeferredSequence: Sendable { } + +@available(*, unavailable) +extension AsyncDeferredSequence.Iterator: Sendable { } From 5df79f0a0b35871697227d306a5b32afb6f62719 Mon Sep 17 00:00:00 2001 From: Tristan Date: Mon, 24 Oct 2022 14:57:46 +0100 Subject: [PATCH 4/6] Update Evolution/NNNN-deferred.md Co-authored-by: Franz Busch --- Evolution/NNNN-deferred.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Evolution/NNNN-deferred.md b/Evolution/NNNN-deferred.md index 75e98145..4000b36a 100644 --- a/Evolution/NNNN-deferred.md +++ b/Evolution/NNNN-deferred.md @@ -53,7 +53,7 @@ for await quake in nonDeferredSequence { ## Proposed solution -`AsyncDeferredSequence` uses a supplied closure to create a new asynchronous sequence each time it is iterated. This has the effect of postponing the initialization of an an arbitrary async sequence until the point of use: +`AsyncDeferredSequence` uses a supplied closure to create a new asynchronous sequence. The closure is executed for each iterator on the first call to `next`. This has the effect of postponing the initialization of an arbitrary async sequence until the point of first demand: ```swift let deferredSequence = deferred(QuakeMonitor.quakes) // Now, initialization is postponed From 7be63836f84a04980b66616e003d3354118f5161 Mon Sep 17 00:00:00 2001 From: Tristan Date: Mon, 24 Oct 2022 14:59:27 +0100 Subject: [PATCH 5/6] Update AsyncDeferredSequence docs Co-authored-by: Franz Busch --- Sources/AsyncAlgorithms/AsyncDeferredSequence.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift index 440f55bb..d7187827 100644 --- a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift @@ -9,8 +9,9 @@ // //===----------------------------------------------------------------------===// -/// Creates an asynchronous sequence that uses the supplied closure to create a new asynchronous sequence each -/// time it is iterated +/// Creates a ``AsyncDeferredSequence`` that uses the supplied closure to create a new `AsyncSequence`. +/// The closure is executed for each iterator on the first call to `next`. +/// This has the effect of postponing the initialization of an arbitrary `AsyncSequence` until the point of first demand. @inlinable public func deferred( _ createSequence: @escaping @Sendable () async -> Base From 9754d2236d60921e1cececdc4cf787cba1cc9124 Mon Sep 17 00:00:00 2001 From: Tristan Date: Mon, 24 Oct 2022 14:59:42 +0100 Subject: [PATCH 6/6] Update AsyncDeferredSequence docs Co-authored-by: Franz Busch --- Sources/AsyncAlgorithms/AsyncDeferredSequence.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift index d7187827..97b2b71d 100644 --- a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift @@ -19,8 +19,9 @@ public func deferred( AsyncDeferredSequence(createSequence) } -/// Creates an asynchronous sequence that uses the supplied closure to create a new asynchronous sequence each -/// time it is iterated +/// Creates a ``AsyncDeferredSequence`` that uses the supplied closure to create a new `AsyncSequence`. +/// The closure is executed for each iterator on the first call to `next`. +/// This has the effect of postponing the initialization of an arbitrary `AsyncSequence` until the point of first demand. @inlinable public func deferred( _ createSequence: @autoclosure @escaping @Sendable () -> Base