@@ -17,13 +17,15 @@ import NIOConcurrencyHelpers
17
17
import NIOHTTP1
18
18
19
19
/// A barebone HTTP client to interact with AWS Runtime Engine which is an HTTP server.
20
+ /// Note that Lambda Runtime API dictate that only one requests runs at a time.
21
+ /// This means we can avoid locks and other concurrency concern we would otherwise need to build into the client
20
22
internal final class HTTPClient {
21
23
private let eventLoop : EventLoop
22
24
private let configuration : Lambda . Configuration . RuntimeEngine
23
25
private let targetHost : String
24
26
25
27
private var state = State . disconnected
26
- private let stateLock = Lock ( )
28
+ private let executing = NIOAtomic . makeAtomic ( value : false )
27
29
28
30
init ( eventLoop: EventLoop , configuration: Lambda . Configuration . RuntimeEngine ) {
29
31
self . eventLoop = eventLoop
@@ -47,38 +49,25 @@ internal final class HTTPClient {
47
49
}
48
50
49
51
// TODO: cap reconnect attempt
50
- private func execute( _ request: Request ) -> EventLoopFuture < Response > {
51
- self . stateLock. lock ( )
52
+ private func execute( _ request: Request , validate: Bool = true ) -> EventLoopFuture < Response > {
53
+ precondition ( !validate || self . executing. compareAndExchange ( expected: false , desired: true ) , " expecting single request at a time " )
54
+
52
55
switch self . state {
53
56
case . disconnected:
54
- let promise = self . eventLoop. makePromise ( of: Response . self)
55
- self . state = . connecting( promise. futureResult)
56
- self . stateLock. unlock ( )
57
- self . connect ( ) . flatMap { channel -> EventLoopFuture < Response > in
58
- self . stateLock. withLock {
59
- guard case . connecting = self . state else {
60
- preconditionFailure ( " invalid state \( self . state) " )
61
- }
62
- self . state = . connected( channel)
63
- }
64
- return self . execute ( request)
65
- } . cascade ( to: promise)
66
- return promise. futureResult
67
- case . connecting( let future) :
68
- let future = future. flatMap { _ in
69
- self . execute ( request)
57
+ return self . connect ( ) . flatMap { channel -> EventLoopFuture < Response > in
58
+ self . state = . connected( channel)
59
+ return self . execute ( request, validate: false )
70
60
}
71
- self . state = . connecting( future)
72
- self . stateLock. unlock ( )
73
- return future
74
61
case . connected( let channel) :
75
62
guard channel. isActive else {
76
63
self . state = . disconnected
77
- self . stateLock. unlock ( )
78
- return self . execute ( request)
64
+ return self . execute ( request, validate: false )
79
65
}
80
- self . stateLock . unlock ( )
66
+
81
67
let promise = channel. eventLoop. makePromise ( of: Response . self)
68
+ promise. futureResult. whenComplete { _ in
69
+ precondition ( self . executing. compareAndExchange ( expected: true , desired: false ) , " invalid execution state " )
70
+ }
82
71
let wrapper = HTTPRequestWrapper ( request: request, promise: promise)
83
72
channel. writeAndFlush ( wrapper) . cascadeFailure ( to: promise)
84
73
return promise. futureResult
@@ -134,7 +123,6 @@ internal final class HTTPClient {
134
123
135
124
private enum State {
136
125
case disconnected
137
- case connecting( EventLoopFuture < Response > )
138
126
case connected( Channel )
139
127
}
140
128
}
@@ -214,15 +202,15 @@ private final class HTTPHandler: ChannelDuplexHandler {
214
202
}
215
203
}
216
204
217
- private final class UnaryHandler : ChannelInboundHandler , ChannelOutboundHandler {
205
+ // no need in locks since we validate only one request can run at a time
206
+ private final class UnaryHandler : ChannelDuplexHandler {
218
207
typealias OutboundIn = HTTPRequestWrapper
219
208
typealias InboundIn = HTTPClient . Response
220
209
typealias OutboundOut = HTTPClient . Request
221
210
222
211
private let keepAlive : Bool
223
212
224
- private let lock = Lock ( )
225
- private var pendingResponses = CircularBuffer < ( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? ) > ( )
213
+ private var pending : ( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? ) ?
226
214
private var lastError : Error ?
227
215
228
216
init ( keepAlive: Bool ) {
@@ -233,19 +221,20 @@ private final class UnaryHandler: ChannelInboundHandler, ChannelOutboundHandler
233
221
let wrapper = unwrapOutboundIn ( data)
234
222
let timeoutTask = wrapper. request. timeout. map {
235
223
context. eventLoop. scheduleTask ( in: $0) {
236
- if ( self . lock. withLock { !self . pendingResponses. isEmpty } ) {
237
- self . errorCaught ( context: context, error: HTTPClient . Errors. timeout)
224
+ if self . pending != nil {
225
+ // TODO: need to verify this is thread safe i.e tha the timeout event wont interleave with the normal hander events
226
+ context. pipeline. fireErrorCaught ( HTTPClient . Errors. timeout)
238
227
}
239
228
}
240
229
}
241
- self . lock . withLockVoid { pendingResponses . append ( ( promise: wrapper. promise, timeout: timeoutTask) ) }
230
+ self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
242
231
context. writeAndFlush ( wrapOutboundOut ( wrapper. request) , promise: promise)
243
232
}
244
233
245
234
func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
246
235
let response = unwrapInboundIn ( data)
247
- if let pending = ( self . lock . withLock { self . pendingResponses . popFirst ( ) } ) {
248
- let serverKeepAlive = response. headers [ " connection " ] . first ? . lowercased ( ) == " keep-alive "
236
+ if let pending = self . pending {
237
+ let serverKeepAlive = response. headers. first ( name : " connection " ) ? . lowercased ( ) == " keep-alive "
249
238
if !self . keepAlive || !serverKeepAlive {
250
239
pending. promise. futureResult. whenComplete { _ in
251
240
_ = context. channel. close ( )
@@ -258,20 +247,20 @@ private final class UnaryHandler: ChannelInboundHandler, ChannelOutboundHandler
258
247
259
248
func errorCaught( context: ChannelHandlerContext , error: Error ) {
260
249
// pending responses will fail with lastError in channelInactive since we are calling context.close
261
- self . lock . withLockVoid { self . lastError = error }
250
+ self . lastError = error
262
251
context. channel. close ( promise: nil )
263
252
}
264
253
265
254
func channelInactive( context: ChannelHandlerContext ) {
266
255
// fail any pending responses with last error or assume peer disconnected
267
- self . failPendingResponses ( self . lock . withLock { self . lastError } ?? HTTPClient . Errors. connectionResetByPeer)
256
+ self . failPendingResponses ( self . lastError ?? HTTPClient . Errors. connectionResetByPeer)
268
257
context. fireChannelInactive ( )
269
258
}
270
259
271
260
private func failPendingResponses( _ error: Error ) {
272
- while let pending = ( self . lock . withLock { pendingResponses . popFirst ( ) } ) {
273
- pending. 1 ? . cancel ( )
274
- pending. 0 . fail ( error)
261
+ if let pending = self . pending {
262
+ pending. timeout ? . cancel ( )
263
+ pending. promise . fail ( error)
275
264
}
276
265
}
277
266
}
0 commit comments