Skip to content

[HTTP1] Tolerate immediate write errors #579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
private func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
switch action {
case .sendRequestHead(let head, startBody: let startBody):
if startBody {
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.flush()

self.request!.requestHeadSent()
self.request!.resumeRequestBodyStream()
} else {
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()

self.request!.requestHeadSent()

if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(timeoutAction, context: context)
}
}
self.sendRequestHead(head, startBody: startBody, context: context)

case .sendBodyPart(let part):
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
Expand All @@ -212,9 +196,13 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
}

case .pauseRequestBodyStream:
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.pauseRequestBodyStream()

case .resumeRequestBodyStream:
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.resumeRequestBodyStream()

case .fireChannelActive:
Expand All @@ -239,15 +227,25 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
break

case .forwardResponseHead(let head, let pauseRequestBodyStream):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.receiveResponseHead(head)
if pauseRequestBodyStream {
self.request!.pauseRequestBodyStream()
if pauseRequestBodyStream, let request = self.request {
// The above response head forward might lead the request to mark itself as
// cancelled, which in turn might pop the request of the handler. For this reason we
// must check if the request is still present here.
request.pauseRequestBodyStream()
}

case .forwardResponseBodyParts(let buffer):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.receiveResponseBodyParts(buffer)

case .succeedRequest(let finalAction, let buffer):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet

// The order here is very important...
// We first nil our own task property! `taskCompleted` will potentially lead to
// situations in which we get a new request right away. We should finish the task
Expand Down Expand Up @@ -293,6 +291,33 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
}
}

private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) {
if startBody {
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)

// The above write might trigger an error, which may lead to a call to `errorCaught`,
// which in turn, may fail the request and pop it from the handler. For this reason
// we must check if the request is still present here.
guard let request = self.request else { return }
request.requestHeadSent()
request.resumeRequestBodyStream()
} else {
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()

// The above write might trigger an error, which may lead to a call to `errorCaught`,
// which in turn, may fail the request and pop it from the handler. For this reason
// we must check if the request is still present here.
guard let request = self.request else { return }
request.requestHeadSent()

if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(timeoutAction, context: context)
}
}
}

private func runTimeoutAction(_ action: IdleReadStateMachine.Action, context: ChannelHandlerContext) {
switch action {
case .startIdleReadTimeoutTimer(let timeAmount):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extension HTTP1ClientChannelHandlerTests {
("testIdleReadTimeout", testIdleReadTimeout),
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
("testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand", testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand),
("testWriteHTTPHeadFails", testWriteHTTPHeadFails),
]
}
}
63 changes: 63 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,69 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
XCTAssertEqual($0 as? HTTPClientError, .remoteConnectionClosed)
}
}

func testWriteHTTPHeadFails() {
struct WriteError: Error, Equatable {}

class FailWriteHandler: ChannelOutboundHandler {
typealias OutboundIn = HTTPClientRequestPart
typealias OutboundOut = HTTPClientRequestPart

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let error = WriteError()
promise?.fail(error)
context.fireErrorCaught(error)
}
}

let bodies: [HTTPClient.Body?] = [
.none,
.some(.byteBuffer(ByteBuffer(string: "hello world"))),
]

for body in bodies {
let embedded = EmbeddedChannel()
var maybeTestUtils: HTTP1TestTools?
XCTAssertNoThrow(maybeTestUtils = try embedded.setupHTTP1Connection())
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandler(FailWriteHandler(), position: .after(testUtils.readEventHandler)))

let logger = Logger(label: "test")

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: body))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }

let delegate = ResponseAccumulator(request: request)
var maybeRequestBag: RequestBag<ResponseAccumulator>?
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: request,
eventLoopPreference: .delegate(on: embedded.eventLoop),
task: .init(eventLoop: embedded.eventLoop, logger: logger),
redirectHandler: nil,
connectionDeadline: .now() + .seconds(30),
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
delegate: delegate
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

embedded.isWritable = false
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
embedded.write(requestBag, promise: nil)

// the handler only writes once the channel is writable
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
embedded.isWritable = true
embedded.pipeline.fireChannelWritabilityChanged()

XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
XCTAssertEqual($0 as? WriteError, WriteError())
}

XCTAssertEqual(embedded.isActive, false)
}
}
}

class TestBackpressureWriter {
Expand Down