From 9b5d4d985bc2fdd1b35917fc123d0e2c4269a13a Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 11 Apr 2022 14:10:15 +0200 Subject: [PATCH] [HTTP1] Tolerate a write error Same fix for HTTP/1 that landed for HTTP/2 in #558. ### Motivation `HTTP1ClientChannelHandler` currently does not tolerate immediate write errors. ### Changes Make `HTTP1ClientChannelHandler` resilient to failing writes. ### Result Less crashes in AHC HTTP/1. --- .../HTTP1/HTTP1ClientChannelHandler.swift | 63 +++++++++++++------ ...TTP1ClientChannelHandlerTests+XCTest.swift | 1 + .../HTTP1ClientChannelHandlerTests.swift | 63 +++++++++++++++++++ 3 files changed, 108 insertions(+), 19 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 15544dc4a..9d1a3b5fd 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -183,23 +183,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { private func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) { switch action { case .sendRequestHead(let head, startBody: let startBody): - if startBody { - context.write(self.wrapOutboundOut(.head(head)), promise: nil) - context.flush() - - self.request!.requestHeadSent() - self.request!.resumeRequestBodyStream() - } else { - context.write(self.wrapOutboundOut(.head(head)), promise: nil) - context.write(self.wrapOutboundOut(.end(nil)), promise: nil) - context.flush() - - self.request!.requestHeadSent() - - if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() { - self.runTimeoutAction(timeoutAction, context: context) - } - } + self.sendRequestHead(head, startBody: startBody, context: context) case .sendBodyPart(let part): context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil) @@ -212,9 +196,13 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { } case .pauseRequestBodyStream: + // We can force unwrap the request here, as we have just validated in the state machine, + // that the request is neither failed nor finished yet self.request!.pauseRequestBodyStream() case .resumeRequestBodyStream: + // We can force unwrap the request here, as we have just validated in the state machine, + // that the request is neither failed nor finished yet self.request!.resumeRequestBodyStream() case .fireChannelActive: @@ -239,15 +227,25 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { break case .forwardResponseHead(let head, let pauseRequestBodyStream): + // We can force unwrap the request here, as we have just validated in the state machine, + // that the request is neither failed nor finished yet self.request!.receiveResponseHead(head) - if pauseRequestBodyStream { - self.request!.pauseRequestBodyStream() + if pauseRequestBodyStream, let request = self.request { + // The above response head forward might lead the request to mark itself as + // cancelled, which in turn might pop the request of the handler. For this reason we + // must check if the request is still present here. + request.pauseRequestBodyStream() } case .forwardResponseBodyParts(let buffer): + // We can force unwrap the request here, as we have just validated in the state machine, + // that the request is neither failed nor finished yet self.request!.receiveResponseBodyParts(buffer) case .succeedRequest(let finalAction, let buffer): + // We can force unwrap the request here, as we have just validated in the state machine, + // that the request is neither failed nor finished yet + // The order here is very important... // We first nil our own task property! `taskCompleted` will potentially lead to // situations in which we get a new request right away. We should finish the task @@ -293,6 +291,33 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { } } + private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) { + if startBody { + context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil) + + // The above write might trigger an error, which may lead to a call to `errorCaught`, + // which in turn, may fail the request and pop it from the handler. For this reason + // we must check if the request is still present here. + guard let request = self.request else { return } + request.requestHeadSent() + request.resumeRequestBodyStream() + } else { + context.write(self.wrapOutboundOut(.head(head)), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + + // The above write might trigger an error, which may lead to a call to `errorCaught`, + // which in turn, may fail the request and pop it from the handler. For this reason + // we must check if the request is still present here. + guard let request = self.request else { return } + request.requestHeadSent() + + if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() { + self.runTimeoutAction(timeoutAction, context: context) + } + } + } + private func runTimeoutAction(_ action: IdleReadStateMachine.Action, context: ChannelHandlerContext) { switch action { case .startIdleReadTimeoutTimer(let timeAmount): diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift index 8d28c15c4..86707520c 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift @@ -31,6 +31,7 @@ extension HTTP1ClientChannelHandlerTests { ("testIdleReadTimeout", testIdleReadTimeout), ("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled), ("testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand", testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand), + ("testWriteHTTPHeadFails", testWriteHTTPHeadFails), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index 62e42f94e..4769d2c7e 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -394,6 +394,69 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { XCTAssertEqual($0 as? HTTPClientError, .remoteConnectionClosed) } } + + func testWriteHTTPHeadFails() { + struct WriteError: Error, Equatable {} + + class FailWriteHandler: ChannelOutboundHandler { + typealias OutboundIn = HTTPClientRequestPart + typealias OutboundOut = HTTPClientRequestPart + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let error = WriteError() + promise?.fail(error) + context.fireErrorCaught(error) + } + } + + let bodies: [HTTPClient.Body?] = [ + .none, + .some(.byteBuffer(ByteBuffer(string: "hello world"))), + ] + + for body in bodies { + let embedded = EmbeddedChannel() + var maybeTestUtils: HTTP1TestTools? + XCTAssertNoThrow(maybeTestUtils = try embedded.setupHTTP1Connection()) + guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } + + XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandler(FailWriteHandler(), position: .after(testUtils.readEventHandler))) + + let logger = Logger(label: "test") + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: body)) + guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } + + let delegate = ResponseAccumulator(request: request) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: embedded.eventLoop), + task: .init(eventLoop: embedded.eventLoop, logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(idleReadTimeout: .milliseconds(200)), + delegate: delegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + + embedded.isWritable = false + XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait()) + embedded.write(requestBag, promise: nil) + + // the handler only writes once the channel is writable + XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none) + embedded.isWritable = true + embedded.pipeline.fireChannelWritabilityChanged() + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? WriteError, WriteError()) + } + + XCTAssertEqual(embedded.isActive, false) + } + } } class TestBackpressureWriter {