Skip to content

Report last connection error if request deadline is exceeded with async/await API #608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extension Transaction {
private enum State {
case initialized(CheckedContinuation<HTTPClientResponse, Error>)
case queued(CheckedContinuation<HTTPClientResponse, Error>, HTTPRequestScheduler)
case deadlineExceededWhileQueued(CheckedContinuation<HTTPClientResponse, Error>)
case executing(ExecutionContext, RequestStreamState, ResponseStreamState)
case finished(error: Error?, HTTPClientResponse.Body.IteratorStream.ID?)
}
Expand Down Expand Up @@ -105,7 +106,20 @@ extension Transaction {
case .queued(let continuation, let scheduler):
self.state = .finished(error: error, nil)
return .failResponseHead(continuation, error, scheduler, nil, bodyStreamContinuation: nil)

case .deadlineExceededWhileQueued(let continuation):
let realError: Error = {
if (error as? HTTPClientError) == .cancelled {
/// if we just get a `HTTPClientError.cancelled` we can use the original cancellation reason
/// to give a more descriptive error to the user.
return HTTPClientError.deadlineExceeded
} else {
/// otherwise we already had an intermediate connection error which we should present to the user instead
return error
}
}()

self.state = .finished(error: realError, nil)
return .failResponseHead(continuation, realError, nil, nil, bodyStreamContinuation: nil)
case .executing(let context, let requestStreamState, .waitingForResponseHead):
switch requestStreamState {
case .paused(continuation: .some(let continuation)):
Expand Down Expand Up @@ -178,6 +192,7 @@ extension Transaction {

enum StartExecutionAction {
case cancel(HTTPRequestExecutor)
case cancelAndFail(HTTPRequestExecutor, CheckedContinuation<HTTPClientResponse, Error>, with: Error)
case none
}

Expand All @@ -191,6 +206,8 @@ extension Transaction {
)
self.state = .executing(context, .requestHeadSent, .waitingForResponseHead)
return .none
case .deadlineExceededWhileQueued(let continuation):
return .cancelAndFail(executor, continuation, with: HTTPClientError.deadlineExceeded)

case .finished(error: .some, .none):
return .cancel(executor)
Expand All @@ -210,7 +227,7 @@ extension Transaction {

mutating func resumeRequestBodyStream() -> ResumeProducingAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Received a resumeBodyRequest on a request, that isn't executing. Invalid state: \(self.state)")

case .executing(let context, .requestHeadSent, let responseState):
Expand Down Expand Up @@ -246,6 +263,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _):
preconditionFailure("A request stream can only be resumed, if the request was started")

Expand All @@ -271,6 +289,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _):
preconditionFailure("A request stream can only produce, if the request was started. Invalid state: \(self.state)")

Expand Down Expand Up @@ -301,6 +320,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _),
.executing(_, .finished, _):
preconditionFailure("A request stream can only produce, if the request was started. Invalid state: \(self.state)")
Expand Down Expand Up @@ -334,6 +354,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .finished, _):
preconditionFailure("Invalid state: \(self.state)")

Expand Down Expand Up @@ -372,6 +393,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, _, .waitingForResponseIterator),
.executing(_, _, .buffering),
.executing(_, _, .waitingForRemote):
Expand Down Expand Up @@ -401,7 +423,7 @@ extension Transaction {

mutating func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) -> ReceiveResponsePartAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Received a response body part, but request hasn't started yet. Invalid state: \(self.state)")

case .executing(_, _, .waitingForResponseHead):
Expand Down Expand Up @@ -457,6 +479,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, _, .waitingForResponseHead):
preconditionFailure("Got notice about a deinited response, before we even received a response. Invalid state: \(self.state)")

Expand Down Expand Up @@ -486,7 +509,7 @@ extension Transaction {

mutating func responseBodyIteratorDeinited(streamID: HTTPClientResponse.Body.IteratorStream.ID) -> FailAction {
switch self.state {
case .initialized, .queued, .executing(_, _, .waitingForResponseHead):
case .initialized, .queued, .deadlineExceededWhileQueued, .executing(_, _, .waitingForResponseHead):
preconditionFailure("Got notice about a deinited response body iterator, before we even received a response. Invalid state: \(self.state)")

case .executing(_, _, .buffering(let registeredStreamID, _, next: _)),
Expand Down Expand Up @@ -516,6 +539,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, _, .waitingForResponseHead):
preconditionFailure("If we receive a response body, we must have received a head before")

Expand Down Expand Up @@ -635,6 +659,7 @@ extension Transaction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, _, .waitingForResponseHead):
preconditionFailure("Received no response head, but received a response end. Invalid state: \(self.state)")

Expand Down Expand Up @@ -677,6 +702,7 @@ extension Transaction {

enum DeadlineExceededAction {
case none
case cancelSchedulerOnly(scheduler: HTTPRequestScheduler)
/// fail response before head received. scheduler and executor are exclusive here.
case cancel(
requestContinuation: CheckedContinuation<HTTPClientResponse, Error>,
Expand All @@ -699,14 +725,12 @@ extension Transaction {
)

case .queued(let continuation, let scheduler):
self.state = .finished(error: error, nil)
return .cancel(
requestContinuation: continuation,
scheduler: scheduler,
executor: nil,
bodyStreamContinuation: nil
self.state = .deadlineExceededWhileQueued(continuation)
return .cancelSchedulerOnly(
scheduler: scheduler
)

case .deadlineExceededWhileQueued:
return .none
case .executing(let context, let requestStreamState, .waitingForResponseHead):
switch requestStreamState {
case .paused(continuation: .some(let continuation)):
Expand Down
7 changes: 5 additions & 2 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ extension Transaction: HTTPExecutableRequest {
switch action {
case .cancel(let executor):
executor.cancelRequest(self)

case .cancelAndFail(let executor, let continuation, with: let error):
executor.cancelRequest(self)
continuation.resume(throwing: error)
case .none:
break
}
Expand Down Expand Up @@ -309,7 +311,8 @@ extension Transaction: HTTPExecutableRequest {
scheduler?.cancelRequest(self)
executor?.cancelRequest(self)
bodyStreamContinuation?.resume(throwing: HTTPClientError.deadlineExceeded)

case .cancelSchedulerOnly(scheduler: let scheduler):
scheduler.cancelRequest(self)
case .none:
break
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extension AsyncAwaitEndToEndTests {
("testCanceling", testCanceling),
("testDeadline", testDeadline),
("testImmediateDeadline", testImmediateDeadline),
("testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded", testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded),
("testInvalidURL", testInvalidURL),
("testRedirectChangesHostHeader", testRedirectChangesHostHeader),
("testShutdown", testShutdown),
Expand Down
48 changes: 48 additions & 0 deletions Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import Logging
import NIOCore
import NIOPosix
import NIOSSL
import XCTest

private func makeDefaultHTTPClient(
Expand Down Expand Up @@ -393,6 +394,53 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
#endif
}

func testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded() {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return }
XCTAsyncTest(timeout: 5) {
/// key + cert was created with the follwing command:
/// openssl req -x509 -newkey rsa:4096 -keyout self_signed_key.pem -out self_signed_cert.pem -sha256 -days 99999 -nodes -subj '/CN=localhost'
let certPath = Bundle.module.path(forResource: "self_signed_cert", ofType: "pem")!
let keyPath = Bundle.module.path(forResource: "self_signed_key", ofType: "pem")!
let configuration = TLSConfiguration.makeServerConfiguration(
certificateChain: try NIOSSLCertificate.fromPEMFile(certPath).map { .certificate($0) },
privateKey: .file(keyPath)
)
let sslContext = try NIOSSLContext(configuration: configuration)
let serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try serverGroup.syncShutdownGracefully()) }
let server = ServerBootstrap(group: serverGroup)
.childChannelInitializer { channel in
channel.pipeline.addHandler(NIOSSLServerHandler(context: sslContext))
}
let serverChannel = try server.bind(host: "localhost", port: 0).wait()
defer { XCTAssertNoThrow(try serverChannel.close().wait()) }
let port = serverChannel.localAddress!.port!

var config = HTTPClient.Configuration()
config.timeout.connect = .seconds(3)
let localClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: config)
defer { XCTAssertNoThrow(try localClient.syncShutdown()) }
let request = HTTPClientRequest(url: "https://localhost:\(port)")
await XCTAssertThrowsError(try await localClient.execute(request, deadline: .now() + .seconds(2))) { error in
#if canImport(Network)
guard let nwTLSError = error as? HTTPClient.NWTLSError else {
XCTFail("could not cast \(error) of type \(type(of: error)) to \(HTTPClient.NWTLSError.self)")
return
}
XCTAssertEqual(nwTLSError.status, errSSLBadCert, "unexpected tls error: \(nwTLSError)")
#else
guard let sslError = error as? NIOSSLError,
case .handshakeFailed(.sslError) = sslError else {
XCTFail("unexpected error \(error)")
return
}
#endif
}
}
#endif
}

func testInvalidURL() {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ extension Transaction_StateMachineTests {
("testRequestWasQueuedAfterWillExecuteRequestWasCalled", testRequestWasQueuedAfterWillExecuteRequestWasCalled),
("testRequestBodyStreamWasPaused", testRequestBodyStreamWasPaused),
("testQueuedRequestGetsRemovedWhenDeadlineExceeded", testQueuedRequestGetsRemovedWhenDeadlineExceeded),
("testDeadlineExceededAndFullyFailedRequestCanBeCanceledWithNoEffect", testDeadlineExceededAndFullyFailedRequestCanBeCanceledWithNoEffect),
("testScheduledRequestGetsRemovedWhenDeadlineExceeded", testScheduledRequestGetsRemovedWhenDeadlineExceeded),
("testDeadlineExceededRaceWithRequestWillExecute", testDeadlineExceededRaceWithRequestWillExecute),
("testRequestWithHeadReceivedGetNotCancelledWhenDeadlineExceeded", testRequestWithHeadReceivedGetNotCancelledWhenDeadlineExceeded),
]
}
Expand Down
89 changes: 85 additions & 4 deletions Tests/AsyncHTTPClientTests/Transaction+StateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class Transaction_StateMachineTests: XCTestCase {
}

func testQueuedRequestGetsRemovedWhenDeadlineExceeded() {
struct MyError: Error, Equatable {}
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return }
XCTAsyncTest {
Expand All @@ -82,16 +83,62 @@ final class Transaction_StateMachineTests: XCTestCase {

state.requestWasQueued(queuer)

let failAction = state.deadlineExceeded()
guard case .cancel(let continuation, let scheduler, nil, nil) = failAction else {
let deadlineExceededAction = state.deadlineExceeded()
guard case .cancelSchedulerOnly(let scheduler) = deadlineExceededAction else {
return XCTFail("Unexpected fail action: \(deadlineExceededAction)")
}
XCTAssertIdentical(scheduler as? MockTaskQueuer, queuer)

let failAction = state.fail(MyError())
guard case .failResponseHead(let continuation, let error, nil, nil, bodyStreamContinuation: nil) = failAction else {
return XCTFail("Unexpected fail action: \(failAction)")
}
XCTAssertIdentical(scheduler as? MockTaskQueuer, queuer)

continuation.resume(throwing: HTTPClientError.deadlineExceeded)
continuation.resume(throwing: error)
}

await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround))
await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround)) {
XCTAssertEqualTypeAndValue($0, MyError())
}
}
#endif
}

func testDeadlineExceededAndFullyFailedRequestCanBeCanceledWithNoEffect() {
struct MyError: Error, Equatable {}
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return }
XCTAsyncTest {
func workaround(_ continuation: CheckedContinuation<HTTPClientResponse, Error>) {
var state = Transaction.StateMachine(continuation)
let queuer = MockTaskQueuer()

state.requestWasQueued(queuer)

let deadlineExceededAction = state.deadlineExceeded()
guard case .cancelSchedulerOnly(let scheduler) = deadlineExceededAction else {
return XCTFail("Unexpected fail action: \(deadlineExceededAction)")
}
XCTAssertIdentical(scheduler as? MockTaskQueuer, queuer)

let failAction = state.fail(MyError())
guard case .failResponseHead(let continuation, let error, nil, nil, bodyStreamContinuation: nil) = failAction else {
return XCTFail("Unexpected fail action: \(failAction)")
}
XCTAssertIdentical(scheduler as? MockTaskQueuer, queuer)

let secondFailAction = state.fail(HTTPClientError.cancelled)
guard case .none = secondFailAction else {
return XCTFail("Unexpected fail action: \(secondFailAction)")
}

continuation.resume(throwing: error)
}

await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround)) {
XCTAssertEqualTypeAndValue($0, MyError())
}
}
#endif
}
Expand Down Expand Up @@ -123,6 +170,40 @@ final class Transaction_StateMachineTests: XCTestCase {
#endif
}

func testDeadlineExceededRaceWithRequestWillExecute() {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return }
let eventLoop = EmbeddedEventLoop()
XCTAsyncTest {
func workaround(_ continuation: CheckedContinuation<HTTPClientResponse, Error>) {
var state = Transaction.StateMachine(continuation)
let expectedExecutor = MockRequestExecutor(eventLoop: eventLoop)
let queuer = MockTaskQueuer()

state.requestWasQueued(queuer)

let deadlineExceededAction = state.deadlineExceeded()
guard case .cancelSchedulerOnly(let scheduler) = deadlineExceededAction else {
return XCTFail("Unexpected fail action: \(deadlineExceededAction)")
}
XCTAssertIdentical(scheduler as? MockTaskQueuer, queuer)

let failAction = state.willExecuteRequest(expectedExecutor)
guard case .cancelAndFail(let returnedExecutor, let continuation, with: let error) = failAction else {
return XCTFail("Unexpected fail action: \(failAction)")
}
XCTAssertIdentical(returnedExecutor as? MockRequestExecutor, expectedExecutor)

continuation.resume(throwing: error)
}

await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround)) {
XCTAssertEqualTypeAndValue($0, HTTPClientError.deadlineExceeded)
}
}
#endif
}

func testRequestWithHeadReceivedGetNotCancelledWhenDeadlineExceeded() {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return }
Expand Down