Skip to content

Feature/ff local server #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 5, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 68 additions & 28 deletions Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private enum LocalLambda {
var logger = Logger(label: "LocalLambdaServer")
logger.logLevel = configuration.general.logLevel
self.logger = logger
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.host = configuration.runtimeEngine.ip
self.port = configuration.runtimeEngine.port
self.invocationEndpoint = invocationEndpoint ?? "/invoke"
Expand Down Expand Up @@ -88,13 +88,20 @@ private enum LocalLambda {
}

final class HTTPHandler: ChannelInboundHandler {

enum InvocationState {
case waitingForNextRequest
Copy link
Contributor

@tomerd tomerd May 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitingForLambdaRequest

case idle(EventLoopPromise<Pending>)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitingForInvocation

case processing(Pending)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitingForLambdaResponse

}

public typealias InboundIn = HTTPServerRequestPart
public typealias OutboundOut = HTTPServerResponsePart

private static let queueLock = Lock()
private static var queue = [String: Pending]()

private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()

private static var queue = [Pending]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use CircularBuffer, and then you can pop as optional

private static var invocationState: InvocationState = .waitingForNextRequest

private let logger: Logger
private let invocationEndpoint: String
Expand Down Expand Up @@ -137,43 +144,63 @@ private enum LocalLambda {
self.writeResponse(context: context, response: .init(status: .internalServerError))
}
}
Self.queueLock.withLock {
Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise)
let pending = Pending(requestId: requestId, request: work, responsePromise: promise)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending -> Invocation

switch Self.invocationState {
case .idle(let promise):
promise.succeed(pending)
case .processing(_), .waitingForNextRequest:
Self.queue.append(pending)
}
}
} else if request.head.uri.hasSuffix("/next") {
switch (Self.queueLock.withLock { Self.queue.popFirst() }) {
// check if our server is in the correct state
guard case .waitingForNextRequest = Self.invocationState else {
#warning("better error code?!")
self.writeResponse(context: context, response: .init(status: .conflict))
return
}

// pop the first task from the queue
switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

circular buffer will make this easier

case .none:
self.writeResponse(context: context, response: .init(status: .noContent))
case .some(let pending):
var response = Response()
response.body = pending.value.request
// required headers
response.headers = [
(AmazonHeaders.requestID, pending.key),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
(AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
(AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
]
Self.queueLock.withLock {
Self.queue[pending.key] = pending.value
// if there is nothing in the queue, create a promise that we can succeed,
// when we get a new task
let promise = context.eventLoop.makePromise(of: Pending.self)
promise.futureResult.whenComplete { (result) in
switch result {
case .failure(let error):
self.writeResponse(context: context, response: .init(status: .internalServerError))
case .success(let pending):
Self.invocationState = .processing(pending)
self.writeResponse(context: context, response: pending.toResponse())
}
}
self.writeResponse(context: context, response: response)
Self.invocationState = .idle(promise)
case .some(let pending):
// if there is a task pending, we can immediately respond with it.
Self.invocationState = .processing(pending)
self.writeResponse(context: context, response: pending.toResponse())
}

} else if request.head.uri.hasSuffix("/response") {
let parts = request.head.uri.split(separator: "/")
guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
// the request is malformed, since we were expecting a requestId in the path
return self.writeResponse(context: context, response: .init(status: .badRequest))
}
switch (Self.queueLock.withLock { Self.queue[requestId] }) {
case .none:
self.writeResponse(context: context, response: .init(status: .badRequest))
case .some(let pending):
pending.responsePromise.succeed(.init(status: .ok, body: request.body))
self.writeResponse(context: context, response: .init(status: .accepted))
Self.queueLock.withLock { Self.queue[requestId] = nil }
guard case .processing(let pending) = Self.invocationState else {
// a response was sent, but we did not expect to receive one
#warning("better error code?!")
return self.writeResponse(context: context, response: .init(status: .conflict))
}
guard requestId == pending.requestId else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a dictionary to make sure we bring the right pending request, this will make it order based in case you have some concurrency between the client submitting work (eg iOS app) and the lambda polling/processing work (lambda code)

// the request's requestId is not matching the one we are expecting
return self.writeResponse(context: context, response: .init(status: .badRequest))
}

pending.responsePromise.succeed(.init(status: .ok, body: request.body))
self.writeResponse(context: context, response: .init(status: .accepted))
Self.invocationState = .waitingForNextRequest
} else {
self.writeResponse(context: context, response: .init(status: .notFound))
}
Expand Down Expand Up @@ -211,6 +238,19 @@ private enum LocalLambda {
let requestId: String
let request: ByteBuffer
let responsePromise: EventLoopPromise<Response>

func toResponse() -> Response {
var response = Response()
response.body = self.request
// required headers
response.headers = [
(AmazonHeaders.requestID, self.requestId),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
(AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
(AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
]
return response
}
}
}

Expand Down