Feature/ff local server #70
Changes from all commits
@@ -59,7 +59,7 @@ private enum LocalLambda {
         var logger = Logger(label: "LocalLambdaServer")
         logger.logLevel = configuration.general.logLevel
         self.logger = logger
-        self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+        self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
         self.host = configuration.runtimeEngine.ip
         self.port = configuration.runtimeEngine.port
         self.invocationEndpoint = invocationEndpoint ?? "/invoke"
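A note on the numberOfThreads change above: the rest of the diff drops the queueLock and keeps the handler's queue and invocation state in plain static vars, which appears to rely on every channel being served by one and the same event loop; a one-thread group gives exactly that. The snippet below is only a sketch of that property, not part of the diff, with illustrative names:

import NIO

// Sketch: with a single-threaded group, next() always hands back the same EventLoop,
// so all HTTPHandler callbacks run serialized on that one thread and the static
// queue/invocationState no longer need a lock.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let first = group.next()
let second = group.next()
assert(first === second) // round-robin over a single loop always yields the same loop
try group.syncShutdownGracefully()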
@@ -88,13 +88,20 @@ private enum LocalLambda {
     }

     final class HTTPHandler: ChannelInboundHandler {

+        enum InvocationState {
+            case waitingForNextRequest
+            case idle(EventLoopPromise<Pending>)

Review comment: waitingForInvocation (suggested name)

+            case processing(Pending)

Review comment: waitingForLambdaResponse (suggested name)

+        }
+
         public typealias InboundIn = HTTPServerRequestPart
         public typealias OutboundOut = HTTPServerResponsePart

-        private static let queueLock = Lock()
-        private static var queue = [String: Pending]()

         private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()

+        private static var queue = [Pending]()

Review comment: use CircularBuffer, and then you can pop as optional

+        private static var invocationState: InvocationState = .waitingForNextRequest

         private let logger: Logger
         private let invocationEndpoint: String
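Taken together with the "Pending -> Invocation" suggestion further down and the detached "waitingForLambdaRequest" comment at the bottom of the page, the naming comments above amount to a full rename of the state machine. A sketch of what that could look like (not part of this diff; the responsePromise field is left out so the snippet stays self-contained):

import NIO

// Sketch of the reviewers' suggested names, not the PR's code.
struct Invocation {                                            // was: Pending
    let requestId: String
    let request: ByteBuffer
}

enum InvocationState {
    case waitingForLambdaRequest                               // was: waitingForNextRequest
    case waitingForInvocation(EventLoopPromise<Invocation>)    // was: idle
    case waitingForLambdaResponse(Invocation)                  // was: processing
}

The CircularBuffer comment is a separate point: replacing the plain array would let the "/next" branch pop the next task as an optional instead of the !Self.queue.isEmpty ? Self.queue.removeFirst() : nil dance shown in the next hunk.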
@@ -137,43 +144,63 @@ private enum LocalLambda {
                             self.writeResponse(context: context, response: .init(status: .internalServerError))
                         }
                     }
-                    Self.queueLock.withLock {
-                        Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise)
+                    let pending = Pending(requestId: requestId, request: work, responsePromise: promise)

Review comment: Pending -> Invocation (suggested rename)

+                    switch Self.invocationState {
+                    case .idle(let promise):
+                        promise.succeed(pending)
+                    case .processing(_), .waitingForNextRequest:
+                        Self.queue.append(pending)
                     }
                 }
             } else if request.head.uri.hasSuffix("/next") {
-                switch (Self.queueLock.withLock { Self.queue.popFirst() }) {
-                case .none:
-                    self.writeResponse(context: context, response: .init(status: .noContent))
-                case .some(let pending):
-                    var response = Response()
-                    response.body = pending.value.request
-                    // required headers
-                    response.headers = [
-                        (AmazonHeaders.requestID, pending.key),
-                        (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
-                        (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
-                        (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
-                    ]
-                    Self.queueLock.withLock {
-                        Self.queue[pending.key] = pending.value
-                    }
-                    self.writeResponse(context: context, response: response)
-                }
+                // check if our server is in the correct state
+                guard case .waitingForNextRequest = Self.invocationState else {
+                    #warning("better error code?!")
+                    self.writeResponse(context: context, response: .init(status: .conflict))
+                    return
+                }
+
+                // pop the first task from the queue
+                switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil {

Review comment: circular buffer will make this easier

+                case .none:
+                    // if there is nothing in the queue, create a promise that we can succeed,
+                    // when we get a new task
+                    let promise = context.eventLoop.makePromise(of: Pending.self)
+                    promise.futureResult.whenComplete { (result) in
+                        switch result {
+                        case .failure(let error):
+                            self.writeResponse(context: context, response: .init(status: .internalServerError))
+                        case .success(let pending):
+                            Self.invocationState = .processing(pending)
+                            self.writeResponse(context: context, response: pending.toResponse())
+                        }
+                    }
+                    Self.invocationState = .idle(promise)
+                case .some(let pending):
+                    // if there is a task pending, we can immediatly respond with it.
+                    Self.invocationState = .processing(pending)
+                    self.writeResponse(context: context, response: pending.toResponse())
+                }
             } else if request.head.uri.hasSuffix("/response") {
                 let parts = request.head.uri.split(separator: "/")
                 guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+                    // the request is malformed, since we were expecting a requestId in the path
                     return self.writeResponse(context: context, response: .init(status: .badRequest))
                 }
-                switch (Self.queueLock.withLock { Self.queue[requestId] }) {
-                case .none:
-                    self.writeResponse(context: context, response: .init(status: .badRequest))
-                case .some(let pending):
-                    pending.responsePromise.succeed(.init(status: .ok, body: request.body))
-                    self.writeResponse(context: context, response: .init(status: .accepted))
-                    Self.queueLock.withLock { Self.queue[requestId] = nil }
-                }
+                guard case .processing(let pending) = Self.invocationState else {
+                    // a response was send, but we did not expect to receive one
+                    #warning("better error code?!")
+                    return self.writeResponse(context: context, response: .init(status: .conflict))
+                }
+                guard requestId == pending.requestId else {

Review comment: I used a dictionary to make sure we bring the right pending request; this will make it order based in case you have some concurrency between the client submitting work (e.g. an iOS app) and the lambda polling/processing work (the lambda code).

+                    // the request's requestId is not matching the one we are expecting
+                    return self.writeResponse(context: context, response: .init(status: .badRequest))
+                }
+
+                pending.responsePromise.succeed(.init(status: .ok, body: request.body))
+                self.writeResponse(context: context, response: .init(status: .accepted))
+                Self.invocationState = .waitingForNextRequest
             } else {
                 self.writeResponse(context: context, response: .init(status: .notFound))
             }
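The interesting part of the hunk above is the "/next" branch when the queue is empty: instead of replying with 204 No Content as before, the handler now parks an EventLoopPromise in the .idle state and answers the poll only once the next invocation arrives and succeeds that promise. A minimal standalone sketch of that hand-off (not the PR's code; the names and the String payload are illustrative):

import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next()

// What `.idle(promise)` stores: a parked promise for the next piece of work.
let parkedNext = loop.makePromise(of: String.self)
parkedNext.futureResult.whenSuccess { work in
    // In the handler, this is where the parked /next response is finally written.
    print("answering the parked /next with:", work)
}

// What the "/invoke" branch does when it finds the server idle.
parkedNext.succeed("invocation payload")

try group.syncShutdownGracefully()

The requestId guard in the "/response" branch appears to be the counterpart to the ordering concern raised in the review comment: the new queue is order based rather than keyed by requestId, so a response that does not match the invocation currently being processed is rejected with 400 rather than being delivered to the wrong promise.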
@@ -211,6 +238,19 @@ private enum LocalLambda {
         let requestId: String
         let request: ByteBuffer
         let responsePromise: EventLoopPromise<Response>
+
+        func toResponse() -> Response {
+            var response = Response()
+            response.body = self.request
+            // required headers
+            response.headers = [
+                (AmazonHeaders.requestID, self.requestId),
+                (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
+                (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
+                (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
+            ]
+            return response
+        }
     }
 }
Review comment: waitingForLambdaRequest (suggested name, presumably for case waitingForNextRequest)