12
12
//
13
13
//===----------------------------------------------------------------------===//
14
14
15
+ #if DEBUG
15
16
import Dispatch
16
17
import Logging
17
18
import NIO
@@ -26,8 +27,6 @@ import NIOHTTP1
26
27
// callback(.success("Hello, \(payload)!"))
27
28
// }
28
29
// }
29
-
30
- #if DEBUG
31
30
extension Lambda {
32
31
/// Execute code in the context of a mock Lambda server.
33
32
///
@@ -91,10 +90,10 @@ private enum LocalLambda {
91
90
public typealias InboundIn = HTTPServerRequestPart
92
91
public typealias OutboundOut = HTTPServerResponsePart
93
92
94
- private static let queueLock = Lock ( )
95
- private static var queue = [ String: Pending] ( )
93
+ private var pending = CircularBuffer < ( head: HTTPRequestHead , body: ByteBuffer ? ) > ( )
96
94
97
- private var processing = CircularBuffer < ( head: HTTPRequestHead , body: ByteBuffer ? ) > ( )
95
+ private static var invocations = CircularBuffer < Invocation > ( )
96
+ private static var invocationState = InvocationState . waitingForLambdaRequest
98
97
99
98
private let logger : Logger
100
99
private let invocationEndpoint : String
@@ -109,72 +108,101 @@ private enum LocalLambda {
109
108
110
109
switch requestPart {
111
110
case . head( let head) :
112
- self . processing . append ( ( head: head, body: nil ) )
111
+ self . pending . append ( ( head: head, body: nil ) )
113
112
case . body( var buffer) :
114
- var request = self . processing . removeFirst ( )
113
+ var request = self . pending . removeFirst ( )
115
114
if request. body == nil {
116
115
request. body = buffer
117
116
} else {
118
117
request. body!. writeBuffer ( & buffer)
119
118
}
120
- self . processing . prepend ( request)
119
+ self . pending . prepend ( request)
121
120
case . end:
122
- let request = self . processing . removeFirst ( )
121
+ let request = self . pending . removeFirst ( )
123
122
self . processRequest ( context: context, request: request)
124
123
}
125
124
}
126
125
127
126
func processRequest( context: ChannelHandlerContext , request: ( head: HTTPRequestHead , body: ByteBuffer ? ) ) {
128
- if request. head. uri. hasSuffix ( self . invocationEndpoint) {
129
- if let work = request. body {
130
- let requestId = " \( DispatchTime . now ( ) . uptimeNanoseconds) " // FIXME:
131
- let promise = context. eventLoop. makePromise ( of: Response . self)
127
+ switch ( request. head. method, request. head. uri) {
128
+ // this endpoint is called by the client invoking the lambda
129
+ case ( . POST, let url) where url. hasSuffix ( self . invocationEndpoint) :
130
+ guard let work = request. body else {
131
+ return self . writeResponse ( context: context, response: . init( status: . badRequest) )
132
+ }
133
+ let requestID = " \( DispatchTime . now ( ) . uptimeNanoseconds) " // FIXME:
134
+ let promise = context. eventLoop. makePromise ( of: Response . self)
135
+ promise. futureResult. whenComplete { result in
136
+ switch result {
137
+ case . failure( let error) :
138
+ self . logger. error ( " invocation error: \( error) " )
139
+ self . writeResponse ( context: context, response: . init( status: . internalServerError) )
140
+ case . success( let response) :
141
+ self . writeResponse ( context: context, response: response)
142
+ }
143
+ }
144
+ let invocation = Invocation ( requestID: requestID, request: work, responsePromise: promise)
145
+ switch Self . invocationState {
146
+ case . waitingForInvocation( let promise) :
147
+ promise. succeed ( invocation)
148
+ case . waitingForLambdaRequest, . waitingForLambdaResponse:
149
+ Self . invocations. append ( invocation)
150
+ }
151
+ // /next endpoint is called by the lambda polling for work
152
+ case ( . GET, let url) where url. hasSuffix ( Consts . requestWorkURLSuffix) :
153
+ // check if our server is in the correct state
154
+ guard case . waitingForLambdaRequest = Self . invocationState else {
155
+ self . logger. error ( " invalid invocation state \( Self . invocationState) " )
156
+ self . writeResponse ( context: context, response: . init( status: . unprocessableEntity) )
157
+ return
158
+ }
159
+
160
+ // pop the first task from the queue
161
+ switch Self . invocations. popFirst ( ) {
162
+ case . none:
163
+ // if there is nothing in the queue,
164
+ // create a promise that we can fullfill when we get a new task
165
+ let promise = context. eventLoop. makePromise ( of: Invocation . self)
132
166
promise. futureResult. whenComplete { result in
133
167
switch result {
134
- case . success( let response) :
135
- self . writeResponse ( context: context, response: response)
136
- case . failure:
168
+ case . failure( let error) :
169
+ self . logger. error ( " invocation error: \( error) " )
137
170
self . writeResponse ( context: context, response: . init( status: . internalServerError) )
171
+ case . success( let invocation) :
172
+ Self . invocationState = . waitingForLambdaResponse( invocation)
173
+ self . writeResponse ( context: context, response: invocation. makeResponse ( ) )
138
174
}
139
175
}
140
- Self . queueLock. withLock {
141
- Self . queue [ requestId] = Pending ( requestId: requestId, request: work, responsePromise: promise)
142
- }
176
+ Self . invocationState = . waitingForInvocation( promise)
177
+ case . some( let invocation) :
178
+ // if there is a task pending, we can immediatly respond with it.
179
+ Self . invocationState = . waitingForLambdaResponse( invocation)
180
+ self . writeResponse ( context: context, response: invocation. makeResponse ( ) )
143
181
}
144
- } else if request. head. uri. hasSuffix ( " /next " ) {
145
- switch ( Self . queueLock. withLock { Self . queue. popFirst ( ) } ) {
146
- case . none:
147
- self . writeResponse ( context: context, response: . init( status: . noContent) )
148
- case . some( let pending) :
149
- var response = Response ( )
150
- response. body = pending. value. request
151
- // required headers
152
- response. headers = [
153
- ( AmazonHeaders . requestID, pending. key) ,
154
- ( AmazonHeaders . invokedFunctionARN, " arn:aws:lambda:us-east-1: \( Int16 . random ( in: Int16 . min ... Int16 . max) ) :function:custom-runtime " ) ,
155
- ( AmazonHeaders . traceID, " Root= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Parent= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Sampled=1 " ) ,
156
- ( AmazonHeaders . deadline, " \( DispatchWallTime . distantFuture. millisSinceEpoch) " ) ,
157
- ]
158
- Self . queueLock. withLock {
159
- Self . queue [ pending. key] = pending. value
160
- }
161
- self . writeResponse ( context: context, response: response)
162
- }
163
-
164
- } else if request. head. uri. hasSuffix ( " /response " ) {
182
+ // :requestID/response endpoint is called by the lambda posting the response
183
+ case ( . POST, let url) where url. hasSuffix ( Consts . postResponseURLSuffix) :
165
184
let parts = request. head. uri. split ( separator: " / " )
166
- guard let requestId = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
185
+
186
+ guard let requestID = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
187
+ // the request is malformed, since we were expecting a requestId in the path
167
188
return self . writeResponse ( context: context, response: . init( status: . badRequest) )
168
189
}
169
- switch ( Self . queueLock. withLock { Self . queue [ requestId] } ) {
170
- case . none:
171
- self . writeResponse ( context: context, response: . init( status: . badRequest) )
172
- case . some( let pending) :
173
- pending. responsePromise. succeed ( . init( status: . ok, body: request. body) )
174
- self . writeResponse ( context: context, response: . init( status: . accepted) )
175
- Self . queueLock. withLock { Self . queue [ requestId] = nil }
190
+ guard case . waitingForLambdaResponse( let invocation) = Self . invocationState else {
191
+ // a response was send, but we did not expect to receive one
192
+ self . logger. error ( " invalid invocation state \( Self . invocationState) " )
193
+ return self . writeResponse ( context: context, response: . init( status: . unprocessableEntity) )
194
+ }
195
+ guard requestID == invocation. requestID else {
196
+ // the request's requestId is not matching the one we are expecting
197
+ self . logger. error ( " invalid invocation state request ID \( requestID) does not match expected \( invocation. requestID) " )
198
+ return self . writeResponse ( context: context, response: . init( status: . badRequest) )
176
199
}
177
- } else {
200
+
201
+ invocation. responsePromise. succeed ( . init( status: . ok, body: request. body) )
202
+ self . writeResponse ( context: context, response: . init( status: . accepted) )
203
+ Self . invocationState = . waitingForLambdaRequest
204
+ // unknown call
205
+ default :
178
206
self . writeResponse ( context: context, response: . init( status: . notFound) )
179
207
}
180
208
}
@@ -207,10 +235,29 @@ private enum LocalLambda {
207
235
var body : ByteBuffer ?
208
236
}
209
237
210
- struct Pending {
211
- let requestId : String
238
+ struct Invocation {
239
+ let requestID : String
212
240
let request : ByteBuffer
213
241
let responsePromise : EventLoopPromise < Response >
242
+
243
+ func makeResponse( ) -> Response {
244
+ var response = Response ( )
245
+ response. body = self . request
246
+ // required headers
247
+ response. headers = [
248
+ ( AmazonHeaders . requestID, self . requestID) ,
249
+ ( AmazonHeaders . invokedFunctionARN, " arn:aws:lambda:us-east-1: \( Int16 . random ( in: Int16 . min ... Int16 . max) ) :function:custom-runtime " ) ,
250
+ ( AmazonHeaders . traceID, " Root= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Parent= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Sampled=1 " ) ,
251
+ ( AmazonHeaders . deadline, " \( DispatchWallTime . distantFuture. millisSinceEpoch) " ) ,
252
+ ]
253
+ return response
254
+ }
255
+ }
256
+
257
+ enum InvocationState {
258
+ case waitingForInvocation( EventLoopPromise < Invocation > )
259
+ case waitingForLambdaRequest
260
+ case waitingForLambdaResponse( Invocation )
214
261
}
215
262
}
216
263
0 commit comments