@@ -42,20 +42,32 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
42
42
if let idleReadTimeout = newRequest. requestOptions. idleReadTimeout {
43
43
self . idleReadTimeoutStateMachine = . init( timeAmount: idleReadTimeout)
44
44
}
45
+
46
+ if let idleWriteTimeout = newRequest. requestOptions. idleWriteTimeout {
47
+ self . idleWriteTimeoutStateMachine = . init(
48
+ timeAmount: idleWriteTimeout,
49
+ isWritabilityEnabled: self . channelContext? . channel. isWritable ?? false
50
+ )
51
+ }
45
52
} else {
46
53
self . logger = self . backgroundLogger
47
54
self . idleReadTimeoutStateMachine = nil
55
+ self . idleWriteTimeoutStateMachine = nil
48
56
}
49
57
}
50
58
}
51
59
52
60
private var idleReadTimeoutStateMachine : IdleReadStateMachine ?
53
61
private var idleReadTimeoutTimer : Scheduled < Void > ?
54
62
63
+ private var idleWriteTimeoutStateMachine : IdleWriteStateMachine ?
64
+ private var idleWriteTimeoutTimer : Scheduled < Void > ?
65
+
55
66
/// Cancelling a task in NIO does *not* guarantee that the task will not execute under certain race conditions.
56
67
/// We therefore give each timer an ID and increase the ID every time we reset or cancel it.
57
68
/// We check in the task if the timer ID has changed in the meantime and do not execute any action if has changed.
58
69
private var currentIdleReadTimeoutTimerID : Int = 0
70
+ private var currentIdleWriteTimeoutTimerID : Int = 0
59
71
60
72
private let backgroundLogger : Logger
61
73
private var logger : Logger
@@ -106,6 +118,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
106
118
" ahc-channel-writable " : " \( context. channel. isWritable) " ,
107
119
] )
108
120
121
+ if let timeoutAction = self . idleWriteTimeoutStateMachine? . channelWritabilityChanged ( context: context) {
122
+ self . runTimeoutAction ( timeoutAction, context: context)
123
+ }
124
+
109
125
let action = self . state. writabilityChanged ( writable: context. channel. isWritable)
110
126
self . run ( action, context: context)
111
127
context. fireChannelWritabilityChanged ( )
@@ -150,6 +166,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
150
166
self . request = req
151
167
152
168
self . logger. debug ( " Request was scheduled on connection " )
169
+
170
+ if let timeoutAction = self . idleWriteTimeoutStateMachine? . write ( ) {
171
+ self . runTimeoutAction ( timeoutAction, context: context)
172
+ }
173
+
153
174
req. willExecuteRequest ( self )
154
175
155
176
let action = self . state. runNewRequest (
@@ -196,8 +217,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
196
217
request. resumeRequestBodyStream ( )
197
218
}
198
219
if startIdleTimer {
199
- if let timeoutAction = self . idleReadTimeoutStateMachine? . requestEndSent ( ) {
200
- self . runTimeoutAction ( timeoutAction, context: context)
220
+ if let readTimeoutAction = self . idleReadTimeoutStateMachine? . requestEndSent ( ) {
221
+ self . runTimeoutAction ( readTimeoutAction, context: context)
222
+ }
223
+
224
+ if let writeTimeoutAction = self . idleWriteTimeoutStateMachine? . requestEndSent ( ) {
225
+ self . runTimeoutAction ( writeTimeoutAction, context: context)
201
226
}
202
227
}
203
228
case . sendBodyPart( let part, let writePromise) :
@@ -206,8 +231,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
206
231
case . sendRequestEnd( let writePromise) :
207
232
context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: writePromise)
208
233
209
- if let timeoutAction = self . idleReadTimeoutStateMachine? . requestEndSent ( ) {
210
- self . runTimeoutAction ( timeoutAction, context: context)
234
+ if let readTimeoutAction = self . idleReadTimeoutStateMachine? . requestEndSent ( ) {
235
+ self . runTimeoutAction ( readTimeoutAction, context: context)
236
+ }
237
+
238
+ if let writeTimeoutAction = self . idleWriteTimeoutStateMachine? . requestEndSent ( ) {
239
+ self . runTimeoutAction ( writeTimeoutAction, context: context)
211
240
}
212
241
213
242
case . pauseRequestBodyStream:
@@ -380,6 +409,40 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
380
409
}
381
410
}
382
411
412
+ private func runTimeoutAction( _ action: IdleWriteStateMachine . Action , context: ChannelHandlerContext ) {
413
+ switch action {
414
+ case . startIdleWriteTimeoutTimer( let timeAmount) :
415
+ assert ( self . idleWriteTimeoutTimer == nil , " Expected there is no timeout timer so far. " )
416
+
417
+ let timerID = self . currentIdleWriteTimeoutTimerID
418
+ self . idleWriteTimeoutTimer = self . eventLoop. scheduleTask ( in: timeAmount) {
419
+ guard self . currentIdleWriteTimeoutTimerID == timerID else { return }
420
+ let action = self . state. idleWriteTimeoutTriggered ( )
421
+ self . run ( action, context: context)
422
+ }
423
+ case . resetIdleWriteTimeoutTimer( let timeAmount) :
424
+ if let oldTimer = self . idleWriteTimeoutTimer {
425
+ oldTimer. cancel ( )
426
+ }
427
+
428
+ self . currentIdleWriteTimeoutTimerID &+= 1
429
+ let timerID = self . currentIdleWriteTimeoutTimerID
430
+ self . idleWriteTimeoutTimer = self . eventLoop. scheduleTask ( in: timeAmount) {
431
+ guard self . currentIdleWriteTimeoutTimerID == timerID else { return }
432
+ let action = self . state. idleWriteTimeoutTriggered ( )
433
+ self . run ( action, context: context)
434
+ }
435
+ case . clearIdleWriteTimeoutTimer:
436
+ if let oldTimer = self . idleWriteTimeoutTimer {
437
+ self . idleWriteTimeoutTimer = nil
438
+ self . currentIdleWriteTimeoutTimerID &+= 1
439
+ oldTimer. cancel ( )
440
+ }
441
+ case . none:
442
+ break
443
+ }
444
+ }
445
+
383
446
// MARK: Private HTTPRequestExecutor
384
447
385
448
private func writeRequestBodyPart0( _ data: IOData , request: HTTPExecutableRequest , promise: EventLoopPromise < Void > ? ) {
@@ -393,6 +456,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
393
456
return
394
457
}
395
458
459
+ if let timeoutAction = self . idleWriteTimeoutStateMachine? . write ( ) {
460
+ self . runTimeoutAction ( timeoutAction, context: context)
461
+ }
462
+
396
463
let action = self . state. requestStreamPartReceived ( data, promise: promise)
397
464
self . run ( action, context: context)
398
465
}
@@ -428,6 +495,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
428
495
429
496
self . logger. trace ( " Request was cancelled " )
430
497
498
+ if let timeoutAction = self . idleWriteTimeoutStateMachine? . cancelRequest ( ) {
499
+ self . runTimeoutAction ( timeoutAction, context: context)
500
+ }
501
+
431
502
let action = self . state. requestCancelled ( closeConnection: true )
432
503
self . run ( action, context: context)
433
504
}
@@ -540,3 +611,87 @@ struct IdleReadStateMachine {
540
611
}
541
612
}
542
613
}
614
+
615
+ struct IdleWriteStateMachine {
616
+ enum Action {
617
+ case startIdleWriteTimeoutTimer( TimeAmount )
618
+ case resetIdleWriteTimeoutTimer( TimeAmount )
619
+ case clearIdleWriteTimeoutTimer
620
+ case none
621
+ }
622
+
623
+ enum State {
624
+ case waitingForRequestEnd
625
+ case waitingForWritabilityEnabled
626
+ case requestEndSent
627
+ }
628
+
629
+ private var state : State
630
+ private let timeAmount : TimeAmount
631
+
632
+ init ( timeAmount: TimeAmount , isWritabilityEnabled: Bool ) {
633
+ self . timeAmount = timeAmount
634
+ if isWritabilityEnabled {
635
+ self . state = . waitingForRequestEnd
636
+ } else {
637
+ self . state = . waitingForWritabilityEnabled
638
+ }
639
+ }
640
+
641
+ mutating func cancelRequest( ) -> Action {
642
+ switch self . state {
643
+ case . waitingForRequestEnd, . waitingForWritabilityEnabled:
644
+ self . state = . requestEndSent
645
+ return . clearIdleWriteTimeoutTimer
646
+ case . requestEndSent:
647
+ return . none
648
+ }
649
+ }
650
+
651
+ mutating func write( ) -> Action {
652
+ switch self . state {
653
+ case . waitingForRequestEnd:
654
+ return . resetIdleWriteTimeoutTimer( self . timeAmount)
655
+ case . waitingForWritabilityEnabled:
656
+ return . none
657
+ case . requestEndSent:
658
+ preconditionFailure ( " If the request end has been sent, we can't write more data. " )
659
+ }
660
+ }
661
+
662
+ mutating func requestEndSent( ) -> Action {
663
+ switch self . state {
664
+ case . waitingForRequestEnd:
665
+ self . state = . requestEndSent
666
+ return . clearIdleWriteTimeoutTimer
667
+ case . waitingForWritabilityEnabled:
668
+ preconditionFailure ( " If the channel is not writable, we can't have sent the request end. " )
669
+ case . requestEndSent:
670
+ return . none
671
+ }
672
+ }
673
+
674
+ mutating func channelWritabilityChanged( context: ChannelHandlerContext ) -> Action {
675
+ if context. channel. isWritable {
676
+ switch self . state {
677
+ case . waitingForRequestEnd:
678
+ preconditionFailure ( " If waiting for more data, the channel was already writable. " )
679
+ case . waitingForWritabilityEnabled:
680
+ self . state = . waitingForRequestEnd
681
+ return . startIdleWriteTimeoutTimer( self . timeAmount)
682
+ case . requestEndSent:
683
+ return . none
684
+ }
685
+ } else {
686
+ switch self . state {
687
+ case . waitingForRequestEnd:
688
+ self . state = . waitingForWritabilityEnabled
689
+ return . clearIdleWriteTimeoutTimer
690
+ case . waitingForWritabilityEnabled:
691
+ preconditionFailure ( " If the channel was writable before, then we should have been waiting for more data. " )
692
+ case . requestEndSent:
693
+ return . none
694
+ }
695
+ }
696
+ }
697
+ }
0 commit comments