From a920ff24cefae5e9719e6fcdbbef28a909ed4609 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Thu, 5 Dec 2019 15:08:45 +0000 Subject: [PATCH 1/8] wip --- Sources/AsyncHTTPClient/HTTPClient.swift | 30 +++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 912e0b2fc..37bc4dcb6 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -72,15 +72,39 @@ public class HTTPClient { /// Shuts down the client and `EventLoopGroup` if it was created by the client. public func syncShutdown() throws { + let errorStorageLock = Lock() + var errorStorage: Error? = nil + let continuation = DispatchWorkItem {} + self.shutdownGracefully { error in + if let error = error { + errorStorageLock.withLock { + errorStorage = error + } + } + continuation.perform() + } + continuation.wait() + try errorStorageLock.withLock { + if let error = errorStorage { + throw error + } + } + } + + /// Shuts down the client and `EventLoopGroup` if it was created by the client. + /// + /// - parameters: + /// - callback: Callback to call when shutdown is complete. + public func shutdownGracefully(_ callback: @escaping (Error?) -> Void) { switch self.eventLoopGroupProvider { case .shared: self.isShutdown.store(true) - return + callback(nil) case .createNew: if self.isShutdown.compareAndExchange(expected: false, desired: true) { - try self.eventLoopGroup.syncShutdownGracefully() + self.eventLoopGroup.shutdownGracefully(callback) } else { - throw HTTPClientError.alreadyShutdown + callback(nil) } } } From f62e798e65b59c69558a90808a57a6c3d323cef6 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 09:40:28 +0000 Subject: [PATCH 2/8] first-try implementation --- Sources/AsyncHTTPClient/ConnectionPool.swift | 52 ++++---- Sources/AsyncHTTPClient/HTTPClient.swift | 123 +++++++++++++------ 2 files changed, 116 insertions(+), 59 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index ebede078b..cc453e04a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -93,20 +93,23 @@ final class ConnectionPool { } } - func prepareForClose() { - let connectionProviders = self.connectionProvidersLock.withLock { self.connectionProviders.values } - for connectionProvider in connectionProviders { - connectionProvider.prepareForClose() + func prepareForClose(on eventLoop: EventLoop) -> EventLoopFuture { + let connectionProviders = self.connectionProvidersLock.withLock { + self.connectionProviders.values } + + return EventLoopFuture.andAllComplete(connectionProviders.map { $0.prepareForClose() }, on: eventLoop) } - func syncClose() { - let connectionProviders = self.connectionProvidersLock.withLock { self.connectionProviders.values } - for connectionProvider in connectionProviders { - connectionProvider.syncClose() + func close(on eventLoop: EventLoop) -> EventLoopFuture { + let connectionProviders = self.connectionProvidersLock.withLock { + self.connectionProviders.values } - self.connectionProvidersLock.withLock { - assert(self.connectionProviders.count == 0, "left-overs: \(self.connectionProviders)") + + return EventLoopFuture.andAllComplete(connectionProviders.map { $0.close() }, on: eventLoop).map { + self.connectionProvidersLock.withLock { + assert(self.connectionProviders.count == 0, "left-overs: \(self.connectionProviders)") + } } } @@ -448,7 +451,7 @@ final class ConnectionPool { } /// Removes and fails all `waiters`, remove existing `availableConnections` and sets `state.activity` to `.closing` - func prepareForClose() { + func prepareForClose() -> EventLoopFuture { assert(MultiThreadedEventLoopGroup.currentEventLoop == nil, "HTTPClient shutdown on EventLoop unsupported") // calls .wait() so it would crash later anyway let (waitersFutures, closeFutures) = self.stateLock.withLock { () -> ([EventLoopFuture], [EventLoopFuture]) in @@ -461,26 +464,29 @@ final class ConnectionPool { let closeFutures = self.state.availableConnections.map { $0.close() } return (waitersFutures, closeFutures) } - try? EventLoopFuture.andAllComplete(waitersFutures, on: self.eventLoop).wait() - try? EventLoopFuture.andAllComplete(closeFutures, on: self.eventLoop).wait() - self.stateLock.withLock { - if self.state.leased == 0, self.state.availableConnections.isEmpty { - self.state.activity = .closed - } else { - self.state.activity = .closing + return EventLoopFuture.andAllComplete(waitersFutures, on: self.eventLoop) + .flatMap { + EventLoopFuture.andAllComplete(closeFutures, on: self.eventLoop) + } + .map { _ in + self.stateLock.withLock { + if self.state.leased == 0, self.state.availableConnections.isEmpty { + self.state.activity = .closed + } else { + self.state.activity = .closing + } + } } - } } - func syncClose() { - assert(MultiThreadedEventLoopGroup.currentEventLoop == nil, - "HTTPClient shutdown on EventLoop unsupported") // calls .wait() so it would crash later anyway + func close() -> EventLoopFuture { let availableConnections = self.stateLock.withLock { () -> CircularBuffer in assert(self.state.activity == .closing) return self.state.availableConnections } - try? EventLoopFuture.andAllComplete(availableConnections.map { $0.close() }, on: self.eventLoop).wait() + + return EventLoopFuture.andAllComplete(availableConnections.map { $0.close() }, on: self.eventLoop) } private func activityPrecondition(expected: Set) { diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 0ecd49a05..6a506ea9d 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -93,55 +93,106 @@ public class HTTPClient { /// this indicate shutdown was called too early before tasks were completed or explicitly canceled. /// In general, setting this parameter to `true` should make it easier and faster to catch related programming errors. internal func syncShutdown(requiresCleanClose: Bool) throws { - var closeError: Error? - - let tasks = try self.stateLock.withLock { () -> Dictionary.Values in - if self.state != .upAndRunning { - throw HTTPClientError.alreadyShutdown + if let eventLoop = MultiThreadedEventLoopGroup.currentEventLoop { + preconditionFailure(""" + BUG DETECTED: syncShutdown() must not be called when on an EventLoop. + Calling syncShutdown() on any EventLoop can lead to deadlocks. + Current eventLoop: \(eventLoop) + """) + } + let errorStorageLock = Lock() + var errorStorage: Error? = nil + let continuation = DispatchWorkItem {} + self.shutdown(requiresCleanClose: requiresCleanClose, queue: .global()) { error in + if let error = error { + errorStorageLock.withLock { + errorStorage = error + } + } + continuation.perform() + } + continuation.wait() + try errorStorageLock.withLock { + if let error = errorStorage { + throw error } - self.state = .shuttingDown - return self.tasks.values } + } - self.pool.prepareForClose() + /// Shuts down the client and event loop gracefully. This function is clearly an outlier in that it uses a completion + /// callback instead of an EventLoopFuture. The reason for that is that NIO's EventLoopFutures will call back on an event loop. + /// The virtue of this function is to shut the event loop down. To work around that we call back on a DispatchQueue + /// instead. + public func shutdown( _ callback: @escaping (Error?) -> Void) { + self.shutdown(requiresCleanClose: false, queue: .global(), callback) + } - if !tasks.isEmpty, requiresCleanClose { - closeError = HTTPClientError.uncleanShutdown - } + public func shutdown(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + self.shutdown(requiresCleanClose: false, queue: queue, callback) + } + + private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + let tasks: Dictionary.Values + do { + tasks = try self.stateLock.withLock { + if self.state != .upAndRunning { + throw HTTPClientError.alreadyShutdown + } - for task in tasks { - task.cancel() + self.state = .shuttingDown + return self.tasks.values + } + } catch { + callback(error) + return } - try? EventLoopFuture.andAllComplete((tasks.map { $0.completion }), on: self.eventLoopGroup.next()).wait() + self.pool.prepareForClose(on: self.eventLoopGroup.next()) + .flatMapThrowing { () -> Dictionary.Values in + if !tasks.isEmpty, requiresCleanClose { + throw HTTPClientError.uncleanShutdown + } - self.pool.syncClose() + for task in tasks { + task.cancel() + } - do { - try self.stateLock.withLock { - switch self.eventLoopGroupProvider { - case .shared: - self.state = .shutDown - return - case .createNew: - switch self.state { - case .shuttingDown: + return tasks + } + .flatMap { tasks in EventLoopFuture.andAllComplete((tasks.map { $0.completion }), on: self.eventLoopGroup.next()) } + .flatMap { self.pool.close(on: self.eventLoopGroup.next()) } + .whenComplete { result in + var closeError: Error? + switch result { + case .failure(let error): + closeError = error + case .success: + break + } + + self.stateLock.withLock { + switch self.eventLoopGroupProvider { + case .shared: self.state = .shutDown - try self.eventLoopGroup.syncShutdownGracefully() - case .shutDown, .upAndRunning: - assertionFailure("The only valid state at this point is \(State.shutDown)") + callback(closeError) + case .createNew: + switch self.state { + case .shuttingDown: + self.state = .shutDown + self.eventLoopGroup.shutdownGracefully(queue: queue) { eventLoopError in + // we ignore event loop close error in favour of closeError + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) + } + } + case .shutDown, .upAndRunning: + assertionFailure("The only valid state at this point is \(State.shutDown)") + } } } } - } catch { - if closeError == nil { - closeError = error - } - } - - if let closeError = closeError { - throw closeError - } } /// Execute `GET` request using specified URL. From 4c5b9cc05e96c192c49a71ceb06a72588945b426 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 11:29:31 +0000 Subject: [PATCH 3/8] refactor --- Sources/AsyncHTTPClient/ConnectionPool.swift | 2 - Sources/AsyncHTTPClient/HTTPClient.swift | 96 ++++++++++---------- 2 files changed, 48 insertions(+), 50 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index cc453e04a..dfb7d5f4f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -452,8 +452,6 @@ final class ConnectionPool { /// Removes and fails all `waiters`, remove existing `availableConnections` and sets `state.activity` to `.closing` func prepareForClose() -> EventLoopFuture { - assert(MultiThreadedEventLoopGroup.currentEventLoop == nil, - "HTTPClient shutdown on EventLoop unsupported") // calls .wait() so it would crash later anyway let (waitersFutures, closeFutures) = self.stateLock.withLock { () -> ([EventLoopFuture], [EventLoopFuture]) in // Fail waiters let waitersCopy = self.state.waiters diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 6a506ea9d..a7a1f5c4c 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -131,68 +131,68 @@ public class HTTPClient { self.shutdown(requiresCleanClose: false, queue: queue, callback) } - private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { - let tasks: Dictionary.Values - do { - tasks = try self.stateLock.withLock { - if self.state != .upAndRunning { - throw HTTPClientError.alreadyShutdown - } - - self.state = .shuttingDown - return self.tasks.values - } - } catch { - callback(error) - return + private func cancelTasks(_ tasks: Dictionary.Values) -> EventLoopFuture { + for task in tasks { + task.cancel() } - self.pool.prepareForClose(on: self.eventLoopGroup.next()) - .flatMapThrowing { () -> Dictionary.Values in - if !tasks.isEmpty, requiresCleanClose { - throw HTTPClientError.uncleanShutdown - } + return EventLoopFuture.andAllComplete(tasks.map { $0.completion }, on: self.eventLoopGroup.next()) + } - for task in tasks { - task.cancel() + private func shutdownEventLoop(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + self.stateLock.withLock { + switch self.eventLoopGroupProvider { + case .shared: + self.state = .shutDown + callback(nil) + case .createNew: + switch self.state { + case .shuttingDown: + self.state = .shutDown + self.eventLoopGroup.shutdownGracefully(queue: queue, callback) + case .shutDown, .upAndRunning: + assertionFailure("The only valid state at this point is \(State.shutDown)") } + } + } + } - return tasks + private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + let result: Result.Values, Error> = self.stateLock.withLock { + if self.state != .upAndRunning { + return .failure(HTTPClientError.alreadyShutdown) + } else { + self.state = .shuttingDown + return .success(self.tasks.values) } - .flatMap { tasks in EventLoopFuture.andAllComplete((tasks.map { $0.completion }), on: self.eventLoopGroup.next()) } - .flatMap { self.pool.close(on: self.eventLoopGroup.next()) } - .whenComplete { result in + } + + switch result { + case .failure(let error): + callback(error) + case .success(let tasks): + self.pool.prepareForClose(on: self.eventLoopGroup.next()).whenComplete { _ in var closeError: Error? - switch result { - case .failure(let error): - closeError = error - case .success: - break + if !tasks.isEmpty, requiresCleanClose { + closeError = HTTPClientError.uncleanShutdown } - self.stateLock.withLock { - switch self.eventLoopGroupProvider { - case .shared: - self.state = .shutDown - callback(closeError) - case .createNew: - switch self.state { - case .shuttingDown: - self.state = .shutDown - self.eventLoopGroup.shutdownGracefully(queue: queue) { eventLoopError in - // we ignore event loop close error in favour of closeError - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) - } + // we ignore errors here + self.cancelTasks(tasks).whenComplete { _ in + // we ignore errors here + self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in + self.shutdownEventLoop(queue: queue) { eventLoopError in + // we prioritise .uncleanShutdown here + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) } - case .shutDown, .upAndRunning: - assertionFailure("The only valid state at this point is \(State.shutDown)") } } } } + } } /// Execute `GET` request using specified URL. From e8f5a9d4646b3b4beb0579640f5ce6c712cd5c32 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 11:40:10 +0000 Subject: [PATCH 4/8] fix formatting issues --- Sources/AsyncHTTPClient/HTTPClient.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index a7a1f5c4c..d5adf8cc5 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -101,7 +101,7 @@ public class HTTPClient { """) } let errorStorageLock = Lock() - var errorStorage: Error? = nil + var errorStorage: Error? let continuation = DispatchWorkItem {} self.shutdown(requiresCleanClose: requiresCleanClose, queue: .global()) { error in if let error = error { @@ -123,7 +123,7 @@ public class HTTPClient { /// callback instead of an EventLoopFuture. The reason for that is that NIO's EventLoopFutures will call back on an event loop. /// The virtue of this function is to shut the event loop down. To work around that we call back on a DispatchQueue /// instead. - public func shutdown( _ callback: @escaping (Error?) -> Void) { + public func shutdown(_ callback: @escaping (Error?) -> Void) { self.shutdown(requiresCleanClose: false, queue: .global(), callback) } From f2ee93e269b9cbd2ac7dcb9b77225dddad9ab11a Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 12:01:23 +0000 Subject: [PATCH 5/8] add a test --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index bba05a63a..7380aa947 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1673,4 +1673,17 @@ class HTTPClientTests: XCTestCase { } } } + + func testAsyncShutdown() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + let promise = eventLoopGroup.next().makePromise(of: Void.self) + eventLoopGroup.next().execute { + httpClient.shutdown { error in + XCTAssertNil(error) + promise.succeed(()) + } + } + XCTAssertNoThrow(try promise.futureResult.wait()) + } } From 7b07de611ca3336a9e40b37553a16231c2e1176d Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 12:02:50 +0000 Subject: [PATCH 6/8] fix linux tests --- Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 9a53e36ce..0ffeb42a6 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -96,6 +96,7 @@ extension HTTPClientTests { ("testPoolClosesIdleConnections", testPoolClosesIdleConnections), ("testRacePoolIdleConnectionsAndGet", testRacePoolIdleConnectionsAndGet), ("testAvoidLeakingTLSHandshakeCompletionPromise", testAvoidLeakingTLSHandshakeCompletionPromise), + ("testAsyncShutdown", testAsyncShutdown), ] } } From ed17f2b32ecaa2e58eec1e1b7bae12e94b2b84d9 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 12:23:31 +0000 Subject: [PATCH 7/8] review fixes --- Sources/AsyncHTTPClient/HTTPClient.swift | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index d5adf8cc5..548f038ad 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -103,7 +103,7 @@ public class HTTPClient { let errorStorageLock = Lock() var errorStorage: Error? let continuation = DispatchWorkItem {} - self.shutdown(requiresCleanClose: requiresCleanClose, queue: .global()) { error in + self.shutdown(requiresCleanClose: requiresCleanClose, queue: DispatchQueue(label: "async-http-client.shutdown")) { error in if let error = error { errorStorageLock.withLock { errorStorage = error @@ -123,10 +123,6 @@ public class HTTPClient { /// callback instead of an EventLoopFuture. The reason for that is that NIO's EventLoopFutures will call back on an event loop. /// The virtue of this function is to shut the event loop down. To work around that we call back on a DispatchQueue /// instead. - public func shutdown(_ callback: @escaping (Error?) -> Void) { - self.shutdown(requiresCleanClose: false, queue: .global(), callback) - } - public func shutdown(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { self.shutdown(requiresCleanClose: false, queue: queue, callback) } From df346490f76abb351659d4ed8e9d713b79293153 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 27 Mar 2020 12:26:26 +0000 Subject: [PATCH 8/8] fix compilation --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 7380aa947..731586db9 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1679,7 +1679,7 @@ class HTTPClientTests: XCTestCase { let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) let promise = eventLoopGroup.next().makePromise(of: Void.self) eventLoopGroup.next().execute { - httpClient.shutdown { error in + httpClient.shutdown(queue: DispatchQueue(label: "testAsyncShutdown")) { error in XCTAssertNil(error) promise.succeed(()) }