Skip to content

Tolerate futures from arbitrary event loops #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
case .error:
break
}
return task.eventLoop.makeSucceededFuture(())
return task.currentEventLoop.makeSucceededFuture(())
}

func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> {
Expand All @@ -245,7 +245,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
case .error:
break
}
return task.eventLoop.makeSucceededFuture(())
return task.currentEventLoop.makeSucceededFuture(())
}

func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
Expand Down Expand Up @@ -343,9 +343,9 @@ extension HTTPClientResponseDelegate {

public func didSendRequest(task: HTTPClient.Task<Response>) {}

public func didReceiveHead(task: HTTPClient.Task<Response>, _: HTTPResponseHead) -> EventLoopFuture<Void> { return task.eventLoop.makeSucceededFuture(()) }
public func didReceiveHead(task: HTTPClient.Task<Response>, _: HTTPResponseHead) -> EventLoopFuture<Void> { return task.currentEventLoop.makeSucceededFuture(()) }

public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _: ByteBuffer) -> EventLoopFuture<Void> { return task.eventLoop.makeSucceededFuture(()) }
public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _: ByteBuffer) -> EventLoopFuture<Void> { return task.currentEventLoop.makeSucceededFuture(()) }

public func didReceiveError(task: HTTPClient.Task<Response>, _: Error) {}
}
Expand All @@ -366,15 +366,23 @@ extension HTTPClient {
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
public final class Task<Response> {
/// `EventLoop` used to execute and process this request.
public let eventLoop: EventLoop
let promise: EventLoopPromise<Response>
public var currentEventLoop: EventLoop {
return self.lock.withLock {
_currentEventLoop
}
}

/// The stored property used by `currentEventLoop` in combination with the `lock`
///
/// In most cases you should use `currentEventLoop` instead
private var _currentEventLoop: EventLoop
let promise: EventLoopPromise<Response>
private var channel: Channel?
private var cancelled: Bool
private let lock: Lock

public init(eventLoop: EventLoop) {
self.eventLoop = eventLoop
init(eventLoop: EventLoop) {
self._currentEventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.cancelled = false
self.lock = Lock()
Expand Down Expand Up @@ -405,8 +413,8 @@ extension HTTPClient {

@discardableResult
func setChannel(_ channel: Channel) -> Channel {
precondition(self.eventLoop === channel.eventLoop, "Channel must use same event loop as this task.")
return self.lock.withLock {
self._currentEventLoop = channel.eventLoop
self.channel = channel
return channel
}
Expand Down Expand Up @@ -539,9 +547,11 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
} else {
self.state = .head
self.mayRead = false
self.delegate.didReceiveHead(task: self.task, head).whenComplete { result in
self.handleBackpressureResult(context: context, result: result)
}
self.delegate.didReceiveHead(task: self.task, head)
.hop(to: context.eventLoop)
.whenComplete { result in
self.handleBackpressureResult(context: context, result: result)
}
}
case .body(let body):
switch self.state {
Expand All @@ -550,9 +560,11 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
default:
self.state = .body
self.mayRead = false
self.delegate.didReceiveBodyPart(task: self.task, body).whenComplete { result in
self.handleBackpressureResult(context: context, result: result)
}
self.delegate.didReceiveBodyPart(task: self.task, body)
.hop(to: context.eventLoop)
.whenComplete { result in
self.handleBackpressureResult(context: context, result: result)
}
}
case .end:
switch self.state {
Expand Down
12 changes: 9 additions & 3 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import NIOSSL
class TestHTTPDelegate: HTTPClientResponseDelegate {
typealias Response = Void

init(backpressureEventLoop: EventLoop? = nil) {
self.backpressureEventLoop = backpressureEventLoop
}

var backpressureEventLoop: EventLoop?

enum State {
case idle
case head(HTTPResponseHead)
Expand All @@ -33,7 +39,7 @@ class TestHTTPDelegate: HTTPClientResponseDelegate {

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
self.state = .head(head)
return task.eventLoop.makeSucceededFuture(())
return (self.backpressureEventLoop ?? task.currentEventLoop).makeSucceededFuture(())
}

func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
Expand All @@ -47,7 +53,7 @@ class TestHTTPDelegate: HTTPClientResponseDelegate {
default:
preconditionFailure("expecting head or body")
}
return task.eventLoop.makeSucceededFuture(())
return (self.backpressureEventLoop ?? task.currentEventLoop).makeSucceededFuture(())
}

func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
Expand All @@ -63,7 +69,7 @@ class CountingDelegate: HTTPClientResponseDelegate {
if str?.starts(with: "id:") ?? false {
self.count += 1
}
return task.eventLoop.makeSucceededFuture(())
return task.currentEventLoop.makeSucceededFuture(())
}

func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Int {
Expand Down
2 changes: 1 addition & 1 deletion Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extension HTTPClientTests {
("testBadRequestURI", testBadRequestURI),
("testSchemaCasing", testSchemaCasing),
("testGet", testGet),
("testGetWithSharedEventLoopGroup", testGetWithSharedEventLoopGroup),
("testGetWithDifferentEventLoopBackpressure", testGetWithDifferentEventLoopBackpressure),
("testPost", testPost),
("testGetHttps", testGetHttps),
("testPostHttps", testPostHttps),
Expand Down
20 changes: 8 additions & 12 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,19 @@ class HTTPClientTests: XCTestCase {
XCTAssertEqual(.ok, response.status)
}

func testGetWithSharedEventLoopGroup() throws {
Copy link
Contributor Author

@PopFlamingo PopFlamingo Aug 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be needed anymore now that we take care of the threading issues. This test could be re-introduced when we add a requires event loop preference to check that the requirements are always met.

func testGetWithDifferentEventLoopBackpressure() throws {
let httpBin = HttpBin()
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 8)
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(elg))
let loopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let external = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(loopGroup))
defer {
XCTAssertNoThrow(try httpClient.syncShutdown())
XCTAssertNoThrow(try elg.syncShutdownGracefully())
XCTAssertNoThrow(try loopGroup.syncShutdownGracefully())
httpBin.shutdown()
}

let delegate = TestHTTPDelegate()
let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/events/10/1")
let delegate = TestHTTPDelegate(backpressureEventLoop: external.next())
let task = httpClient.execute(request: request, delegate: delegate)
let expectedEventLoop = task.eventLoop
task.futureResult.whenComplete { _ in
XCTAssertTrue(expectedEventLoop.inEventLoop)
}
try task.wait()
}

Expand Down Expand Up @@ -508,8 +504,8 @@ class HTTPClientTests: XCTestCase {
}

func didReceiveHead(task: HTTPClient.Task<Bool>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
self.result = task.eventLoop === self.eventLoop
return task.eventLoop.makeSucceededFuture(())
self.result = task.currentEventLoop === self.eventLoop
return task.currentEventLoop.makeSucceededFuture(())
}

func didFinishRequest(task: HTTPClient.Task<Bool>) throws -> Bool {
Expand Down