Skip to content

Tolerate cancelling before we create our task #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ struct CombineLatestStateMachine<
mutating func cancelled() -> CancelledAction? {
switch self.state {
case .initial:
preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()")
state = .finished

return .none

case .waitingForDemand(let task, let upstreams, _):
// The downstream task got cancelled so we need to cancel our upstream Task
Expand Down
5 changes: 2 additions & 3 deletions Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,8 @@ struct DebounceStateMachine<Base: AsyncSequence, C: Clock> {
mutating func cancelled() -> CancelledAction? {
switch self.state {
case .initial:
// Since we are transitioning to `merging` before we return from `makeAsyncIterator`
// this can never happen
preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()")
state = .finished
return .none

case .waitingForDemand:
// We got cancelled before we event got any demand. This can happen if a cancelled task
Expand Down
8 changes: 5 additions & 3 deletions Sources/AsyncAlgorithms/Merge/MergeStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,11 @@ struct MergeStateMachine<
mutating func cancelled() -> CancelledAction {
switch state {
case .initial:
// Since we are transitioning to `merging` before we return from `makeAsyncIterator`
// this can never happen
preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()")
// Since we are only transitioning to merging when the task is started we
// can be cancelled already.
state = .finished

return .none

case let .merging(task, _, upstreamContinuations, _, .some(downstreamContinuation)):
// The downstream Task got cancelled so we need to cancel our upstream Task
Expand Down
4 changes: 3 additions & 1 deletion Sources/AsyncAlgorithms/Zip/ZipStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ struct ZipStateMachine<
mutating func cancelled() -> CancelledAction? {
switch self.state {
case .initial:
preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()")
state = .finished

return .none

case .waitingForDemand(let task, let upstreams):
// The downstream task got cancelled so we need to cancel our upstream Task
Expand Down
10 changes: 10 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestCombineLatest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,16 @@ final class TestCombineLatest2: XCTestCase {
task.cancel()
wait(for: [finished], timeout: 1.0)
}

func test_combineLatest_when_cancelled() async {
let t = Task {
try? await Task.sleep(nanoseconds: 1_000_000_000)
let c1 = Indefinite(value: "test1").async
let c2 = Indefinite(value: "test1").async
for await _ in combineLatest(c1, c2) {}
}
t.cancel()
}
}

final class TestCombineLatest3: XCTestCase {
Expand Down
11 changes: 11 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestDebounce.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,15 @@ final class TestDebounce: XCTestCase {
let throwingDebounce = [1].async.map { try throwOn(2, $0) }.debounce(for: .zero, clock: ContinuousClock())
for try await _ in throwingDebounce {}
}

func test_debounce_when_cancelled() async throws {
guard #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) else { throw XCTSkip("Skipped due to Clock/Instant/Duration availability") }

let t = Task {
try? await Task.sleep(nanoseconds: 1_000_000_000)
let c1 = Indefinite(value: "test1").async
for await _ in c1.debounce(for: .seconds(1), clock: .continuous) {}
}
t.cancel()
}
}
10 changes: 10 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestMerge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ final class TestMerge2: XCTestCase {
task.cancel()
wait(for: [finished], timeout: 1.0)
}

func test_merge_when_cancelled() async {
let t = Task {
try? await Task.sleep(nanoseconds: 1_000_000_000)
let c1 = Indefinite(value: "test1").async
let c2 = Indefinite(value: "test1").async
for await _ in merge(c1, c2) {}
}
t.cancel()
}
}

final class TestMerge3: XCTestCase {
Expand Down
10 changes: 10 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestZip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ final class TestZip2: XCTestCase {
task.cancel()
wait(for: [finished], timeout: 1.0)
}

func test_zip_when_cancelled() async {
let t = Task {
try? await Task.sleep(nanoseconds: 1_000_000_000)
let c1 = Indefinite(value: "test1").async
let c2 = Indefinite(value: "test1").async
for await _ in zip(c1, c2) {}
}
t.cancel()
}
}

final class TestZip3: XCTestCase {
Expand Down