Skip to content

Close idle pool connections #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 52 additions & 11 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public class HTTPClient {
redirectHandler = nil
}

let task = Task<Delegate.Response>(eventLoop: taskEL)
let task = Task<Delegate.Response>(eventLoop: taskEL, poolingTimeout: self.configuration.poolingTimeout)
self.stateLock.withLock {
self.tasks[task.id] = task
}
Expand All @@ -321,17 +321,18 @@ public class HTTPClient {

connection.flatMap { connection -> EventLoopFuture<Void> in
let channel = connection.channel
let addedFuture: EventLoopFuture<Void>

switch self.configuration.decompression {
case .disabled:
addedFuture = channel.eventLoop.makeSucceededFuture(())
case .enabled(let limit):
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
addedFuture = channel.pipeline.addHandler(decompressHandler)
}

return addedFuture.flatMap {
return connection.removeHandler(IdleStateHandler.self).flatMap {
connection.removeHandler(IdlePoolConnectionHandler.self)
}.flatMap {
switch self.configuration.decompression {
case .disabled:
return channel.eventLoop.makeSucceededFuture(())
case .enabled(let limit):
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
return channel.pipeline.addHandler(decompressHandler)
}
}.flatMap {
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout))
} else {
Expand Down Expand Up @@ -408,6 +409,8 @@ public class HTTPClient {
public var redirectConfiguration: RedirectConfiguration
/// Default client timeout, defaults to no timeouts.
public var timeout: Timeout
/// Timeout of pooled connections
public var poolingTimeout: TimeAmount?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a good name for this setting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make it a bit longer and more descriptive. Maybe maximumAllowedIdleTimeInConnectionPool or so? @Lukasa ?

Copy link
Contributor Author

@PopFlamingo PopFlamingo Feb 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I set it to this for now, thanks.

/// Upstream proxy, defaults to no proxy.
public var proxy: Proxy?
/// Enables automatic body decompression. Supported algorithms are gzip and deflate.
Expand All @@ -418,30 +421,68 @@ public class HTTPClient {
public init(tlsConfiguration: TLSConfiguration? = nil,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
poolingTimeout: TimeAmount,
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.tlsConfiguration = tlsConfiguration
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
self.timeout = timeout
self.poolingTimeout = poolingTimeout
self.proxy = proxy
self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown
self.decompression = decompression
}

public init(tlsConfiguration: TLSConfiguration? = nil,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.init(
tlsConfiguration: tlsConfiguration,
redirectConfiguration: redirectConfiguration,
timeout: timeout,
poolingTimeout: .seconds(60),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of the default 60 seconds timeout?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with this, @Lukasa ?

proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
)
}

public init(certificateVerification: CertificateVerification,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
poolingTimeout: TimeAmount = .seconds(60),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.tlsConfiguration = TLSConfiguration.forClient(certificateVerification: certificateVerification)
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
self.timeout = timeout
self.poolingTimeout = poolingTimeout
self.proxy = proxy
self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown
self.decompression = decompression
}

public init(certificateVerification: CertificateVerification,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.init(
certificateVerification: certificateVerification,
redirectConfiguration: redirectConfiguration,
timeout: timeout,
poolingTimeout: .seconds(60),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
)
}
}

/// Specifies how `EventLoopGroup` will be created and establishes lifecycle ownership.
Expand Down
26 changes: 25 additions & 1 deletion Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,15 @@ extension HTTPClient {
var cancelled: Bool
let lock: Lock
let id = UUID()
let poolingTimeout: TimeAmount?

init(eventLoop: EventLoop) {
init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) {
self.eventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.completion = self.promise.futureResult.map { _ in }
self.cancelled = false
self.lock = Lock()
self.poolingTimeout = poolingTimeout
}

static func failedTask(eventLoop: EventLoop, error: Error) -> Task<Response> {
Expand Down Expand Up @@ -571,6 +573,19 @@ extension HTTPClient {
connection.removeHandler(IdleStateHandler.self)
}.flatMap {
connection.removeHandler(TaskHandler<Delegate>.self)
}.flatMap {
connection.channel.pipeline.addHandlers([
IdleStateHandler(writeTimeout: self.poolingTimeout),
IdlePoolConnectionHandler(),
])
}.flatMapError { error in
if let error = error as? ChannelError, error == .ioOnClosedChannel {
// We may get this error if channel is released because it is
// closed, it is safe to ignore it
return connection.channel.eventLoop.makeSucceededFuture(())
} else {
return connection.channel.eventLoop.makeFailedFuture(error)
}
}.map {
connection.release()
}.flatMapError { error in
Expand Down Expand Up @@ -1008,3 +1023,12 @@ internal struct RedirectHandler<ResponseType> {
}
}
}

class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write {
context.close(promise: nil)
}
}
}
33 changes: 32 additions & 1 deletion Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ internal final class HTTPBin {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let serverChannel: Channel
let isShutdown: NIOAtomic<Bool> = .makeAtomic(value: false)
var connectionCount: NIOAtomic<Int> = .makeAtomic(value: 0)
private let activeConnCounterHandler: CountActiveConnectionsHandler
var activeConnections: Int {
return self.activeConnCounterHandler.currentlyActiveConnections
}

enum BindTarget {
case unixDomainSocket(String)
Expand Down Expand Up @@ -204,10 +209,15 @@ internal final class HTTPBin {
socketAddress = try! SocketAddress(unixDomainSocketPath: path)
}

let activeConnCounterHandler = CountActiveConnectionsHandler()
self.activeConnCounterHandler = activeConnCounterHandler

self.serverChannel = try! ServerBootstrap(group: self.group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelInitializer { channel in
.serverChannelInitializer { channel in
channel.pipeline.addHandler(activeConnCounterHandler)
}.childChannelInitializer { channel in
guard !refusesConnections else {
return channel.eventLoop.makeFailedFuture(HTTPBinError.refusedConnection)
}
Expand Down Expand Up @@ -537,6 +547,27 @@ internal final class HttpBinHandler: ChannelInboundHandler {
}
}

final class CountActiveConnectionsHandler: ChannelInboundHandler {
typealias InboundIn = Channel

private let activeConns = NIOAtomic<Int>.makeAtomic(value: 0)

public var currentlyActiveConnections: Int {
return self.activeConns.load()
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let channel = self.unwrapInboundIn(data)

_ = self.activeConns.add(1)
channel.closeFuture.whenComplete { _ in
_ = self.activeConns.sub(1)
}

context.fireChannelRead(data)
}
}

internal class HttpBinForSSLUncleanShutdown {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let serverChannel: Channel
Expand Down
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ extension HTTPClientTests {
("testUDSSocketAndPath", testUDSSocketAndPath),
("testUseExistingConnectionOnDifferentEL", testUseExistingConnectionOnDifferentEL),
("testWeRecoverFromServerThatClosesTheConnectionOnUs", testWeRecoverFromServerThatClosesTheConnectionOnUs),
("testPoolClosesIdleConnections", testPoolClosesIdleConnections),
]
}
}
13 changes: 13 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1631,4 +1631,17 @@ class HTTPClientTests: XCTestCase {
XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load())
XCTAssertEqual(3, sharedStateServerHandler.requestNumber.load())
}

func testPoolClosesIdleConnections() {
let httpBin = HTTPBin()
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .seconds(1)))
defer {
XCTAssertNoThrow(try httpBin.shutdown())
XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true))
}
XCTAssertNoThrow(try httpClient.get(url: "http://localhost:\(httpBin.port)/get").wait())
XCTAssertEqual(httpBin.activeConnections, 1)
Thread.sleep(forTimeInterval: 2)
XCTAssertEqual(httpBin.activeConnections, 0)
}
}