diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index 9c45728b7..50c0057ba 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -276,14 +276,7 @@ final class RequestBag { self.delegate.didReceiveBodyPart(task: self.task, buffer) .hop(to: self.task.eventLoop) .whenComplete { - switch $0 { - case .success: - self.consumeMoreBodyData0(resultOfPreviousConsume: $0) - case .failure(let error): - // if in the response stream consumption an error has occurred, we need to - // cancel the running request and fail the task. - self.fail(error) - } + self.consumeMoreBodyData0(resultOfPreviousConsume: $0) } case .succeedRequest: @@ -325,18 +318,13 @@ final class RequestBag { self.delegate.didReceiveBodyPart(task: self.task, byteBuffer) .hop(to: self.task.eventLoop) .whenComplete { result in - switch result { - case .success: - if self.consumeBodyPartStackDepth < Self.maxConsumeBodyPartStackDepth { + if self.consumeBodyPartStackDepth < Self.maxConsumeBodyPartStackDepth { + self.consumeMoreBodyData0(resultOfPreviousConsume: result) + } else { + // We need to unwind the stack, let's take a break. + self.task.eventLoop.execute { self.consumeMoreBodyData0(resultOfPreviousConsume: result) - } else { - // We need to unwind the stack, let's take a break. - self.task.eventLoop.execute { - self.consumeMoreBodyData0(resultOfPreviousConsume: result) - } } - case .failure(let error): - self.fail(error) } } diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift b/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift index 72046f68c..b6a05733c 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift @@ -34,6 +34,7 @@ extension RequestBagTests { ("testCancelFailsTaskWhenTaskIsQueued", testCancelFailsTaskWhenTaskIsQueued), ("testFailsTaskWhenTaskIsWaitingForMoreFromServer", testFailsTaskWhenTaskIsWaitingForMoreFromServer), ("testChannelBecomingWritableDoesntCrashCancelledTask", testChannelBecomingWritableDoesntCrashCancelledTask), + ("testDidReceiveBodyPartFailedPromise", testDidReceiveBodyPartFailedPromise), ("testHTTPUploadIsCancelledEvenThoughRequestSucceeds", testHTTPUploadIsCancelledEvenThoughRequestSucceeds), ("testRaceBetweenConnectionCloseAndDemandMoreData", testRaceBetweenConnectionCloseAndDemandMoreData), ("testRedirectWith3KBBody", testRedirectWith3KBBody), diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index 6993c0df9..43062405c 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -455,6 +455,73 @@ final class RequestBagTests: XCTestCase { } } + func testDidReceiveBodyPartFailedPromise() { + let embeddedEventLoop = EmbeddedEventLoop() + defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) } + let logger = Logger(label: "test") + + var maybeRequest: HTTPClient.Request? + + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request( + url: "https://swift.org", + method: .POST, + body: .byteBuffer(.init(bytes: [1])) + )) + guard let request = maybeRequest else { return XCTFail("Expected to have a request") } + + struct MyError: Error, Equatable {} + final class Delegate: HTTPClientResponseDelegate { + typealias Response = Void + let didFinishPromise: EventLoopPromise + init(didFinishPromise: EventLoopPromise) { + self.didFinishPromise = didFinishPromise + } + + func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { + task.eventLoop.makeFailedFuture(MyError()) + } + + func didReceiveError(task: HTTPClient.Task, _ error: Error) { + self.didFinishPromise.fail(error) + } + + func didFinishRequest(task: AsyncHTTPClient.HTTPClient.Task) throws { + XCTFail("\(#function) should not be called") + self.didFinishPromise.succeed(()) + } + } + let delegate = Delegate(didFinishPromise: embeddedEventLoop.makePromise()) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: embeddedEventLoop), + task: .init(eventLoop: embeddedEventLoop, logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(), + delegate: delegate + )) + guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") } + + let executor = MockRequestExecutor(eventLoop: embeddedEventLoop) + + executor.runRequest(bag) + + bag.resumeRequestBodyStream() + XCTAssertNoThrow(try executor.receiveRequestBody { XCTAssertEqual($0, ByteBuffer(bytes: [1])) }) + + bag.receiveResponseHead(.init(version: .http1_1, status: .ok)) + + bag.succeedRequest([ByteBuffer([1])]) + + XCTAssertThrowsError(try delegate.didFinishPromise.futureResult.wait()) { error in + XCTAssertEqualTypeAndValue(error, MyError()) + } + XCTAssertThrowsError(try bag.task.futureResult.wait()) { error in + XCTAssertEqualTypeAndValue(error, MyError()) + } + } + func testHTTPUploadIsCancelledEvenThoughRequestSucceeds() { let embeddedEventLoop = EmbeddedEventLoop() defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) }