Skip to content

Commit 856956d

Browse files
committed
Schedule deadline timeout
1 parent 298525f commit 856956d

File tree

5 files changed

+144
-17
lines changed

5 files changed

+144
-17
lines changed

Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,20 @@ extension HTTPClient {
9494
let cancelHandler = TransactionCancelHandler()
9595

9696
return try await withTaskCancellationHandler(operation: { () async throws -> HTTPClientResponse in
97-
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
97+
let eventLoop = self.eventLoopGroup.any()
98+
let deadlineTask = eventLoop.scheduleTask(deadline: deadline) {
99+
cancelHandler.cancel(reason: .deadlineExceeded)
100+
}
101+
defer {
102+
deadlineTask.cancel()
103+
}
104+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
98105
let transaction = Transaction(
99106
request: request,
100107
requestOptions: .init(idleReadTimeout: nil),
101108
logger: logger,
102109
connectionDeadline: deadline,
103-
preferredEventLoop: self.eventLoopGroup.next(),
110+
preferredEventLoop: eventLoop,
104111
responseContinuation: continuation
105112
)
106113

@@ -109,7 +116,7 @@ extension HTTPClient {
109116
self.poolManager.executeRequest(transaction)
110117
}
111118
}, onCancel: {
112-
cancelHandler.cancel()
119+
cancelHandler.cancel(reason: .taskCanceled)
113120
})
114121
}
115122
}
@@ -119,22 +126,38 @@ extension HTTPClient {
119126
/// in the `body` closure and cancelation from the `onCancel` closure of `withTaskCancellationHandler`.
120127
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
121128
private actor TransactionCancelHandler {
129+
enum CancelReason {
130+
/// swift concurrency task was canceled
131+
case taskCanceled
132+
/// deadline timeout
133+
case deadlineExceeded
134+
}
135+
122136
private enum State {
123137
case initialised
124138
case register(Transaction)
125-
case cancelled
139+
case cancelled(CancelReason)
126140
}
127141

128142
private var state: State = .initialised
129143

130144
init() {}
131145

146+
private func cancelTransaction(_ transaction: Transaction, for reason: CancelReason) {
147+
switch reason {
148+
case .taskCanceled:
149+
transaction.cancel()
150+
case .deadlineExceeded:
151+
transaction.deadlineExceeded()
152+
}
153+
}
154+
132155
private func _registerTransaction(_ transaction: Transaction) {
133156
switch self.state {
134157
case .initialised:
135158
self.state = .register(transaction)
136-
case .cancelled:
137-
transaction.cancel()
159+
case .cancelled(let reason):
160+
self.cancelTransaction(transaction, for: reason)
138161
case .register:
139162
preconditionFailure("transaction already set")
140163
}
@@ -146,21 +169,21 @@ private actor TransactionCancelHandler {
146169
}
147170
}
148171

149-
private func _cancel() {
172+
private func _cancel(reason: CancelReason) {
150173
switch self.state {
151-
case .register(let bag):
152-
self.state = .cancelled
153-
bag.cancel()
174+
case .register(let transaction):
175+
self.state = .cancelled(reason)
176+
self.cancelTransaction(transaction, for: reason)
154177
case .cancelled:
155178
break
156179
case .initialised:
157-
self.state = .cancelled
180+
self.state = .cancelled(reason)
158181
}
159182
}
160183

161-
nonisolated func cancel() {
184+
nonisolated func cancel(reason: CancelReason) {
162185
Task {
163-
await self._cancel()
186+
await self._cancel(reason: reason)
164187
}
165188
}
166189
}

Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,67 @@ extension Transaction {
674674
preconditionFailure("Already received an eof or error before. Must not receive further events. Invalid state: \(self.state)")
675675
}
676676
}
677+
678+
enum DeadlineExceededAction {
679+
case none
680+
/// fail response before head received. scheduler and executor are exclusive here.
681+
case cancel(
682+
requestContinuation: CheckedContinuation<HTTPClientResponse, Error>,
683+
scheduler: HTTPRequestScheduler?,
684+
executor: HTTPRequestExecutor?,
685+
bodyStreamContinuation: CheckedContinuation<Void, Error>?
686+
)
687+
}
688+
689+
mutating func deadlineExceeded() -> DeadlineExceededAction {
690+
let error = HTTPClientError.deadlineExceeded
691+
switch self.state {
692+
case .initialized(let continuation):
693+
self.state = .finished(error: error, nil)
694+
return .cancel(
695+
requestContinuation: continuation,
696+
scheduler: nil,
697+
executor: nil,
698+
bodyStreamContinuation: nil
699+
)
700+
701+
case .queued(let continuation, let scheduler):
702+
self.state = .finished(error: error, nil)
703+
return .cancel(
704+
requestContinuation: continuation,
705+
scheduler: scheduler,
706+
executor: nil,
707+
bodyStreamContinuation: nil
708+
)
709+
710+
case .executing(let context, let requestStreamState, .waitingForResponseHead):
711+
switch requestStreamState {
712+
case .paused(continuation: .some(let continuation)):
713+
self.state = .finished(error: error, nil)
714+
return .cancel(
715+
requestContinuation: context.continuation,
716+
scheduler: nil,
717+
executor: context.executor,
718+
bodyStreamContinuation: continuation
719+
)
720+
case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
721+
self.state = .finished(error: error, nil)
722+
return .cancel(
723+
requestContinuation: context.continuation,
724+
scheduler: nil,
725+
executor: context.executor,
726+
bodyStreamContinuation: nil
727+
)
728+
}
729+
730+
case .executing, .finished:
731+
// The user specified deadline is only used until we received the response head.
732+
// If we already received the head, we have also resumed the continuation and
733+
// therefore return the HTTPClientResponse to the user. We do not want to cancel
734+
// the request body streaming nor the response body streaming afterwards.
735+
return .none
736+
}
737+
}
677738
}
678739
}
679740

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,26 @@ extension Transaction: HTTPExecutableRequest {
294294
bodyStreamContinuation.resume(throwing: error)
295295
}
296296
}
297+
298+
func deadlineExceeded() {
299+
let action = self.stateLock.withLock {
300+
self.state.deadlineExceeded()
301+
}
302+
self.performDeadlineExceededAction(action)
303+
}
304+
305+
private func performDeadlineExceededAction(_ action: StateMachine.DeadlineExceededAction) {
306+
switch action {
307+
case .cancel(let requestContinuation, let scheduler, let executor, let bodyStreamContinuation):
308+
requestContinuation.resume(throwing: HTTPClientError.deadlineExceeded)
309+
scheduler?.cancelRequest(self)
310+
executor?.cancelRequest(self)
311+
bodyStreamContinuation?.resume(throwing: HTTPClientError.deadlineExceeded)
312+
313+
case .none:
314+
break
315+
}
316+
}
297317
}
298318

299319
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)

Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests+XCTest.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ extension AsyncAwaitEndToEndTests {
3737
("testPostWithFragmentedAsyncSequenceOfLargeByteBuffers", testPostWithFragmentedAsyncSequenceOfLargeByteBuffers),
3838
("testCanceling", testCanceling),
3939
("testDeadline", testDeadline),
40-
("testConnectTimeout", testConnectTimeout),
40+
("testImmediateDeadline", testImmediateDeadline),
4141
("testInvalidURL", testInvalidURL),
4242
]
4343
}

Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
346346
}
347347

348348
func testDeadline() throws {
349-
try XCTSkipIf(true, "deadline is currently not correctly implemented. We only use it to timeout connection establishment. will be fixed in a follow up PR")
350349
#if compiler(>=5.5) && canImport(_Concurrency)
351350
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
352351
XCTAsyncTest(timeout: 5) {
@@ -358,15 +357,39 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
358357
let request = HTTPClientRequest(url: "https://localhost:\(bin.port)/wait")
359358

360359
let task = Task<HTTPClientResponse, Error> { [request] in
361-
try await client.execute(request, deadline: .now() + .seconds(1), logger: logger)
360+
try await client.execute(request, deadline: .now() + .milliseconds(100), logger: logger)
362361
}
363362
await XCTAssertThrowsError(try await task.value) {
364-
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.readTimeout)
363+
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded)
365364
}
366365
}
367366
#endif
368367
}
369368

369+
func testImmediateDeadline() throws {
370+
// does not work on nether Linux nor Darwin
371+
try XCTSkipIf(true, "test times out because of a swift concurrency bug: https://bugs.swift.org/browse/SR-15592")
372+
#if compiler(>=5.5) && canImport(_Concurrency)
373+
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
374+
XCTAsyncTest(timeout: 5) {
375+
let bin = HTTPBin(.http2(compress: false))
376+
defer { XCTAssertNoThrow(try bin.shutdown()) }
377+
let client = makeDefaultHTTPClient()
378+
defer { XCTAssertNoThrow(try client.syncShutdown()) }
379+
let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:))
380+
let request = HTTPClientRequest(url: "https://localhost:\(bin.port)/wait")
381+
382+
let task = Task<HTTPClientResponse, Error> { [request] in
383+
try await client.execute(request, deadline: .now(), logger: logger)
384+
}
385+
await XCTAssertThrowsError(try await task.value) {
386+
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded)
387+
}
388+
print("done")
389+
}
390+
#endif
391+
}
392+
370393
func testInvalidURL() {
371394
#if compiler(>=5.5) && canImport(_Concurrency)
372395
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }

0 commit comments

Comments
 (0)