Skip to content

Commit d2d3566

Browse files
authored
Add an idle write timeout (#718)
1 parent c70e085 commit d2d3566

10 files changed

+585
-21
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ extension Transaction {
356356
// response body stream.
357357
let body = TransactionBody.makeSequence(
358358
backPressureStrategy: .init(lowWatermark: 1, highWatermark: 1),
359+
finishOnDeinit: true,
359360
delegate: AnyAsyncSequenceProducerDelegate(delegate)
360361
)
361362

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

Lines changed: 163 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,17 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
4242
if let idleReadTimeout = newRequest.requestOptions.idleReadTimeout {
4343
self.idleReadTimeoutStateMachine = .init(timeAmount: idleReadTimeout)
4444
}
45+
46+
if let idleWriteTimeout = newRequest.requestOptions.idleWriteTimeout {
47+
self.idleWriteTimeoutStateMachine = .init(
48+
timeAmount: idleWriteTimeout,
49+
isWritabilityEnabled: self.channelContext?.channel.isWritable ?? false
50+
)
51+
}
4552
} else {
4653
self.logger = self.backgroundLogger
4754
self.idleReadTimeoutStateMachine = nil
55+
self.idleWriteTimeoutStateMachine = nil
4856
}
4957
}
5058
}
@@ -57,6 +65,14 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
5765
/// We check in the task if the timer ID has changed in the meantime and do not execute any action if has changed.
5866
private var currentIdleReadTimeoutTimerID: Int = 0
5967

68+
private var idleWriteTimeoutStateMachine: IdleWriteStateMachine?
69+
private var idleWriteTimeoutTimer: Scheduled<Void>?
70+
71+
/// Cancelling a task in NIO does *not* guarantee that the task will not execute under certain race conditions.
72+
/// We therefore give each timer an ID and increase the ID every time we reset or cancel it.
73+
/// We check in the task if the timer ID has changed in the meantime and do not execute any action if has changed.
74+
private var currentIdleWriteTimeoutTimerID: Int = 0
75+
6076
private let backgroundLogger: Logger
6177
private var logger: Logger
6278
private let eventLoop: EventLoop
@@ -106,6 +122,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
106122
"ahc-channel-writable": "\(context.channel.isWritable)",
107123
])
108124

125+
if let timeoutAction = self.idleWriteTimeoutStateMachine?.channelWritabilityChanged(context: context) {
126+
self.runTimeoutAction(timeoutAction, context: context)
127+
}
128+
109129
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
110130
self.run(action, context: context)
111131
context.fireChannelWritabilityChanged()
@@ -150,6 +170,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
150170
self.request = req
151171

152172
self.logger.debug("Request was scheduled on connection")
173+
174+
if let timeoutAction = self.idleWriteTimeoutStateMachine?.write() {
175+
self.runTimeoutAction(timeoutAction, context: context)
176+
}
177+
153178
req.willExecuteRequest(self)
154179

155180
let action = self.state.runNewRequest(
@@ -196,8 +221,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
196221
request.resumeRequestBodyStream()
197222
}
198223
if startIdleTimer {
199-
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
200-
self.runTimeoutAction(timeoutAction, context: context)
224+
if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
225+
self.runTimeoutAction(readTimeoutAction, context: context)
226+
}
227+
228+
if let writeTimeoutAction = self.idleWriteTimeoutStateMachine?.requestEndSent() {
229+
self.runTimeoutAction(writeTimeoutAction, context: context)
201230
}
202231
}
203232
case .sendBodyPart(let part, let writePromise):
@@ -206,8 +235,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
206235
case .sendRequestEnd(let writePromise):
207236
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
208237

209-
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
210-
self.runTimeoutAction(timeoutAction, context: context)
238+
if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
239+
self.runTimeoutAction(readTimeoutAction, context: context)
240+
}
241+
242+
if let writeTimeoutAction = self.idleWriteTimeoutStateMachine?.requestEndSent() {
243+
self.runTimeoutAction(writeTimeoutAction, context: context)
211244
}
212245

213246
case .pauseRequestBodyStream:
@@ -380,6 +413,40 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
380413
}
381414
}
382415

416+
private func runTimeoutAction(_ action: IdleWriteStateMachine.Action, context: ChannelHandlerContext) {
417+
switch action {
418+
case .startIdleWriteTimeoutTimer(let timeAmount):
419+
assert(self.idleWriteTimeoutTimer == nil, "Expected there is no timeout timer so far.")
420+
421+
let timerID = self.currentIdleWriteTimeoutTimerID
422+
self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
423+
guard self.currentIdleWriteTimeoutTimerID == timerID else { return }
424+
let action = self.state.idleWriteTimeoutTriggered()
425+
self.run(action, context: context)
426+
}
427+
case .resetIdleWriteTimeoutTimer(let timeAmount):
428+
if let oldTimer = self.idleWriteTimeoutTimer {
429+
oldTimer.cancel()
430+
}
431+
432+
self.currentIdleWriteTimeoutTimerID &+= 1
433+
let timerID = self.currentIdleWriteTimeoutTimerID
434+
self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
435+
guard self.currentIdleWriteTimeoutTimerID == timerID else { return }
436+
let action = self.state.idleWriteTimeoutTriggered()
437+
self.run(action, context: context)
438+
}
439+
case .clearIdleWriteTimeoutTimer:
440+
if let oldTimer = self.idleWriteTimeoutTimer {
441+
self.idleWriteTimeoutTimer = nil
442+
self.currentIdleWriteTimeoutTimerID &+= 1
443+
oldTimer.cancel()
444+
}
445+
case .none:
446+
break
447+
}
448+
}
449+
383450
// MARK: Private HTTPRequestExecutor
384451

385452
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
@@ -393,6 +460,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
393460
return
394461
}
395462

463+
if let timeoutAction = self.idleWriteTimeoutStateMachine?.write() {
464+
self.runTimeoutAction(timeoutAction, context: context)
465+
}
466+
396467
let action = self.state.requestStreamPartReceived(data, promise: promise)
397468
self.run(action, context: context)
398469
}
@@ -428,6 +499,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
428499

429500
self.logger.trace("Request was cancelled")
430501

502+
if let timeoutAction = self.idleWriteTimeoutStateMachine?.cancelRequest() {
503+
self.runTimeoutAction(timeoutAction, context: context)
504+
}
505+
431506
let action = self.state.requestCancelled(closeConnection: true)
432507
self.run(action, context: context)
433508
}
@@ -540,3 +615,87 @@ struct IdleReadStateMachine {
540615
}
541616
}
542617
}
618+
619+
struct IdleWriteStateMachine {
620+
enum Action {
621+
case startIdleWriteTimeoutTimer(TimeAmount)
622+
case resetIdleWriteTimeoutTimer(TimeAmount)
623+
case clearIdleWriteTimeoutTimer
624+
case none
625+
}
626+
627+
enum State {
628+
case waitingForRequestEnd
629+
case waitingForWritabilityEnabled
630+
case requestEndSent
631+
}
632+
633+
private var state: State
634+
private let timeAmount: TimeAmount
635+
636+
init(timeAmount: TimeAmount, isWritabilityEnabled: Bool) {
637+
self.timeAmount = timeAmount
638+
if isWritabilityEnabled {
639+
self.state = .waitingForRequestEnd
640+
} else {
641+
self.state = .waitingForWritabilityEnabled
642+
}
643+
}
644+
645+
mutating func cancelRequest() -> Action {
646+
switch self.state {
647+
case .waitingForRequestEnd, .waitingForWritabilityEnabled:
648+
self.state = .requestEndSent
649+
return .clearIdleWriteTimeoutTimer
650+
case .requestEndSent:
651+
return .none
652+
}
653+
}
654+
655+
mutating func write() -> Action {
656+
switch self.state {
657+
case .waitingForRequestEnd:
658+
return .resetIdleWriteTimeoutTimer(self.timeAmount)
659+
case .waitingForWritabilityEnabled:
660+
return .none
661+
case .requestEndSent:
662+
preconditionFailure("If the request end has been sent, we can't write more data.")
663+
}
664+
}
665+
666+
mutating func requestEndSent() -> Action {
667+
switch self.state {
668+
case .waitingForRequestEnd:
669+
self.state = .requestEndSent
670+
return .clearIdleWriteTimeoutTimer
671+
case .waitingForWritabilityEnabled:
672+
preconditionFailure("If the channel is not writable, we can't have sent the request end.")
673+
case .requestEndSent:
674+
return .none
675+
}
676+
}
677+
678+
mutating func channelWritabilityChanged(context: ChannelHandlerContext) -> Action {
679+
if context.channel.isWritable {
680+
switch self.state {
681+
case .waitingForRequestEnd:
682+
preconditionFailure("If waiting for more data, the channel was already writable.")
683+
case .waitingForWritabilityEnabled:
684+
self.state = .waitingForRequestEnd
685+
return .startIdleWriteTimeoutTimer(self.timeAmount)
686+
case .requestEndSent:
687+
return .none
688+
}
689+
} else {
690+
switch self.state {
691+
case .waitingForRequestEnd:
692+
self.state = .waitingForWritabilityEnabled
693+
return .clearIdleWriteTimeoutTimer
694+
case .waitingForWritabilityEnabled:
695+
preconditionFailure("If the channel was writable before, then we should have been waiting for more data.")
696+
case .requestEndSent:
697+
return .none
698+
}
699+
}
700+
}
701+
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,18 @@ struct HTTP1ConnectionStateMachine {
355355
}
356356
}
357357

358+
mutating func idleWriteTimeoutTriggered() -> Action {
359+
guard case .inRequest(var requestStateMachine, let close) = self.state else {
360+
preconditionFailure("Invalid state: \(self.state)")
361+
}
362+
363+
return self.avoidingStateMachineCoW { state -> Action in
364+
let action = requestStateMachine.idleWriteTimeoutTriggered()
365+
state = .inRequest(requestStateMachine, close: close)
366+
return state.modify(with: action)
367+
}
368+
}
369+
358370
mutating func headSent() -> Action {
359371
guard case .inRequest(var requestStateMachine, let close) = self.state else {
360372
return .wait

0 commit comments

Comments
 (0)