Skip to content

Commit 52f1ba7

Browse files
committed
Improvements to synchronisation in case of failures
This fixes an issue that would sometimes happen (exposed randomly by testFailingConnectionIsReleased)
1 parent 4672f31 commit 52f1ba7

File tree

2 files changed

+42
-39
lines changed

2 files changed

+42
-39
lines changed

Sources/AsyncHTTPClient/ConnectionPool.swift

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ final class ConnectionPool {
319319
channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath)
320320
}
321321

322-
let connection = channel.flatMap { channel -> EventLoopFuture<ConnectionPool.Connection> in
322+
return channel.flatMap { channel -> EventLoopFuture<ConnectionPool.Connection> in
323323
channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, handshakePromise: handshakePromise).flatMap {
324324
channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes)
325325
}.map {
@@ -331,9 +331,7 @@ final class ConnectionPool {
331331
let (connection, _) = arg
332332
self.configureCloseCallback(of: connection)
333333
return connection
334-
}
335-
336-
connection.whenFailure { _ in
334+
}.flatMapError { error in
337335
let action = self.stateLock.withLock {
338336
self.state.failedConnectionAction()
339337
}
@@ -343,9 +341,8 @@ final class ConnectionPool {
343341
case .none:
344342
break
345343
}
344+
return self.eventLoop.makeFailedFuture(error)
346345
}
347-
348-
return connection
349346
}
350347

351348
/// Adds a callback on connection close that asks the `state` what to do about this

Sources/AsyncHTTPClient/HTTPClient.swift

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -320,43 +320,49 @@ public class HTTPClient {
320320

321321
let connection = self.pool.getConnection(for: request, preference: eventLoopPreference, on: taskEL, deadline: deadline)
322322

323-
connection.flatMap { connection -> EventLoopFuture<Void> in
324-
let channel = connection.channel
325-
let addedFuture: EventLoopFuture<Void>
326-
327-
switch self.configuration.decompression {
328-
case .disabled:
329-
addedFuture = channel.eventLoop.makeSucceededFuture(())
330-
case .enabled(let limit):
331-
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
332-
addedFuture = channel.pipeline.addHandler(decompressHandler, name: "decompressHandler")
333-
}
334-
335-
return addedFuture.flatMap {
336-
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
337-
return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout), name: "timeoutHandler")
338-
} else {
339-
return channel.eventLoop.makeSucceededFuture(())
340-
}
341-
}.flatMap {
342-
let taskHandler = TaskHandler(task: task, delegate: delegate, redirectHandler: redirectHandler, ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown)
343-
return channel.pipeline.addHandler(taskHandler, name: "taskHandler")
344-
}.flatMap {
345-
task.setConnection(connection)
346-
347-
let isCancelled = task.lock.withLock {
348-
task.cancelled
323+
_ = connection
324+
.flatMap { connection -> EventLoopFuture<Void> in
325+
let channel = connection.channel
326+
let addedFuture: EventLoopFuture<Void>
327+
328+
switch self.configuration.decompression {
329+
case .disabled:
330+
addedFuture = channel.eventLoop.makeSucceededFuture(())
331+
case .enabled(let limit):
332+
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
333+
addedFuture = channel.pipeline.addHandler(decompressHandler)
349334
}
350335

351-
if !isCancelled {
352-
return channel.writeAndFlush(request)
353-
} else {
354-
return channel.eventLoop.makeSucceededFuture(())
336+
return addedFuture.flatMap {
337+
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
338+
return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout))
339+
} else {
340+
return channel.eventLoop.makeSucceededFuture(())
341+
}
342+
}.flatMap {
343+
let taskHandler = TaskHandler(task: task, delegate: delegate, redirectHandler: redirectHandler, ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown)
344+
return channel.pipeline.addHandler(taskHandler)
345+
}.flatMap {
346+
task.setConnection(connection)
347+
348+
let isCancelled = task.lock.withLock {
349+
task.cancelled
350+
}
351+
352+
if !isCancelled {
353+
return channel.writeAndFlush(request).flatMapError { _ in
354+
// At this point the `TaskHandler` will already be present
355+
// to handle the failure and pass it to the `promise`
356+
channel.eventLoop.makeSucceededFuture(())
357+
}
358+
} else {
359+
return channel.eventLoop.makeSucceededFuture(())
360+
}
355361
}
362+
}.whenFailure {
363+
promise.fail($0)
356364
}
357-
}.whenFailure {
358-
promise.fail($0)
359-
}
365+
360366
return task
361367
}
362368

0 commit comments

Comments
 (0)