Skip to content

Commit 244aea6

Browse files
Trevörweissi
Trevör
authored andcommitted
Tolerate futures from arbitrary event loops (#96)
This commit fixes #95 by always hopping event loop futures received from the delegate to the right event loop. This could be a source of bugs if the library users forgot to hop(to:) futures from their delegates implementations.
1 parent 26afbc1 commit 244aea6

File tree

4 files changed

+45
-31
lines changed

4 files changed

+45
-31
lines changed

Sources/AsyncHTTPClient/HTTPHandler.swift

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
227227
case .error:
228228
break
229229
}
230-
return task.eventLoop.makeSucceededFuture(())
230+
return task.currentEventLoop.makeSucceededFuture(())
231231
}
232232

233233
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> {
@@ -245,7 +245,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
245245
case .error:
246246
break
247247
}
248-
return task.eventLoop.makeSucceededFuture(())
248+
return task.currentEventLoop.makeSucceededFuture(())
249249
}
250250

251251
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
@@ -343,9 +343,9 @@ extension HTTPClientResponseDelegate {
343343

344344
public func didSendRequest(task: HTTPClient.Task<Response>) {}
345345

346-
public func didReceiveHead(task: HTTPClient.Task<Response>, _: HTTPResponseHead) -> EventLoopFuture<Void> { return task.eventLoop.makeSucceededFuture(()) }
346+
public func didReceiveHead(task: HTTPClient.Task<Response>, _: HTTPResponseHead) -> EventLoopFuture<Void> { return task.currentEventLoop.makeSucceededFuture(()) }
347347

348-
public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _: ByteBuffer) -> EventLoopFuture<Void> { return task.eventLoop.makeSucceededFuture(()) }
348+
public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _: ByteBuffer) -> EventLoopFuture<Void> { return task.currentEventLoop.makeSucceededFuture(()) }
349349

350350
public func didReceiveError(task: HTTPClient.Task<Response>, _: Error) {}
351351
}
@@ -366,15 +366,23 @@ extension HTTPClient {
366366
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
367367
public final class Task<Response> {
368368
/// `EventLoop` used to execute and process this request.
369-
public let eventLoop: EventLoop
370-
let promise: EventLoopPromise<Response>
369+
public var currentEventLoop: EventLoop {
370+
return self.lock.withLock {
371+
_currentEventLoop
372+
}
373+
}
371374

375+
/// The stored property used by `currentEventLoop` in combination with the `lock`
376+
///
377+
/// In most cases you should use `currentEventLoop` instead
378+
private var _currentEventLoop: EventLoop
379+
let promise: EventLoopPromise<Response>
372380
private var channel: Channel?
373381
private var cancelled: Bool
374382
private let lock: Lock
375383

376-
public init(eventLoop: EventLoop) {
377-
self.eventLoop = eventLoop
384+
init(eventLoop: EventLoop) {
385+
self._currentEventLoop = eventLoop
378386
self.promise = eventLoop.makePromise()
379387
self.cancelled = false
380388
self.lock = Lock()
@@ -405,8 +413,8 @@ extension HTTPClient {
405413

406414
@discardableResult
407415
func setChannel(_ channel: Channel) -> Channel {
408-
precondition(self.eventLoop === channel.eventLoop, "Channel must use same event loop as this task.")
409416
return self.lock.withLock {
417+
self._currentEventLoop = channel.eventLoop
410418
self.channel = channel
411419
return channel
412420
}
@@ -539,9 +547,11 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
539547
} else {
540548
self.state = .head
541549
self.mayRead = false
542-
self.delegate.didReceiveHead(task: self.task, head).whenComplete { result in
543-
self.handleBackpressureResult(context: context, result: result)
544-
}
550+
self.delegate.didReceiveHead(task: self.task, head)
551+
.hop(to: context.eventLoop)
552+
.whenComplete { result in
553+
self.handleBackpressureResult(context: context, result: result)
554+
}
545555
}
546556
case .body(let body):
547557
switch self.state {
@@ -550,9 +560,11 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
550560
default:
551561
self.state = .body
552562
self.mayRead = false
553-
self.delegate.didReceiveBodyPart(task: self.task, body).whenComplete { result in
554-
self.handleBackpressureResult(context: context, result: result)
555-
}
563+
self.delegate.didReceiveBodyPart(task: self.task, body)
564+
.hop(to: context.eventLoop)
565+
.whenComplete { result in
566+
self.handleBackpressureResult(context: context, result: result)
567+
}
556568
}
557569
case .end:
558570
switch self.state {

Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import NIOSSL
2121
class TestHTTPDelegate: HTTPClientResponseDelegate {
2222
typealias Response = Void
2323

24+
init(backpressureEventLoop: EventLoop? = nil) {
25+
self.backpressureEventLoop = backpressureEventLoop
26+
}
27+
28+
var backpressureEventLoop: EventLoop?
29+
2430
enum State {
2531
case idle
2632
case head(HTTPResponseHead)
@@ -33,7 +39,7 @@ class TestHTTPDelegate: HTTPClientResponseDelegate {
3339

3440
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
3541
self.state = .head(head)
36-
return task.eventLoop.makeSucceededFuture(())
42+
return (self.backpressureEventLoop ?? task.currentEventLoop).makeSucceededFuture(())
3743
}
3844

3945
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
@@ -47,7 +53,7 @@ class TestHTTPDelegate: HTTPClientResponseDelegate {
4753
default:
4854
preconditionFailure("expecting head or body")
4955
}
50-
return task.eventLoop.makeSucceededFuture(())
56+
return (self.backpressureEventLoop ?? task.currentEventLoop).makeSucceededFuture(())
5157
}
5258

5359
func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
@@ -63,7 +69,7 @@ class CountingDelegate: HTTPClientResponseDelegate {
6369
if str?.starts(with: "id:") ?? false {
6470
self.count += 1
6571
}
66-
return task.eventLoop.makeSucceededFuture(())
72+
return task.currentEventLoop.makeSucceededFuture(())
6773
}
6874

6975
func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Int {

Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ extension HTTPClientTests {
2929
("testBadRequestURI", testBadRequestURI),
3030
("testSchemaCasing", testSchemaCasing),
3131
("testGet", testGet),
32-
("testGetWithSharedEventLoopGroup", testGetWithSharedEventLoopGroup),
32+
("testGetWithDifferentEventLoopBackpressure", testGetWithDifferentEventLoopBackpressure),
3333
("testPost", testPost),
3434
("testGetHttps", testGetHttps),
3535
("testPostHttps", testPostHttps),

Tests/AsyncHTTPClientTests/HTTPClientTests.swift

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,19 @@ class HTTPClientTests: XCTestCase {
6262
XCTAssertEqual(.ok, response.status)
6363
}
6464

65-
func testGetWithSharedEventLoopGroup() throws {
65+
func testGetWithDifferentEventLoopBackpressure() throws {
6666
let httpBin = HttpBin()
67-
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 8)
68-
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(elg))
67+
let loopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
68+
let external = MultiThreadedEventLoopGroup(numberOfThreads: 1)
69+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(loopGroup))
6970
defer {
7071
XCTAssertNoThrow(try httpClient.syncShutdown())
71-
XCTAssertNoThrow(try elg.syncShutdownGracefully())
72+
XCTAssertNoThrow(try loopGroup.syncShutdownGracefully())
7273
httpBin.shutdown()
7374
}
74-
75-
let delegate = TestHTTPDelegate()
7675
let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/events/10/1")
76+
let delegate = TestHTTPDelegate(backpressureEventLoop: external.next())
7777
let task = httpClient.execute(request: request, delegate: delegate)
78-
let expectedEventLoop = task.eventLoop
79-
task.futureResult.whenComplete { _ in
80-
XCTAssertTrue(expectedEventLoop.inEventLoop)
81-
}
8278
try task.wait()
8379
}
8480

@@ -508,8 +504,8 @@ class HTTPClientTests: XCTestCase {
508504
}
509505

510506
func didReceiveHead(task: HTTPClient.Task<Bool>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
511-
self.result = task.eventLoop === self.eventLoop
512-
return task.eventLoop.makeSucceededFuture(())
507+
self.result = task.currentEventLoop === self.eventLoop
508+
return task.currentEventLoop.makeSucceededFuture(())
513509
}
514510

515511
func didFinishRequest(task: HTTPClient.Task<Bool>) throws -> Bool {

0 commit comments

Comments
 (0)