@@ -59,7 +59,7 @@ private enum LocalLambda {
             var logger = Logger(label: "LocalLambdaServer")
             logger.logLevel = configuration.general.logLevel
             self.logger = logger
-            self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+            self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
             self.host = configuration.runtimeEngine.ip
             self.port = configuration.runtimeEngine.port
             self.invocationEndpoint = invocationEndpoint ?? "/invoke"
@@ -88,13 +88,21 @@ private enum LocalLambda {
     }
 
     final class HTTPHandler: ChannelInboundHandler {
+
+        enum InvocationState {
+            case waitingForNextRequest
+            case idle(EventLoopPromise<Pending>)
+            case processing(Pending)
+        }
+
         public typealias InboundIn = HTTPServerRequestPart
         public typealias OutboundOut = HTTPServerResponsePart
 
-        private static let queueLock = Lock()
-        private static var queue = [String: Pending]()
-
         private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
+
+        private static let lock = Lock()
+        private static var queue = [Pending]()
+        private static var invocationState: InvocationState = .waitingForNextRequest
 
         private let logger: Logger
         private let invocationEndpoint: String
@@ -137,42 +145,70 @@ private enum LocalLambda {
                             self.writeResponse(context: context, response: .init(status: .internalServerError))
                         }
                     }
-                    Self.queueLock.withLock {
-                        Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise)
+                    let pending = Pending(requestId: requestId, request: work, responsePromise: promise)
+                    switch Self.lock.withLock({ Self.invocationState }) {
+                    case .idle(let promise):
+                        promise.succeed(pending)
+                    case .processing(_), .waitingForNextRequest:
+                        Self.queue.append(pending)
                     }
                 }
             } else if request.head.uri.hasSuffix("/next") {
-                switch (Self.queueLock.withLock { Self.queue.popFirst() }) {
+                // check if our server is in the correct state
+                guard case .waitingForNextRequest = Self.lock.withLock({ Self.invocationState }) else {
+                    #warning("better error code?!")
+                    self.writeResponse(context: context, response: .init(status: .conflict))
+                    return
+                }
+
+                // pop the first task from the queue
+                switch (Self.lock.withLock { !Self.queue.isEmpty ? Self.queue.removeFirst() : nil }) {
                 case .none:
-                    self.writeResponse(context: context, response: .init(status: .noContent))
+                    // if there is nothing in the queue, create a promise that we can succeed
+                    // when we get a new task
+                    let promise = context.eventLoop.makePromise(of: Pending.self)
+                    promise.futureResult.whenComplete { (result) in
+                        switch result {
+                        case .failure(let error):
+                            self.writeResponse(context: context, response: .init(status: .internalServerError))
+                        case .success(let pending):
+                            Self.lock.withLock {
+                                Self.invocationState = .processing(pending)
+                            }
+                            self.writeResponse(context: context, response: pending.toResponse())
+                        }
+                    }
+                    Self.lock.withLock {
+                        Self.invocationState = .idle(promise)
+                    }
                 case .some(let pending):
-                    var response = Response()
-                    response.body = pending.value.request
-                    // required headers
-                    response.headers = [
-                        (AmazonHeaders.requestID, pending.key),
-                        (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
-                        (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
-                        (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
-                    ]
-                    Self.queueLock.withLock {
-                        Self.queue[pending.key] = pending.value
+                    Self.lock.withLock {
+                        Self.invocationState = .processing(pending)
                     }
-                    self.writeResponse(context: context, response: response)
+                    self.writeResponse(context: context, response: pending.toResponse())
                 }
 
             } else if request.head.uri.hasSuffix("/response") {
                 let parts = request.head.uri.split(separator: "/")
                 guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+                    // the request is malformed, since we were expecting a requestId in the path
                     return self.writeResponse(context: context, response: .init(status: .badRequest))
                 }
-                switch (Self.queueLock.withLock { Self.queue[requestId] }) {
-                case .none:
-                    self.writeResponse(context: context, response: .init(status: .badRequest))
-                case .some(let pending):
-                    pending.responsePromise.succeed(.init(status: .ok, body: request.body))
-                    self.writeResponse(context: context, response: .init(status: .accepted))
-                    Self.queueLock.withLock { Self.queue[requestId] = nil }
+                guard case .processing(let pending) = Self.lock.withLock({ Self.invocationState }) else {
+                    // a response was sent, but we did not expect to receive one
+                    #warning("better error code?!")
+                    return self.writeResponse(context: context, response: .init(status: .conflict))
+                }
+                guard requestId == pending.requestId else {
+                    // the request's requestId does not match the one we are expecting
+                    return self.writeResponse(context: context, response: .init(status: .badRequest))
+                }
+
+                pending.responsePromise.succeed(.init(status: .ok, body: request.body))
+                self.writeResponse(context: context, response: .init(status: .accepted))
+
+                Self.lock.withLock {
+                    Self.invocationState = .waitingForNextRequest
                 }
             } else {
                 self.writeResponse(context: context, response: .init(status: .notFound))
@@ -211,6 +247,19 @@ private enum LocalLambda {
             let requestId: String
             let request: ByteBuffer
             let responsePromise: EventLoopPromise<Response>
+
+            func toResponse() -> Response {
+                var response = Response()
+                response.body = self.request
+                // required headers
+                response.headers = [
+                    (AmazonHeaders.requestID, self.requestId),
+                    (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
+                    (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
+                    (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
+                ]
+                return response
+            }
         }
     }
 }
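For orientation, the sketch below is not part of the diff: it models the invocation state machine this change introduces, with the NIO `EventLoopPromise` replaced by a plain closure and `Pending` reduced to a `String` payload, so the /invoke → /next → /response cycle can be followed without the surrounding server.

// Standalone sketch (assumptions: closure instead of EventLoopPromise, String instead of Pending).
enum InvocationState {
    case waitingForNextRequest          // no /next call is parked
    case idle((String) -> Void)         // a /next call is waiting for work
    case processing(String)             // work was handed out, awaiting /response
}

var state = InvocationState.waitingForNextRequest

// A /next request arrives while the queue is empty: park it as a callback.
state = .idle({ payload in print("handing out:", payload) })

// An /invoke request arrives: deliver it straight to the parked /next call.
if case .idle(let deliver) = state {
    deliver("event-1")
    state = .processing("event-1")
}

// The matching /response arrives: complete the cycle and accept new work.
if case .processing(let requestId) = state {
    print("completed:", requestId)
    state = .waitingForNextRequest
}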