Skip to content

Crash fix: HTTP2ClientRequestHandler can deal with failing writes #558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,30 +139,13 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
// MARK: Run Actions

private func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
// NOTE: We can bang the request in the following actions, since the `HTTPRequestStateMachine`
// ensures, that actions that require a request are only called, if the request is
// still present. The request is only nilled as a response to a state machine action
// (.failRequest or .succeedRequest).

switch action {
case .sendRequestHead(let head, let startBody):
if startBody {
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
self.request!.requestHeadSent()
self.request!.resumeRequestBodyStream()
} else {
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()

self.request!.requestHeadSent()

if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(timeoutAction, context: context)
}
}
self.sendRequestHead(head, startBody: startBody, context: context)

case .pauseRequestBodyStream:
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.pauseRequestBodyStream()

case .sendBodyPart(let data):
Expand All @@ -182,18 +165,29 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
break

case .resumeRequestBodyStream:
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.resumeRequestBodyStream()

case .forwardResponseHead(let head, pauseRequestBodyStream: let pauseRequestBodyStream):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.receiveResponseHead(head)
if pauseRequestBodyStream {
self.request!.pauseRequestBodyStream()
if pauseRequestBodyStream, let request = self.request {
// The above response head forward might lead the request to mark itself as
// cancelled, which in turn might pop the request of the handler. For this reason we
// must check if the request is still present here.
request.pauseRequestBodyStream()
Copy link
Contributor

@karwa karwa Feb 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all seems very fragile. Each action is very tightly coupled to the state machine to check its own internal state, and some of these methods require non-local reasoning (e.g. calling a method on a request which reaches up and nils that request variable in its owner object).

It's perhaps worth questioning: with the tight coupling, is it worth dispatching all of these operations with a single enum? Perhaps each case should be an individual method, invoked directly with the unwrapped request value. The cancellation stuff is also not ideal but I don't have a concrete suggestion at this time.

Certainly the crash fix is important and I'm not suggesting it be delayed for refactoring, but it's maybe worth making a note to take another look at it later, and see if it can be made more robust.

Copy link
Member Author

@fabianfett fabianfett Feb 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very much agree! I guess that's the big learning from this state machine.

}

case .forwardResponseBodyParts(let parts):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
self.request!.receiveResponseBodyParts(parts)

case .failRequest(let error, _):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request object is still present.
self.request!.fail(error)
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
Expand All @@ -204,13 +198,42 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
self.runFinalAction(.close, context: context)

case .succeedRequest(let finalAction, let finalParts):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request object is still present.
self.request!.succeedRequest(finalParts)
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runFinalAction(finalAction, context: context)
}
}

private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) {
if startBody {
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)

// The above write might trigger an error, which may lead to a call to `errorCaught`,
// which in turn, may fail the request and pop it from the handler. For this reason
// we must check if the request is still present here.
guard let request = self.request else { return }
request.requestHeadSent()
request.resumeRequestBodyStream()
} else {
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()

// The above write might trigger an error, which may lead to a call to `errorCaught`,
// which in turn, may fail the request and pop it from the handler. For this reason
// we must check if the request is still present here.
guard let request = self.request else { return }
request.requestHeadSent()

if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(timeoutAction, context: context)
}
}
}

private func runFinalAction(_ action: HTTPRequestStateMachine.Action.FinalStreamAction, context: ChannelHandlerContext) {
switch action {
case .close:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extension HTTP2ClientRequestHandlerTests {
("testWriteBackpressure", testWriteBackpressure),
("testIdleReadTimeout", testIdleReadTimeout),
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
("testWriteHTTPHeadFails", testWriteHTTPHeadFails),
]
}
}
60 changes: 60 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,64 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
// therefore advancing the time should not trigger a crash
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(250))
}

func testWriteHTTPHeadFails() {
struct WriteError: Error, Equatable {}

class FailWriteHandler: ChannelOutboundHandler {
typealias OutboundIn = HTTPClientRequestPart
typealias OutboundOut = HTTPClientRequestPart

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let error = WriteError()
promise?.fail(error)
context.fireErrorCaught(error)
}
}

let bodies: [HTTPClient.Body?] = [
.none,
.some(.byteBuffer(ByteBuffer(string: "hello world"))),
]

for body in bodies {
let embeddedEventLoop = EmbeddedEventLoop()
let requestHandler = HTTP2ClientRequestHandler(eventLoop: embeddedEventLoop)
let embedded = EmbeddedChannel(handlers: [FailWriteHandler(), requestHandler], loop: embeddedEventLoop)

let logger = Logger(label: "test")

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: body))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }

let delegate = ResponseAccumulator(request: request)
var maybeRequestBag: RequestBag<ResponseAccumulator>?
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: request,
eventLoopPreference: .delegate(on: embedded.eventLoop),
task: .init(eventLoop: embedded.eventLoop, logger: logger),
redirectHandler: nil,
connectionDeadline: .now() + .seconds(30),
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
delegate: delegate
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

embedded.isWritable = false
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
embedded.write(requestBag, promise: nil)

// the handler only writes once the channel is writable
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
embedded.isWritable = true
embedded.pipeline.fireChannelWritabilityChanged()

XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
XCTAssertEqual($0 as? WriteError, WriteError())
}

XCTAssertEqual(embedded.isActive, false)
}
}
}