From f5bbfe6668be2fb73f65774385e918a6bea6a071 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 5 May 2020 20:58:44 +0200 Subject: [PATCH 1/2] =?UTF-8?q?Don=E2=80=99t=20exit=20immediately?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 103 +++++++++++++----- 1 file changed, 76 insertions(+), 27 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 731ca597..89f810f9 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -59,7 +59,7 @@ private enum LocalLambda { var logger = Logger(label: "LocalLambdaServer") logger.logLevel = configuration.general.logLevel self.logger = logger - self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.host = configuration.runtimeEngine.ip self.port = configuration.runtimeEngine.port self.invocationEndpoint = invocationEndpoint ?? "/invoke" @@ -88,13 +88,21 @@ private enum LocalLambda { } final class HTTPHandler: ChannelInboundHandler { + + enum InvocationState { + case waitingForNextRequest + case idle(EventLoopPromise) + case processing(Pending) + } + public typealias InboundIn = HTTPServerRequestPart public typealias OutboundOut = HTTPServerResponsePart - private static let queueLock = Lock() - private static var queue = [String: Pending]() - private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + + private static let lock = Lock() + private static var queue = [Pending]() + private static var invocationState: InvocationState = .waitingForNextRequest private let logger: Logger private let invocationEndpoint: String @@ -137,42 +145,70 @@ private enum LocalLambda { self.writeResponse(context: context, response: .init(status: .internalServerError)) } } - Self.queueLock.withLock { - Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise) + let pending = Pending(requestId: requestId, request: work, responsePromise: promise) + switch Self.lock.withLock({ Self.invocationState }) { + case .idle(let promise): + promise.succeed(pending) + case .processing(_), .waitingForNextRequest: + Self.queue.append(pending) } } } else if request.head.uri.hasSuffix("/next") { - switch (Self.queueLock.withLock { Self.queue.popFirst() }) { + // check if our server is in the correct state + guard case .waitingForNextRequest = Self.lock.withLock({ Self.invocationState }) else { + #warning("better error code?!") + self.writeResponse(context: context, response: .init(status: .conflict)) + return + } + + // pop the first task from the queue + switch (Self.lock.withLock { !Self.queue.isEmpty ? Self.queue.removeFirst() : nil }) { case .none: - self.writeResponse(context: context, response: .init(status: .noContent)) + // if there is nothing in the queue, create a promise that we can succeed, + // when we get a new task + let promise = context.eventLoop.makePromise(of: Pending.self) + promise.futureResult.whenComplete { (result) in + switch result { + case .failure(let error): + self.writeResponse(context: context, response: .init(status: .internalServerError)) + case .success(let pending): + Self.lock.withLock { + Self.invocationState = .processing(pending) + } + self.writeResponse(context: context, response: pending.toResponse()) + } + } + Self.lock.withLock { + Self.invocationState = .idle(promise) + } case .some(let pending): - var response = Response() - response.body = pending.value.request - // required headers - response.headers = [ - (AmazonHeaders.requestID, pending.key), - (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), - (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), - (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), - ] - Self.queueLock.withLock { - Self.queue[pending.key] = pending.value + Self.lock.withLock { + Self.invocationState = .processing(pending) } - self.writeResponse(context: context, response: response) + self.writeResponse(context: context, response: pending.toResponse()) } } else if request.head.uri.hasSuffix("/response") { let parts = request.head.uri.split(separator: "/") guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + // the request is malformed, since we were expecting a requestId in the path return self.writeResponse(context: context, response: .init(status: .badRequest)) } - switch (Self.queueLock.withLock { Self.queue[requestId] }) { - case .none: - self.writeResponse(context: context, response: .init(status: .badRequest)) - case .some(let pending): - pending.responsePromise.succeed(.init(status: .ok, body: request.body)) - self.writeResponse(context: context, response: .init(status: .accepted)) - Self.queueLock.withLock { Self.queue[requestId] = nil } + guard case .processing(let pending) = Self.lock.withLock({ Self.invocationState }) else { + // a response was send, but we did not expect to receive one + #warning("better error code?!") + return self.writeResponse(context: context, response: .init(status: .conflict)) + } + guard requestId == pending.requestId else { + // the request's requestId is not matching the one we are expecting + return self.writeResponse(context: context, response: .init(status: .badRequest)) + } + + pending.responsePromise.succeed(.init(status: .ok, body: request.body)) + self.writeResponse(context: context, response: .init(status: .accepted)) + + Self.lock.withLock { + Self.invocationState = .waitingForNextRequest } } else { self.writeResponse(context: context, response: .init(status: .notFound)) @@ -211,6 +247,19 @@ private enum LocalLambda { let requestId: String let request: ByteBuffer let responsePromise: EventLoopPromise + + func toResponse() -> Response { + var response = Response() + response.body = self.request + // required headers + response.headers = [ + (AmazonHeaders.requestID, self.requestId), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), + (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), + (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), + ] + return response + } } } From f0f5e4bd247ea9873186bf5f657ecee373d203b4 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 5 May 2020 21:06:02 +0200 Subject: [PATCH 2/2] Removed locks. Just running in one EL --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 89f810f9..02607680 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -100,7 +100,6 @@ private enum LocalLambda { private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() - private static let lock = Lock() private static var queue = [Pending]() private static var invocationState: InvocationState = .waitingForNextRequest @@ -146,7 +145,7 @@ private enum LocalLambda { } } let pending = Pending(requestId: requestId, request: work, responsePromise: promise) - switch Self.lock.withLock({ Self.invocationState }) { + switch Self.invocationState { case .idle(let promise): promise.succeed(pending) case .processing(_), .waitingForNextRequest: @@ -155,14 +154,14 @@ private enum LocalLambda { } } else if request.head.uri.hasSuffix("/next") { // check if our server is in the correct state - guard case .waitingForNextRequest = Self.lock.withLock({ Self.invocationState }) else { + guard case .waitingForNextRequest = Self.invocationState else { #warning("better error code?!") self.writeResponse(context: context, response: .init(status: .conflict)) return } // pop the first task from the queue - switch (Self.lock.withLock { !Self.queue.isEmpty ? Self.queue.removeFirst() : nil }) { + switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil { case .none: // if there is nothing in the queue, create a promise that we can succeed, // when we get a new task @@ -172,19 +171,14 @@ private enum LocalLambda { case .failure(let error): self.writeResponse(context: context, response: .init(status: .internalServerError)) case .success(let pending): - Self.lock.withLock { - Self.invocationState = .processing(pending) - } + Self.invocationState = .processing(pending) self.writeResponse(context: context, response: pending.toResponse()) } } - Self.lock.withLock { - Self.invocationState = .idle(promise) - } + Self.invocationState = .idle(promise) case .some(let pending): - Self.lock.withLock { - Self.invocationState = .processing(pending) - } + // if there is a task pending, we can immediatly respond with it. + Self.invocationState = .processing(pending) self.writeResponse(context: context, response: pending.toResponse()) } @@ -194,7 +188,7 @@ private enum LocalLambda { // the request is malformed, since we were expecting a requestId in the path return self.writeResponse(context: context, response: .init(status: .badRequest)) } - guard case .processing(let pending) = Self.lock.withLock({ Self.invocationState }) else { + guard case .processing(let pending) = Self.invocationState else { // a response was send, but we did not expect to receive one #warning("better error code?!") return self.writeResponse(context: context, response: .init(status: .conflict)) @@ -206,10 +200,7 @@ private enum LocalLambda { pending.responsePromise.succeed(.init(status: .ok, body: request.body)) self.writeResponse(context: context, response: .init(status: .accepted)) - - Self.lock.withLock { - Self.invocationState = .waitingForNextRequest - } + Self.invocationState = .waitingForNextRequest } else { self.writeResponse(context: context, response: .init(status: .notFound)) }