From 9c172fe55a1a862c5b8b9b62b1e4ab3b4578f962 Mon Sep 17 00:00:00 2001 From: tom doron Date: Fri, 1 May 2020 14:01:13 -0700 Subject: [PATCH 01/14] add debug functionality to test with mock server motivation: allow end to end testing locally changes: * add a Lambda+LocalServer which exposes Lambda.withLocalServer available only in DEBUG mode * local server can recieve POST requests with payloads on a configurable endpoint and and send them to the Lambda * add a "noContent" mode to Lambda runtime to allow polling --- Package.swift | 16 +- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 222 ++++++++++++++++++ .../AWSLambdaRuntimeCore/LambdaContext.swift | 6 +- .../AWSLambdaRuntimeCore/LambdaRunner.swift | 17 +- Sources/AWSLambdaTesting/Lambda+Testing.swift | 20 +- Sources/StringSample/main.swift | 4 +- docker/docker-compose.1804.53.yaml | 2 +- 7 files changed, 267 insertions(+), 20 deletions(-) create mode 100644 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift diff --git a/Package.swift b/Package.swift index 44e2434f..42e4c3ed 100644 --- a/Package.swift +++ b/Package.swift @@ -47,20 +47,12 @@ let package = Package( .byName(name: "AWSLambdaRuntime"), .product(name: "NIO", package: "swift-nio"), ]), - .testTarget(name: "AWSLambdaTestingTests", dependencies: [ - .byName(name: "AWSLambdaTesting"), - .byName(name: "AWSLambdaRuntime"), - ]), - // samples - .target(name: "StringSample", dependencies: [ - .byName(name: "AWSLambdaRuntime"), - ]), - .target(name: "CodableSample", dependencies: [ - .byName(name: "AWSLambdaRuntime"), - ]), - // perf tests + .testTarget(name: "AWSLambdaTestingTests", dependencies: ["AWSLambdaTesting"]), + // for perf testing .target(name: "MockServer", dependencies: [ .product(name: "NIOHTTP1", package: "swift-nio"), ]), + .target(name: "StringSample", dependencies: ["AWSLambdaRuntime"]), + .target(name: "CodableSample", dependencies: ["AWSLambdaRuntime"]), ] ) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift new file mode 100644 index 00000000..731ca597 --- /dev/null +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -0,0 +1,222 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import Logging +import NIO +import NIOConcurrencyHelpers +import NIOHTTP1 + +// This functionality is designed for local testing hence beind a #if DEBUG flag. +// For example: +// +// try Lambda.withLocalServer { +// Lambda.run { (context: Lambda.Context, payload: String, callback: @escaping (Result) -> Void) in +// callback(.success("Hello, \(payload)!")) +// } +// } + +#if DEBUG +extension Lambda { + /// Execute code in the context of a mock Lambda server. + /// + /// - parameters: + /// - invocationEndpoint: The endpoint to post payloads to. + /// - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call. + /// + /// - note: This API is designed stricly for local testing and is behind a DEBUG flag + public static func withLocalServer(invocationEndpoint: String? = nil, _ body: @escaping () -> Void) throws { + let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint) + try server.start().wait() + defer { try! server.stop() } // FIXME: + body() + } +} + +// MARK: - Local Mock Server + +private enum LocalLambda { + struct Server { + private let logger: Logger + private let group: EventLoopGroup + private let host: String + private let port: Int + private let invocationEndpoint: String + + public init(invocationEndpoint: String?) { + let configuration = Lambda.Configuration() + var logger = Logger(label: "LocalLambdaServer") + logger.logLevel = configuration.general.logLevel + self.logger = logger + self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + self.host = configuration.runtimeEngine.ip + self.port = configuration.runtimeEngine.port + self.invocationEndpoint = invocationEndpoint ?? "/invoke" + } + + func start() -> EventLoopFuture { + let bootstrap = ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in + channel.pipeline.addHandler(HTTPHandler(logger: self.logger, invocationEndpoint: self.invocationEndpoint)) + } + } + return bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture in + guard channel.localAddress != nil else { + return channel.eventLoop.makeFailedFuture(ServerError.cantBind) + } + self.logger.info("LocalLambdaServer started and listening on \(self.host):\(self.port), receiving payloads on \(self.invocationEndpoint)") + return channel.eventLoop.makeSucceededFuture(()) + } + } + + func stop() throws { + try self.group.syncShutdownGracefully() + } + } + + final class HTTPHandler: ChannelInboundHandler { + public typealias InboundIn = HTTPServerRequestPart + public typealias OutboundOut = HTTPServerResponsePart + + private static let queueLock = Lock() + private static var queue = [String: Pending]() + + private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + + private let logger: Logger + private let invocationEndpoint: String + + init(logger: Logger, invocationEndpoint: String) { + self.logger = logger + self.invocationEndpoint = invocationEndpoint + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let requestPart = unwrapInboundIn(data) + + switch requestPart { + case .head(let head): + self.processing.append((head: head, body: nil)) + case .body(var buffer): + var request = self.processing.removeFirst() + if request.body == nil { + request.body = buffer + } else { + request.body!.writeBuffer(&buffer) + } + self.processing.prepend(request) + case .end: + let request = self.processing.removeFirst() + self.processRequest(context: context, request: request) + } + } + + func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) { + if request.head.uri.hasSuffix(self.invocationEndpoint) { + if let work = request.body { + let requestId = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: + let promise = context.eventLoop.makePromise(of: Response.self) + promise.futureResult.whenComplete { result in + switch result { + case .success(let response): + self.writeResponse(context: context, response: response) + case .failure: + self.writeResponse(context: context, response: .init(status: .internalServerError)) + } + } + Self.queueLock.withLock { + Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise) + } + } + } else if request.head.uri.hasSuffix("/next") { + switch (Self.queueLock.withLock { Self.queue.popFirst() }) { + case .none: + self.writeResponse(context: context, response: .init(status: .noContent)) + case .some(let pending): + var response = Response() + response.body = pending.value.request + // required headers + response.headers = [ + (AmazonHeaders.requestID, pending.key), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), + (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), + (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), + ] + Self.queueLock.withLock { + Self.queue[pending.key] = pending.value + } + self.writeResponse(context: context, response: response) + } + + } else if request.head.uri.hasSuffix("/response") { + let parts = request.head.uri.split(separator: "/") + guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + return self.writeResponse(context: context, response: .init(status: .badRequest)) + } + switch (Self.queueLock.withLock { Self.queue[requestId] }) { + case .none: + self.writeResponse(context: context, response: .init(status: .badRequest)) + case .some(let pending): + pending.responsePromise.succeed(.init(status: .ok, body: request.body)) + self.writeResponse(context: context, response: .init(status: .accepted)) + Self.queueLock.withLock { Self.queue[requestId] = nil } + } + } else { + self.writeResponse(context: context, response: .init(status: .notFound)) + } + } + + func writeResponse(context: ChannelHandlerContext, response: Response) { + var headers = HTTPHeaders(response.headers ?? []) + headers.add(name: "content-length", value: "\(response.body?.readableBytes ?? 0)") + let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: response.status, headers: headers) + + context.write(wrapOutboundOut(.head(head))).whenFailure { error in + self.logger.error("\(self) write error \(error)") + } + + if let buffer = response.body { + context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in + self.logger.error("\(self) write error \(error)") + } + } + + context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in + if case .failure(let error) = result { + self.logger.error("\(self) write error \(error)") + } + } + } + + struct Response { + var status: HTTPResponseStatus = .ok + var headers: [(String, String)]? + var body: ByteBuffer? + } + + struct Pending { + let requestId: String + let request: ByteBuffer + let responsePromise: EventLoopPromise + } + } + + enum ServerError: Error { + case notReady + case cantBind + } +} +#endif diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index ca69cbf7..fef386a4 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -19,7 +19,7 @@ import NIO extension Lambda { /// Lambda runtime context. /// The Lambda runtime generates and passes the `Context` to the Lambda handler as an argument. - public final class Context { + public final class Context: CustomDebugStringConvertible { /// The request ID, which identifies the request that triggered the function invocation. public let requestId: String @@ -85,5 +85,9 @@ extension Lambda { let remaining = deadline - now return .milliseconds(remaining) } + + public var debugDescription: String { + "\(Self.self)(requestId: \(self.requestId), traceId: \(self.traceId), invokedFunctionArn: \(self.invokedFunctionArn), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))" + } } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 093f6f2e..965d6a50 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -45,9 +45,12 @@ extension Lambda { func run(logger: Logger, handler: Handler) -> EventLoopFuture { logger.debug("lambda invocation sequence starting") - // 1. request invocation from lambda runtime engine - return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in - logger.error("could not fetch invocation from lambda runtime engine: \(error)") + // 1. request work from lambda runtime engine + return self.runtimeClient.requestWork(logger: logger).peekError { error -> Void in + if case RuntimeError.badStatusCode(.noContent) = error { + return + } + logger.error("could not fetch work from lambda runtime engine: \(error)") }.flatMap { invocation, payload in // 2. send invocation to handler let context = Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation) @@ -64,7 +67,13 @@ extension Lambda { self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in logger.error("could not report results to lambda runtime engine: \(error)") } - }.always { result in + }.flatMapErrorThrowing { error in + if case RuntimeError.badStatusCode(.noContent) = error { + return () + } + throw error + } + .always { result in // we are done! logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")") } diff --git a/Sources/AWSLambdaTesting/Lambda+Testing.swift b/Sources/AWSLambdaTesting/Lambda+Testing.swift index da14faa1..eb698075 100644 --- a/Sources/AWSLambdaTesting/Lambda+Testing.swift +++ b/Sources/AWSLambdaTesting/Lambda+Testing.swift @@ -12,9 +12,27 @@ // //===----------------------------------------------------------------------===// -// this is designed to only work for testing +// This functionality is designed to help with Lambda unit testing with XCTest // #if filter required for release builds which do not support @testable import // @testable is used to access of internal functions +// For exmaple: +// +// func test() { +// struct MyLambda: EventLoopLambdaHandler { +// typealias In = String +// typealias Out = String +// +// func handle(context: Lambda.Context, payload: String) -> EventLoopFuture { +// return context.eventLoop.makeSucceededFuture("echo" + payload) +// } +// } +// +// let input = UUID().uuidString +// var result: String? +// XCTAssertNoThrow(result = try Lambda.test(MyLambda(), with: input)) +// XCTAssertEqual(result, "echo" + input) +// } + #if DEBUG @testable import AWSLambdaRuntime @testable import AWSLambdaRuntimeCore diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index c7d0434a..1ce34688 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -26,7 +26,9 @@ struct Handler: EventLoopLambdaHandler { } } -Lambda.run(Handler()) +try Lambda.withLocalServer { + Lambda.run(Handler()) +} // MARK: - this can also be expressed as a closure: diff --git a/docker/docker-compose.1804.53.yaml b/docker/docker-compose.1804.53.yaml index e200426c..1121f2d2 100644 --- a/docker/docker-compose.1804.53.yaml +++ b/docker/docker-compose.1804.53.yaml @@ -6,7 +6,7 @@ services: image: swift-aws-lambda:18.04-5.3 build: args: - base_image: "swiftlang/swift:nightly-bionic" + base_image: "swiftlang/swift:nightly-5.3-bionic" test: image: swift-aws-lambda:18.04-5.3 From 1bd1d22540c35afd225ac737a3800e339cd5bfc0 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 6 May 2020 01:42:43 +0200 Subject: [PATCH 02/14] Feature/ff local server (#70) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Don’t exit immediately * Removed locks. Just running in one EL --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 96 +++++++++++++------ 1 file changed, 68 insertions(+), 28 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 731ca597..02607680 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -59,7 +59,7 @@ private enum LocalLambda { var logger = Logger(label: "LocalLambdaServer") logger.logLevel = configuration.general.logLevel self.logger = logger - self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.host = configuration.runtimeEngine.ip self.port = configuration.runtimeEngine.port self.invocationEndpoint = invocationEndpoint ?? "/invoke" @@ -88,13 +88,20 @@ private enum LocalLambda { } final class HTTPHandler: ChannelInboundHandler { + + enum InvocationState { + case waitingForNextRequest + case idle(EventLoopPromise) + case processing(Pending) + } + public typealias InboundIn = HTTPServerRequestPart public typealias OutboundOut = HTTPServerResponsePart - private static let queueLock = Lock() - private static var queue = [String: Pending]() - private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + + private static var queue = [Pending]() + private static var invocationState: InvocationState = .waitingForNextRequest private let logger: Logger private let invocationEndpoint: String @@ -137,43 +144,63 @@ private enum LocalLambda { self.writeResponse(context: context, response: .init(status: .internalServerError)) } } - Self.queueLock.withLock { - Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise) + let pending = Pending(requestId: requestId, request: work, responsePromise: promise) + switch Self.invocationState { + case .idle(let promise): + promise.succeed(pending) + case .processing(_), .waitingForNextRequest: + Self.queue.append(pending) } } } else if request.head.uri.hasSuffix("/next") { - switch (Self.queueLock.withLock { Self.queue.popFirst() }) { + // check if our server is in the correct state + guard case .waitingForNextRequest = Self.invocationState else { + #warning("better error code?!") + self.writeResponse(context: context, response: .init(status: .conflict)) + return + } + + // pop the first task from the queue + switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil { case .none: - self.writeResponse(context: context, response: .init(status: .noContent)) - case .some(let pending): - var response = Response() - response.body = pending.value.request - // required headers - response.headers = [ - (AmazonHeaders.requestID, pending.key), - (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), - (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), - (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), - ] - Self.queueLock.withLock { - Self.queue[pending.key] = pending.value + // if there is nothing in the queue, create a promise that we can succeed, + // when we get a new task + let promise = context.eventLoop.makePromise(of: Pending.self) + promise.futureResult.whenComplete { (result) in + switch result { + case .failure(let error): + self.writeResponse(context: context, response: .init(status: .internalServerError)) + case .success(let pending): + Self.invocationState = .processing(pending) + self.writeResponse(context: context, response: pending.toResponse()) + } } - self.writeResponse(context: context, response: response) + Self.invocationState = .idle(promise) + case .some(let pending): + // if there is a task pending, we can immediatly respond with it. + Self.invocationState = .processing(pending) + self.writeResponse(context: context, response: pending.toResponse()) } } else if request.head.uri.hasSuffix("/response") { let parts = request.head.uri.split(separator: "/") guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + // the request is malformed, since we were expecting a requestId in the path return self.writeResponse(context: context, response: .init(status: .badRequest)) } - switch (Self.queueLock.withLock { Self.queue[requestId] }) { - case .none: - self.writeResponse(context: context, response: .init(status: .badRequest)) - case .some(let pending): - pending.responsePromise.succeed(.init(status: .ok, body: request.body)) - self.writeResponse(context: context, response: .init(status: .accepted)) - Self.queueLock.withLock { Self.queue[requestId] = nil } + guard case .processing(let pending) = Self.invocationState else { + // a response was send, but we did not expect to receive one + #warning("better error code?!") + return self.writeResponse(context: context, response: .init(status: .conflict)) } + guard requestId == pending.requestId else { + // the request's requestId is not matching the one we are expecting + return self.writeResponse(context: context, response: .init(status: .badRequest)) + } + + pending.responsePromise.succeed(.init(status: .ok, body: request.body)) + self.writeResponse(context: context, response: .init(status: .accepted)) + Self.invocationState = .waitingForNextRequest } else { self.writeResponse(context: context, response: .init(status: .notFound)) } @@ -211,6 +238,19 @@ private enum LocalLambda { let requestId: String let request: ByteBuffer let responsePromise: EventLoopPromise + + func toResponse() -> Response { + var response = Response() + response.body = self.request + // required headers + response.headers = [ + (AmazonHeaders.requestID, self.requestId), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), + (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), + (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), + ] + return response + } } } From 769eea8708a4577e95fbc118841795d2876cab53 Mon Sep 17 00:00:00 2001 From: tom doron Date: Tue, 5 May 2020 17:05:41 -0700 Subject: [PATCH 03/14] fixup --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 142 +++++++++--------- .../AWSLambdaRuntimeCore/LambdaRunner.swift | 13 +- Sources/StringSample/main.swift | 4 +- 3 files changed, 77 insertions(+), 82 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 02607680..b39165b1 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +#if DEBUG import Dispatch import Logging import NIO @@ -26,8 +27,6 @@ import NIOHTTP1 // callback(.success("Hello, \(payload)!")) // } // } - -#if DEBUG extension Lambda { /// Execute code in the context of a mock Lambda server. /// @@ -88,20 +87,13 @@ private enum LocalLambda { } final class HTTPHandler: ChannelInboundHandler { - - enum InvocationState { - case waitingForNextRequest - case idle(EventLoopPromise) - case processing(Pending) - } - public typealias InboundIn = HTTPServerRequestPart public typealias OutboundOut = HTTPServerResponsePart - private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() - - private static var queue = [Pending]() - private static var invocationState: InvocationState = .waitingForNextRequest + private var pending = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + + private static var invocations = CircularBuffer() + private static var invocationState = InvocationState.waitingForLambdaRequest private let logger: Logger private let invocationEndpoint: String @@ -116,92 +108,100 @@ private enum LocalLambda { switch requestPart { case .head(let head): - self.processing.append((head: head, body: nil)) + self.pending.append((head: head, body: nil)) case .body(var buffer): - var request = self.processing.removeFirst() + var request = self.pending.removeFirst() if request.body == nil { request.body = buffer } else { request.body!.writeBuffer(&buffer) } - self.processing.prepend(request) + self.pending.prepend(request) case .end: - let request = self.processing.removeFirst() + let request = self.pending.removeFirst() self.processRequest(context: context, request: request) } } func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) { - if request.head.uri.hasSuffix(self.invocationEndpoint) { - if let work = request.body { - let requestId = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: - let promise = context.eventLoop.makePromise(of: Response.self) - promise.futureResult.whenComplete { result in - switch result { - case .success(let response): - self.writeResponse(context: context, response: response) - case .failure: - self.writeResponse(context: context, response: .init(status: .internalServerError)) - } - } - let pending = Pending(requestId: requestId, request: work, responsePromise: promise) - switch Self.invocationState { - case .idle(let promise): - promise.succeed(pending) - case .processing(_), .waitingForNextRequest: - Self.queue.append(pending) + switch (request.head.method, request.head.uri) { + // this endpoint is called by the client invoking the lambda + case (.POST, let url) where url.hasSuffix(self.invocationEndpoint): + guard let work = request.body else { + return self.writeResponse(context: context, response: .init(status: .badRequest)) + } + let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: + let promise = context.eventLoop.makePromise(of: Response.self) + promise.futureResult.whenComplete { result in + switch result { + case .failure(let error): + self.logger.error("invocation error: \(error)") + self.writeResponse(context: context, response: .init(status: .internalServerError)) + case .success(let response): + self.writeResponse(context: context, response: response) } } - } else if request.head.uri.hasSuffix("/next") { + let invocation = Invocation(requestID: requestID, request: work, responsePromise: promise) + switch Self.invocationState { + case .waitingForInvocation(let promise): + promise.succeed(invocation) + case .waitingForLambdaRequest, .waitingForLambdaResponse: + Self.invocations.append(invocation) + } + // /next endpoint is called by the lambda polling for work + case (.GET, let url) where url.hasSuffix(Consts.requestWorkURLSuffix): // check if our server is in the correct state - guard case .waitingForNextRequest = Self.invocationState else { - #warning("better error code?!") - self.writeResponse(context: context, response: .init(status: .conflict)) + guard case .waitingForLambdaRequest = Self.invocationState else { + self.logger.error("invalid invocation state \(Self.invocationState)") + self.writeResponse(context: context, response: .init(status: .unprocessableEntity)) return } - + // pop the first task from the queue - switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil { + switch Self.invocations.popFirst() { case .none: - // if there is nothing in the queue, create a promise that we can succeed, - // when we get a new task - let promise = context.eventLoop.makePromise(of: Pending.self) - promise.futureResult.whenComplete { (result) in + // if there is nothing in the queue, + // create a promise that we can fullfill when we get a new task + let promise = context.eventLoop.makePromise(of: Invocation.self) + promise.futureResult.whenComplete { result in switch result { case .failure(let error): + self.logger.error("invocation error: \(error)") self.writeResponse(context: context, response: .init(status: .internalServerError)) - case .success(let pending): - Self.invocationState = .processing(pending) - self.writeResponse(context: context, response: pending.toResponse()) + case .success(let invocation): + Self.invocationState = .waitingForLambdaResponse(invocation) + self.writeResponse(context: context, response: invocation.makeResponse()) } } - Self.invocationState = .idle(promise) - case .some(let pending): + Self.invocationState = .waitingForInvocation(promise) + case .some(let invocation): // if there is a task pending, we can immediatly respond with it. - Self.invocationState = .processing(pending) - self.writeResponse(context: context, response: pending.toResponse()) + Self.invocationState = .waitingForLambdaResponse(invocation) + self.writeResponse(context: context, response: invocation.makeResponse()) } - - } else if request.head.uri.hasSuffix("/response") { + // :requestID/response endpoint is called by the lambda posting the response + case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix): let parts = request.head.uri.split(separator: "/") - guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + guard let requestID = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { // the request is malformed, since we were expecting a requestId in the path return self.writeResponse(context: context, response: .init(status: .badRequest)) } - guard case .processing(let pending) = Self.invocationState else { + guard case .waitingForLambdaResponse(let invocation) = Self.invocationState else { // a response was send, but we did not expect to receive one - #warning("better error code?!") - return self.writeResponse(context: context, response: .init(status: .conflict)) + self.logger.error("invalid invocation state \(Self.invocationState)") + return self.writeResponse(context: context, response: .init(status: .unprocessableEntity)) } - guard requestId == pending.requestId else { + guard requestID == invocation.requestID else { // the request's requestId is not matching the one we are expecting + self.logger.error("invalid invocation state request ID \(requestID) does not match expected \(invocation.requestID)") return self.writeResponse(context: context, response: .init(status: .badRequest)) } - - pending.responsePromise.succeed(.init(status: .ok, body: request.body)) + + invocation.responsePromise.succeed(.init(status: .ok, body: request.body)) self.writeResponse(context: context, response: .init(status: .accepted)) - Self.invocationState = .waitingForNextRequest - } else { + Self.invocationState = .waitingForLambdaRequest + // unknown call + default: self.writeResponse(context: context, response: .init(status: .notFound)) } } @@ -234,17 +234,17 @@ private enum LocalLambda { var body: ByteBuffer? } - struct Pending { - let requestId: String + struct Invocation { + let requestID: String let request: ByteBuffer let responsePromise: EventLoopPromise - - func toResponse() -> Response { + + func makeResponse() -> Response { var response = Response() response.body = self.request // required headers response.headers = [ - (AmazonHeaders.requestID, self.requestId), + (AmazonHeaders.requestID, self.requestID), (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), @@ -252,6 +252,12 @@ private enum LocalLambda { return response } } + + enum InvocationState { + case waitingForInvocation(EventLoopPromise) + case waitingForLambdaRequest + case waitingForLambdaResponse(Invocation) + } } enum ServerError: Error { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 965d6a50..74f38464 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -46,10 +46,7 @@ extension Lambda { func run(logger: Logger, handler: Handler) -> EventLoopFuture { logger.debug("lambda invocation sequence starting") // 1. request work from lambda runtime engine - return self.runtimeClient.requestWork(logger: logger).peekError { error -> Void in - if case RuntimeError.badStatusCode(.noContent) = error { - return - } + return self.runtimeClient.requestWork(logger: logger).peekError { error in logger.error("could not fetch work from lambda runtime engine: \(error)") }.flatMap { invocation, payload in // 2. send invocation to handler @@ -67,13 +64,7 @@ extension Lambda { self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in logger.error("could not report results to lambda runtime engine: \(error)") } - }.flatMapErrorThrowing { error in - if case RuntimeError.badStatusCode(.noContent) = error { - return () - } - throw error - } - .always { result in + }.always { result in // we are done! logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")") } diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index 1ce34688..c7d0434a 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -26,9 +26,7 @@ struct Handler: EventLoopLambdaHandler { } } -try Lambda.withLocalServer { - Lambda.run(Handler()) -} +Lambda.run(Handler()) // MARK: - this can also be expressed as a closure: From 407d7cc3be80766d6ede28360849281e403dbb2d Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 12 May 2020 14:22:29 +0200 Subject: [PATCH 04/14] Fix rebase --- .../Lambda+LocalServer.swift | 2 +- Sources/AWSLambdaRuntimeCore/LambdaRunner.swift | 2 +- Sources/AWSLambdaRuntimeCore/Utils.swift | 2 +- Sources/StringSample/main.swift | 7 +++++-- 4 files changed, 8 insertions(+), 5 deletions(-) rename Sources/{AWSLambdaRuntime => AWSLambdaRuntimeCore}/Lambda+LocalServer.swift (99%) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift similarity index 99% rename from Sources/AWSLambdaRuntime/Lambda+LocalServer.swift rename to Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index b39165b1..e39c1179 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -149,7 +149,7 @@ private enum LocalLambda { Self.invocations.append(invocation) } // /next endpoint is called by the lambda polling for work - case (.GET, let url) where url.hasSuffix(Consts.requestWorkURLSuffix): + case (.GET, let url) where url.hasSuffix(Consts.getNextInvocationURLSuffix): // check if our server is in the correct state guard case .waitingForLambdaRequest = Self.invocationState else { self.logger.error("invalid invocation state \(Self.invocationState)") diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 74f38464..353f0e86 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -46,7 +46,7 @@ extension Lambda { func run(logger: Logger, handler: Handler) -> EventLoopFuture { logger.debug("lambda invocation sequence starting") // 1. request work from lambda runtime engine - return self.runtimeClient.requestWork(logger: logger).peekError { error in + return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in logger.error("could not fetch work from lambda runtime engine: \(error)") }.flatMap { invocation, payload in // 2. send invocation to handler diff --git a/Sources/AWSLambdaRuntimeCore/Utils.swift b/Sources/AWSLambdaRuntimeCore/Utils.swift index 05b206b9..ae5db50f 100644 --- a/Sources/AWSLambdaRuntimeCore/Utils.swift +++ b/Sources/AWSLambdaRuntimeCore/Utils.swift @@ -16,7 +16,7 @@ import Dispatch import NIO internal enum Consts { - private static let apiPrefix = "/2018-06-01" + static let apiPrefix = "/2018-06-01" static let invocationURLPrefix = "\(apiPrefix)/runtime/invocation" static let getNextInvocationURLSuffix = "/next" static let postResponseURLSuffix = "/response" diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index c7d0434a..3b56b6b7 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -import AWSLambdaRuntimeCore +import AWSLambdaRuntime import NIO // in this example we are receiving and responding with strings @@ -26,7 +26,10 @@ struct Handler: EventLoopLambdaHandler { } } -Lambda.run(Handler()) + +try Lambda.withLocalServer { + Lambda.run(Handler()) +} // MARK: - this can also be expressed as a closure: From 1d2220fa3004a347673fa0b1209d64af331b0ab9 Mon Sep 17 00:00:00 2001 From: tom doron Date: Fri, 1 May 2020 14:01:13 -0700 Subject: [PATCH 05/14] add debug functionality to test with mock server motivation: allow end to end testing locally changes: * add a Lambda+LocalServer which exposes Lambda.withLocalServer available only in DEBUG mode * local server can recieve POST requests with payloads on a configurable endpoint and and send them to the Lambda * add a "noContent" mode to Lambda runtime to allow polling --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 222 ++++++++++++++++++ .../AWSLambdaRuntimeCore/LambdaRunner.swift | 11 +- Sources/StringSample/main.swift | 1 - 3 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift new file mode 100644 index 00000000..731ca597 --- /dev/null +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -0,0 +1,222 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import Logging +import NIO +import NIOConcurrencyHelpers +import NIOHTTP1 + +// This functionality is designed for local testing hence beind a #if DEBUG flag. +// For example: +// +// try Lambda.withLocalServer { +// Lambda.run { (context: Lambda.Context, payload: String, callback: @escaping (Result) -> Void) in +// callback(.success("Hello, \(payload)!")) +// } +// } + +#if DEBUG +extension Lambda { + /// Execute code in the context of a mock Lambda server. + /// + /// - parameters: + /// - invocationEndpoint: The endpoint to post payloads to. + /// - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call. + /// + /// - note: This API is designed stricly for local testing and is behind a DEBUG flag + public static func withLocalServer(invocationEndpoint: String? = nil, _ body: @escaping () -> Void) throws { + let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint) + try server.start().wait() + defer { try! server.stop() } // FIXME: + body() + } +} + +// MARK: - Local Mock Server + +private enum LocalLambda { + struct Server { + private let logger: Logger + private let group: EventLoopGroup + private let host: String + private let port: Int + private let invocationEndpoint: String + + public init(invocationEndpoint: String?) { + let configuration = Lambda.Configuration() + var logger = Logger(label: "LocalLambdaServer") + logger.logLevel = configuration.general.logLevel + self.logger = logger + self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + self.host = configuration.runtimeEngine.ip + self.port = configuration.runtimeEngine.port + self.invocationEndpoint = invocationEndpoint ?? "/invoke" + } + + func start() -> EventLoopFuture { + let bootstrap = ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in + channel.pipeline.addHandler(HTTPHandler(logger: self.logger, invocationEndpoint: self.invocationEndpoint)) + } + } + return bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture in + guard channel.localAddress != nil else { + return channel.eventLoop.makeFailedFuture(ServerError.cantBind) + } + self.logger.info("LocalLambdaServer started and listening on \(self.host):\(self.port), receiving payloads on \(self.invocationEndpoint)") + return channel.eventLoop.makeSucceededFuture(()) + } + } + + func stop() throws { + try self.group.syncShutdownGracefully() + } + } + + final class HTTPHandler: ChannelInboundHandler { + public typealias InboundIn = HTTPServerRequestPart + public typealias OutboundOut = HTTPServerResponsePart + + private static let queueLock = Lock() + private static var queue = [String: Pending]() + + private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + + private let logger: Logger + private let invocationEndpoint: String + + init(logger: Logger, invocationEndpoint: String) { + self.logger = logger + self.invocationEndpoint = invocationEndpoint + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let requestPart = unwrapInboundIn(data) + + switch requestPart { + case .head(let head): + self.processing.append((head: head, body: nil)) + case .body(var buffer): + var request = self.processing.removeFirst() + if request.body == nil { + request.body = buffer + } else { + request.body!.writeBuffer(&buffer) + } + self.processing.prepend(request) + case .end: + let request = self.processing.removeFirst() + self.processRequest(context: context, request: request) + } + } + + func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) { + if request.head.uri.hasSuffix(self.invocationEndpoint) { + if let work = request.body { + let requestId = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: + let promise = context.eventLoop.makePromise(of: Response.self) + promise.futureResult.whenComplete { result in + switch result { + case .success(let response): + self.writeResponse(context: context, response: response) + case .failure: + self.writeResponse(context: context, response: .init(status: .internalServerError)) + } + } + Self.queueLock.withLock { + Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise) + } + } + } else if request.head.uri.hasSuffix("/next") { + switch (Self.queueLock.withLock { Self.queue.popFirst() }) { + case .none: + self.writeResponse(context: context, response: .init(status: .noContent)) + case .some(let pending): + var response = Response() + response.body = pending.value.request + // required headers + response.headers = [ + (AmazonHeaders.requestID, pending.key), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), + (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), + (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), + ] + Self.queueLock.withLock { + Self.queue[pending.key] = pending.value + } + self.writeResponse(context: context, response: response) + } + + } else if request.head.uri.hasSuffix("/response") { + let parts = request.head.uri.split(separator: "/") + guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + return self.writeResponse(context: context, response: .init(status: .badRequest)) + } + switch (Self.queueLock.withLock { Self.queue[requestId] }) { + case .none: + self.writeResponse(context: context, response: .init(status: .badRequest)) + case .some(let pending): + pending.responsePromise.succeed(.init(status: .ok, body: request.body)) + self.writeResponse(context: context, response: .init(status: .accepted)) + Self.queueLock.withLock { Self.queue[requestId] = nil } + } + } else { + self.writeResponse(context: context, response: .init(status: .notFound)) + } + } + + func writeResponse(context: ChannelHandlerContext, response: Response) { + var headers = HTTPHeaders(response.headers ?? []) + headers.add(name: "content-length", value: "\(response.body?.readableBytes ?? 0)") + let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: response.status, headers: headers) + + context.write(wrapOutboundOut(.head(head))).whenFailure { error in + self.logger.error("\(self) write error \(error)") + } + + if let buffer = response.body { + context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in + self.logger.error("\(self) write error \(error)") + } + } + + context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in + if case .failure(let error) = result { + self.logger.error("\(self) write error \(error)") + } + } + } + + struct Response { + var status: HTTPResponseStatus = .ok + var headers: [(String, String)]? + var body: ByteBuffer? + } + + struct Pending { + let requestId: String + let request: ByteBuffer + let responsePromise: EventLoopPromise + } + } + + enum ServerError: Error { + case notReady + case cantBind + } +} +#endif diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 353f0e86..dc8a72b6 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -47,6 +47,9 @@ extension Lambda { logger.debug("lambda invocation sequence starting") // 1. request work from lambda runtime engine return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in + if case RuntimeError.badStatusCode(.noContent) = error { + return + } logger.error("could not fetch work from lambda runtime engine: \(error)") }.flatMap { invocation, payload in // 2. send invocation to handler @@ -64,7 +67,13 @@ extension Lambda { self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in logger.error("could not report results to lambda runtime engine: \(error)") } - }.always { result in + }.flatMapErrorThrowing { error in + if case RuntimeError.badStatusCode(.noContent) = error { + return () + } + throw error + } + .always { result in // we are done! logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")") } diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index 3b56b6b7..17bc6ade 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -26,7 +26,6 @@ struct Handler: EventLoopLambdaHandler { } } - try Lambda.withLocalServer { Lambda.run(Handler()) } From 995fa4895c3916e86b27cccd5a74a53fc608c269 Mon Sep 17 00:00:00 2001 From: tom doron Date: Tue, 5 May 2020 17:05:41 -0700 Subject: [PATCH 06/14] fixup --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 149 ++++++++++++------ .../AWSLambdaRuntimeCore/LambdaRunner.swift | 10 +- Sources/StringSample/main.swift | 4 +- 3 files changed, 101 insertions(+), 62 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 731ca597..286a7392 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +#if DEBUG import Dispatch import Logging import NIO @@ -26,8 +27,6 @@ import NIOHTTP1 // callback(.success("Hello, \(payload)!")) // } // } - -#if DEBUG extension Lambda { /// Execute code in the context of a mock Lambda server. /// @@ -91,10 +90,10 @@ private enum LocalLambda { public typealias InboundIn = HTTPServerRequestPart public typealias OutboundOut = HTTPServerResponsePart - private static let queueLock = Lock() - private static var queue = [String: Pending]() + private var pending = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() - private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + private static var invocations = CircularBuffer() + private static var invocationState = InvocationState.waitingForLambdaRequest private let logger: Logger private let invocationEndpoint: String @@ -109,72 +108,101 @@ private enum LocalLambda { switch requestPart { case .head(let head): - self.processing.append((head: head, body: nil)) + self.pending.append((head: head, body: nil)) case .body(var buffer): - var request = self.processing.removeFirst() + var request = self.pending.removeFirst() if request.body == nil { request.body = buffer } else { request.body!.writeBuffer(&buffer) } - self.processing.prepend(request) + self.pending.prepend(request) case .end: - let request = self.processing.removeFirst() + let request = self.pending.removeFirst() self.processRequest(context: context, request: request) } } func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) { - if request.head.uri.hasSuffix(self.invocationEndpoint) { - if let work = request.body { - let requestId = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: - let promise = context.eventLoop.makePromise(of: Response.self) + switch (request.head.method, request.head.uri) { + // this endpoint is called by the client invoking the lambda + case (.POST, let url) where url.hasSuffix(self.invocationEndpoint): + guard let work = request.body else { + return self.writeResponse(context: context, response: .init(status: .badRequest)) + } + let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: + let promise = context.eventLoop.makePromise(of: Response.self) + promise.futureResult.whenComplete { result in + switch result { + case .failure(let error): + self.logger.error("invocation error: \(error)") + self.writeResponse(context: context, response: .init(status: .internalServerError)) + case .success(let response): + self.writeResponse(context: context, response: response) + } + } + let invocation = Invocation(requestID: requestID, request: work, responsePromise: promise) + switch Self.invocationState { + case .waitingForInvocation(let promise): + promise.succeed(invocation) + case .waitingForLambdaRequest, .waitingForLambdaResponse: + Self.invocations.append(invocation) + } + // /next endpoint is called by the lambda polling for work + case (.GET, let url) where url.hasSuffix(Consts.requestWorkURLSuffix): + // check if our server is in the correct state + guard case .waitingForLambdaRequest = Self.invocationState else { + self.logger.error("invalid invocation state \(Self.invocationState)") + self.writeResponse(context: context, response: .init(status: .unprocessableEntity)) + return + } + + // pop the first task from the queue + switch Self.invocations.popFirst() { + case .none: + // if there is nothing in the queue, + // create a promise that we can fullfill when we get a new task + let promise = context.eventLoop.makePromise(of: Invocation.self) promise.futureResult.whenComplete { result in switch result { - case .success(let response): - self.writeResponse(context: context, response: response) - case .failure: + case .failure(let error): + self.logger.error("invocation error: \(error)") self.writeResponse(context: context, response: .init(status: .internalServerError)) + case .success(let invocation): + Self.invocationState = .waitingForLambdaResponse(invocation) + self.writeResponse(context: context, response: invocation.makeResponse()) } } - Self.queueLock.withLock { - Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise) - } + Self.invocationState = .waitingForInvocation(promise) + case .some(let invocation): + // if there is a task pending, we can immediatly respond with it. + Self.invocationState = .waitingForLambdaResponse(invocation) + self.writeResponse(context: context, response: invocation.makeResponse()) } - } else if request.head.uri.hasSuffix("/next") { - switch (Self.queueLock.withLock { Self.queue.popFirst() }) { - case .none: - self.writeResponse(context: context, response: .init(status: .noContent)) - case .some(let pending): - var response = Response() - response.body = pending.value.request - // required headers - response.headers = [ - (AmazonHeaders.requestID, pending.key), - (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), - (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), - (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), - ] - Self.queueLock.withLock { - Self.queue[pending.key] = pending.value - } - self.writeResponse(context: context, response: response) - } - - } else if request.head.uri.hasSuffix("/response") { + // :requestID/response endpoint is called by the lambda posting the response + case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix): let parts = request.head.uri.split(separator: "/") - guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + + guard let requestID = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { + // the request is malformed, since we were expecting a requestId in the path return self.writeResponse(context: context, response: .init(status: .badRequest)) } - switch (Self.queueLock.withLock { Self.queue[requestId] }) { - case .none: - self.writeResponse(context: context, response: .init(status: .badRequest)) - case .some(let pending): - pending.responsePromise.succeed(.init(status: .ok, body: request.body)) - self.writeResponse(context: context, response: .init(status: .accepted)) - Self.queueLock.withLock { Self.queue[requestId] = nil } + guard case .waitingForLambdaResponse(let invocation) = Self.invocationState else { + // a response was send, but we did not expect to receive one + self.logger.error("invalid invocation state \(Self.invocationState)") + return self.writeResponse(context: context, response: .init(status: .unprocessableEntity)) + } + guard requestID == invocation.requestID else { + // the request's requestId is not matching the one we are expecting + self.logger.error("invalid invocation state request ID \(requestID) does not match expected \(invocation.requestID)") + return self.writeResponse(context: context, response: .init(status: .badRequest)) } - } else { + + invocation.responsePromise.succeed(.init(status: .ok, body: request.body)) + self.writeResponse(context: context, response: .init(status: .accepted)) + Self.invocationState = .waitingForLambdaRequest + // unknown call + default: self.writeResponse(context: context, response: .init(status: .notFound)) } } @@ -207,10 +235,29 @@ private enum LocalLambda { var body: ByteBuffer? } - struct Pending { - let requestId: String + struct Invocation { + let requestID: String let request: ByteBuffer let responsePromise: EventLoopPromise + + func makeResponse() -> Response { + var response = Response() + response.body = self.request + // required headers + response.headers = [ + (AmazonHeaders.requestID, self.requestID), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), + (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), + (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), + ] + return response + } + } + + enum InvocationState { + case waitingForInvocation(EventLoopPromise) + case waitingForLambdaRequest + case waitingForLambdaResponse(Invocation) } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index dc8a72b6..8aaa8aee 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -46,7 +46,7 @@ extension Lambda { func run(logger: Logger, handler: Handler) -> EventLoopFuture { logger.debug("lambda invocation sequence starting") // 1. request work from lambda runtime engine - return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in + return self.runtimeClient.getNextInvocation(logger: logger).peekError { (error) in if case RuntimeError.badStatusCode(.noContent) = error { return } @@ -67,13 +67,7 @@ extension Lambda { self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in logger.error("could not report results to lambda runtime engine: \(error)") } - }.flatMapErrorThrowing { error in - if case RuntimeError.badStatusCode(.noContent) = error { - return () - } - throw error - } - .always { result in + }.always { result in // we are done! logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")") } diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index 17bc6ade..fddfa478 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -26,9 +26,7 @@ struct Handler: EventLoopLambdaHandler { } } -try Lambda.withLocalServer { - Lambda.run(Handler()) -} +Lambda.run(Handler()) // MARK: - this can also be expressed as a closure: From 2111ea50e866d14f965fc9c22e72a7084c88f110 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 7 May 2020 13:51:09 +0200 Subject: [PATCH 07/14] Move Lifecycle completely into EventLoop --- Sources/AWSLambdaRuntimeCore/Lambda.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Sources/AWSLambdaRuntimeCore/Lambda.swift b/Sources/AWSLambdaRuntimeCore/Lambda.swift index bf552b55..1170ce65 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda.swift @@ -104,15 +104,19 @@ public enum Lambda { var result: Result! MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in let lifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory) + #if DEBUG let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in logger.info("intercepted signal: \(signal)") lifecycle.shutdown() } + #endif lifecycle.start().flatMap { lifecycle.shutdownFuture }.whenComplete { lifecycleResult in + #if DEBUG signalSource.cancel() + #endif eventLoop.shutdownGracefully { error in if let error = error { preconditionFailure("Failed to shutdown eventloop: \(error)") From 3c91d3cd4f3813f48936191266a8ebc0dd22ca0b Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 7 May 2020 15:27:07 +0200 Subject: [PATCH 08/14] Cancelling works. --- Sources/AWSLambdaRuntimeCore/HTTPClient.swift | 29 +++++++++++++++++++ .../LambdaLifecycle.swift | 12 +++++++- .../AWSLambdaRuntimeCore/LambdaRunner.swift | 22 ++++++++++---- .../LambdaRuntimeClient.swift | 7 +++++ 4 files changed, 63 insertions(+), 7 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift index fd7df7e5..2eaabd4a 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift @@ -48,6 +48,20 @@ internal final class HTTPClient { timeout: timeout ?? self.configuration.requestTimeout)) } + /// cancels the current request if there is one + func cancel() { + guard self.executing.exchange(with: true) else { + // there is no request running. nothing to cancel + return + } + + guard case .connected(let channel) = self.state else { + preconditionFailure("if we are executing, we expect to have an open channel") + } + + channel.triggerUserOutboundEvent(RequestCancelEvent(), promise: nil) + } + // TODO: cap reconnect attempt private func execute(_ request: Request, validate: Bool = true) -> EventLoopFuture { precondition(!validate || self.executing.compareAndExchange(expected: false, desired: true), "expecting single request at a time") @@ -120,6 +134,7 @@ internal final class HTTPClient { internal enum Errors: Error { case connectionResetByPeer case timeout + case cancelled } private enum State { @@ -284,6 +299,18 @@ private final class UnaryHandler: ChannelDuplexHandler { context.fireChannelInactive() } + func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { + switch event { + case is RequestCancelEvent: + guard self.pending != nil else { + return + } + self.completeWith(.failure(HTTPClient.Errors.cancelled)) + default: + context.triggerUserOutboundEvent(event, promise: promise) + } + } + private func completeWith(_ result: Result) { guard let pending = self.pending else { preconditionFailure("invalid state, no pending request") @@ -299,3 +326,5 @@ private struct HTTPRequestWrapper { let request: HTTPClient.Request let promise: EventLoopPromise } + +private struct RequestCancelEvent {} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 03e22cc4..8014c618 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -81,10 +81,20 @@ extension Lambda { // MARK: - Private + #if DEBUG /// Begin the `Lifecycle` shutdown. + /// Only needed for debugging purposes. public func shutdown() { - self.state = .shuttingdown + // make this method thread safe by dispatching onto the eventloop + self.eventLoop.execute { + guard case .active(let runner, _) = self.state else { + return + } + self.state = .shuttingdown + runner.cancelWaitingForNextInvocation() + } } + #endif private func markShutdown() { self.state = .shutdown diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 8aaa8aee..a26290cf 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -18,10 +18,12 @@ import NIO extension Lambda { /// LambdaRunner manages the Lambda runtime workflow, or business logic. - internal struct Runner { + internal final class Runner { private let runtimeClient: RuntimeClient private let eventLoop: EventLoop + private var isGettingNextInvocation = false + init(eventLoop: EventLoop, configuration: Configuration) { self.eventLoop = eventLoop self.runtimeClient = RuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine) @@ -46,13 +48,12 @@ extension Lambda { func run(logger: Logger, handler: Handler) -> EventLoopFuture { logger.debug("lambda invocation sequence starting") // 1. request work from lambda runtime engine - return self.runtimeClient.getNextInvocation(logger: logger).peekError { (error) in - if case RuntimeError.badStatusCode(.noContent) = error { - return - } + self.isGettingNextInvocation = true + return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in logger.error("could not fetch work from lambda runtime engine: \(error)") }.flatMap { invocation, payload in - // 2. send invocation to handler + // 2. send work to handler + self.isGettingNextInvocation = false let context = Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation) logger.debug("sending invocation to lambda handler \(handler)") return handler.handle(context: context, payload: payload) @@ -72,6 +73,15 @@ extension Lambda { logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")") } } + + #if DEBUG + /// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane) + /// only needed for debugging purposes. + func cancelWaitingForNextInvocation() { + guard self.isGettingNextInvocation else { return } + self.runtimeClient.cancel() + } + #endif } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index af00e22a..65013ec1 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -114,6 +114,13 @@ extension Lambda { } } } + + #if DEBUG + /// Cancels the current request, if one is running. Only needed for debugging purposes + func cancel() { + self.httpClient.cancel() + } + #endif } } From 5115e4f02bacdd27f5cb51c9dce9df729d3a6fc7 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 7 May 2020 16:38:17 +0200 Subject: [PATCH 09/14] Removed unnecessary locks. --- Sources/AWSLambdaRuntimeCore/HTTPClient.swift | 12 ++++++++---- Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift | 5 +++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift index 2eaabd4a..2feda919 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift @@ -25,7 +25,7 @@ internal final class HTTPClient { private let targetHost: String private var state = State.disconnected - private let executing = NIOAtomic.makeAtomic(value: false) + private var executing = false init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) { self.eventLoop = eventLoop @@ -50,7 +50,7 @@ internal final class HTTPClient { /// cancels the current request if there is one func cancel() { - guard self.executing.exchange(with: true) else { + guard self.executing else { // there is no request running. nothing to cancel return } @@ -64,7 +64,10 @@ internal final class HTTPClient { // TODO: cap reconnect attempt private func execute(_ request: Request, validate: Bool = true) -> EventLoopFuture { - precondition(!validate || self.executing.compareAndExchange(expected: false, desired: true), "expecting single request at a time") + if validate { + precondition(self.executing == false) + self.executing = true + } switch self.state { case .disconnected: @@ -80,7 +83,8 @@ internal final class HTTPClient { let promise = channel.eventLoop.makePromise(of: Response.self) promise.futureResult.whenComplete { _ in - precondition(self.executing.compareAndExchange(expected: true, desired: false), "invalid execution state") + precondition(self.executing == true) + self.executing = false } let wrapper = HTTPRequestWrapper(request: request, promise: promise) channel.writeAndFlush(wrapper).cascadeFailure(to: promise) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 8014c618..9af877ea 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -18,6 +18,8 @@ import NIOConcurrencyHelpers extension Lambda { /// `Lifecycle` manages the Lambda process lifecycle. + /// + /// - note: It is intended to be used within a single `EventLoop`. For this reason this class is not thread safe. public final class Lifecycle { private let eventLoop: EventLoop private let shutdownPromise: EventLoopPromise @@ -82,8 +84,7 @@ extension Lambda { // MARK: - Private #if DEBUG - /// Begin the `Lifecycle` shutdown. - /// Only needed for debugging purposes. + /// Begin the `Lifecycle` shutdown. Only needed for debugging purposes, hence behind a `DEBUG` flag. public func shutdown() { // make this method thread safe by dispatching onto the eventloop self.eventLoop.execute { From 48afb03fe10faba64e59a906f1d3a95134f84c08 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 7 May 2020 16:54:25 +0200 Subject: [PATCH 10/14] Use UUID for requestId. --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 269 ------------------ 1 file changed, 269 deletions(-) delete mode 100644 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift deleted file mode 100644 index 286a7392..00000000 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ /dev/null @@ -1,269 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftAWSLambdaRuntime open source project -// -// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if DEBUG -import Dispatch -import Logging -import NIO -import NIOConcurrencyHelpers -import NIOHTTP1 - -// This functionality is designed for local testing hence beind a #if DEBUG flag. -// For example: -// -// try Lambda.withLocalServer { -// Lambda.run { (context: Lambda.Context, payload: String, callback: @escaping (Result) -> Void) in -// callback(.success("Hello, \(payload)!")) -// } -// } -extension Lambda { - /// Execute code in the context of a mock Lambda server. - /// - /// - parameters: - /// - invocationEndpoint: The endpoint to post payloads to. - /// - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call. - /// - /// - note: This API is designed stricly for local testing and is behind a DEBUG flag - public static func withLocalServer(invocationEndpoint: String? = nil, _ body: @escaping () -> Void) throws { - let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint) - try server.start().wait() - defer { try! server.stop() } // FIXME: - body() - } -} - -// MARK: - Local Mock Server - -private enum LocalLambda { - struct Server { - private let logger: Logger - private let group: EventLoopGroup - private let host: String - private let port: Int - private let invocationEndpoint: String - - public init(invocationEndpoint: String?) { - let configuration = Lambda.Configuration() - var logger = Logger(label: "LocalLambdaServer") - logger.logLevel = configuration.general.logLevel - self.logger = logger - self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) - self.host = configuration.runtimeEngine.ip - self.port = configuration.runtimeEngine.port - self.invocationEndpoint = invocationEndpoint ?? "/invoke" - } - - func start() -> EventLoopFuture { - let bootstrap = ServerBootstrap(group: group) - .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) - .childChannelInitializer { channel in - channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in - channel.pipeline.addHandler(HTTPHandler(logger: self.logger, invocationEndpoint: self.invocationEndpoint)) - } - } - return bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture in - guard channel.localAddress != nil else { - return channel.eventLoop.makeFailedFuture(ServerError.cantBind) - } - self.logger.info("LocalLambdaServer started and listening on \(self.host):\(self.port), receiving payloads on \(self.invocationEndpoint)") - return channel.eventLoop.makeSucceededFuture(()) - } - } - - func stop() throws { - try self.group.syncShutdownGracefully() - } - } - - final class HTTPHandler: ChannelInboundHandler { - public typealias InboundIn = HTTPServerRequestPart - public typealias OutboundOut = HTTPServerResponsePart - - private var pending = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() - - private static var invocations = CircularBuffer() - private static var invocationState = InvocationState.waitingForLambdaRequest - - private let logger: Logger - private let invocationEndpoint: String - - init(logger: Logger, invocationEndpoint: String) { - self.logger = logger - self.invocationEndpoint = invocationEndpoint - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let requestPart = unwrapInboundIn(data) - - switch requestPart { - case .head(let head): - self.pending.append((head: head, body: nil)) - case .body(var buffer): - var request = self.pending.removeFirst() - if request.body == nil { - request.body = buffer - } else { - request.body!.writeBuffer(&buffer) - } - self.pending.prepend(request) - case .end: - let request = self.pending.removeFirst() - self.processRequest(context: context, request: request) - } - } - - func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) { - switch (request.head.method, request.head.uri) { - // this endpoint is called by the client invoking the lambda - case (.POST, let url) where url.hasSuffix(self.invocationEndpoint): - guard let work = request.body else { - return self.writeResponse(context: context, response: .init(status: .badRequest)) - } - let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: - let promise = context.eventLoop.makePromise(of: Response.self) - promise.futureResult.whenComplete { result in - switch result { - case .failure(let error): - self.logger.error("invocation error: \(error)") - self.writeResponse(context: context, response: .init(status: .internalServerError)) - case .success(let response): - self.writeResponse(context: context, response: response) - } - } - let invocation = Invocation(requestID: requestID, request: work, responsePromise: promise) - switch Self.invocationState { - case .waitingForInvocation(let promise): - promise.succeed(invocation) - case .waitingForLambdaRequest, .waitingForLambdaResponse: - Self.invocations.append(invocation) - } - // /next endpoint is called by the lambda polling for work - case (.GET, let url) where url.hasSuffix(Consts.requestWorkURLSuffix): - // check if our server is in the correct state - guard case .waitingForLambdaRequest = Self.invocationState else { - self.logger.error("invalid invocation state \(Self.invocationState)") - self.writeResponse(context: context, response: .init(status: .unprocessableEntity)) - return - } - - // pop the first task from the queue - switch Self.invocations.popFirst() { - case .none: - // if there is nothing in the queue, - // create a promise that we can fullfill when we get a new task - let promise = context.eventLoop.makePromise(of: Invocation.self) - promise.futureResult.whenComplete { result in - switch result { - case .failure(let error): - self.logger.error("invocation error: \(error)") - self.writeResponse(context: context, response: .init(status: .internalServerError)) - case .success(let invocation): - Self.invocationState = .waitingForLambdaResponse(invocation) - self.writeResponse(context: context, response: invocation.makeResponse()) - } - } - Self.invocationState = .waitingForInvocation(promise) - case .some(let invocation): - // if there is a task pending, we can immediatly respond with it. - Self.invocationState = .waitingForLambdaResponse(invocation) - self.writeResponse(context: context, response: invocation.makeResponse()) - } - // :requestID/response endpoint is called by the lambda posting the response - case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix): - let parts = request.head.uri.split(separator: "/") - - guard let requestID = parts.count > 2 ? String(parts[parts.count - 2]) : nil else { - // the request is malformed, since we were expecting a requestId in the path - return self.writeResponse(context: context, response: .init(status: .badRequest)) - } - guard case .waitingForLambdaResponse(let invocation) = Self.invocationState else { - // a response was send, but we did not expect to receive one - self.logger.error("invalid invocation state \(Self.invocationState)") - return self.writeResponse(context: context, response: .init(status: .unprocessableEntity)) - } - guard requestID == invocation.requestID else { - // the request's requestId is not matching the one we are expecting - self.logger.error("invalid invocation state request ID \(requestID) does not match expected \(invocation.requestID)") - return self.writeResponse(context: context, response: .init(status: .badRequest)) - } - - invocation.responsePromise.succeed(.init(status: .ok, body: request.body)) - self.writeResponse(context: context, response: .init(status: .accepted)) - Self.invocationState = .waitingForLambdaRequest - // unknown call - default: - self.writeResponse(context: context, response: .init(status: .notFound)) - } - } - - func writeResponse(context: ChannelHandlerContext, response: Response) { - var headers = HTTPHeaders(response.headers ?? []) - headers.add(name: "content-length", value: "\(response.body?.readableBytes ?? 0)") - let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: response.status, headers: headers) - - context.write(wrapOutboundOut(.head(head))).whenFailure { error in - self.logger.error("\(self) write error \(error)") - } - - if let buffer = response.body { - context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in - self.logger.error("\(self) write error \(error)") - } - } - - context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in - if case .failure(let error) = result { - self.logger.error("\(self) write error \(error)") - } - } - } - - struct Response { - var status: HTTPResponseStatus = .ok - var headers: [(String, String)]? - var body: ByteBuffer? - } - - struct Invocation { - let requestID: String - let request: ByteBuffer - let responsePromise: EventLoopPromise - - func makeResponse() -> Response { - var response = Response() - response.body = self.request - // required headers - response.headers = [ - (AmazonHeaders.requestID, self.requestID), - (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), - (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), - (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), - ] - return response - } - } - - enum InvocationState { - case waitingForInvocation(EventLoopPromise) - case waitingForLambdaRequest - case waitingForLambdaResponse(Invocation) - } - } - - enum ServerError: Error { - case notReady - case cantBind - } -} -#endif From 45d34c45d2be855950e9e91a6acb64f38f2c99f2 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 12 May 2020 14:47:46 +0200 Subject: [PATCH 11/14] Review comments. --- Sources/AWSLambdaRuntimeCore/HTTPClient.swift | 9 +++---- .../LambdaLifecycle.swift | 26 ++++--------------- .../AWSLambdaRuntimeCore/LambdaRunner.swift | 2 -- .../LambdaRuntimeClient.swift | 2 -- 4 files changed, 9 insertions(+), 30 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift index 2feda919..17b77b13 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift @@ -65,7 +65,7 @@ internal final class HTTPClient { // TODO: cap reconnect attempt private func execute(_ request: Request, validate: Bool = true) -> EventLoopFuture { if validate { - precondition(self.executing == false) + precondition(self.executing == false, "expecting single request at a time") self.executing = true } @@ -83,7 +83,7 @@ internal final class HTTPClient { let promise = channel.eventLoop.makePromise(of: Response.self) promise.futureResult.whenComplete { _ in - precondition(self.executing == true) + precondition(self.executing == true, "invalid execution state") self.executing = false } let wrapper = HTTPRequestWrapper(request: request, promise: promise) @@ -306,10 +306,9 @@ private final class UnaryHandler: ChannelDuplexHandler { func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { switch event { case is RequestCancelEvent: - guard self.pending != nil else { - return + if self.pending != nil { + self.completeWith(.failure(HTTPClient.Errors.cancelled)) } - self.completeWith(.failure(HTTPClient.Errors.cancelled)) default: context.triggerUserOutboundEvent(event, promise: promise) } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 9af877ea..7ebbf924 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -27,8 +27,7 @@ extension Lambda { private let configuration: Configuration private let factory: HandlerFactory - private var _state = State.idle - private let stateLock = Lock() + private var state = State.idle /// Create a new `Lifecycle`. /// @@ -88,11 +87,11 @@ extension Lambda { public func shutdown() { // make this method thread safe by dispatching onto the eventloop self.eventLoop.execute { - guard case .active(let runner, _) = self.state else { - return - } + let oldState = self.state self.state = .shuttingdown - runner.cancelWaitingForNextInvocation() + if case .active(let runner, _) = oldState { + runner.cancelWaitingForNextInvocation() + } } } #endif @@ -130,21 +129,6 @@ extension Lambda { _run(0) } - private var state: State { - get { - self.stateLock.withLock { - self._state - } - } - set { - self.stateLock.withLockVoid { - precondition(newValue.order > self._state.order, "invalid state \(newValue) after \(self._state)") - self._state = newValue - } - self.logger.debug("lambda lifecycle state: \(newValue)") - } - } - private enum State { case idle case initializing diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index a26290cf..6cd75989 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -74,14 +74,12 @@ extension Lambda { } } - #if DEBUG /// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane) /// only needed for debugging purposes. func cancelWaitingForNextInvocation() { guard self.isGettingNextInvocation else { return } self.runtimeClient.cancel() } - #endif } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index 65013ec1..dd6f6954 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -115,12 +115,10 @@ extension Lambda { } } - #if DEBUG /// Cancels the current request, if one is running. Only needed for debugging purposes func cancel() { self.httpClient.cancel() } - #endif } } From 18589b7c828b5d0550f23845fe421eb269a4dd1b Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 15 May 2020 10:01:31 +0200 Subject: [PATCH 12/14] Review fixes --- Sources/AWSLambdaRuntimeCore/LambdaRunner.swift | 4 ++-- Sources/StringSample/main.swift | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 6cd75989..454535a5 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -47,12 +47,12 @@ extension Lambda { func run(logger: Logger, handler: Handler) -> EventLoopFuture { logger.debug("lambda invocation sequence starting") - // 1. request work from lambda runtime engine + // 1. request invocation from lambda runtime engine self.isGettingNextInvocation = true return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in logger.error("could not fetch work from lambda runtime engine: \(error)") }.flatMap { invocation, payload in - // 2. send work to handler + // 2. send invocation to handler self.isGettingNextInvocation = false let context = Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation) logger.debug("sending invocation to lambda handler \(handler)") diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index fddfa478..c7d0434a 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -import AWSLambdaRuntime +import AWSLambdaRuntimeCore import NIO // in this example we are receiving and responding with strings From c3dc45b93e7c2a269bf517be7e6ea01f5a56897d Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 15 May 2020 10:44:12 +0200 Subject: [PATCH 13/14] Readded state check for Lifecycle. Assert start happens in correct loop. --- .../AWSLambdaRuntimeCore/LambdaLifecycle.swift | 13 +++++++++++-- Sources/AWSLambdaRuntimeCore/LambdaRunner.swift | 5 +++-- .../AWSLambdaRuntimeCoreTests/LambdaTest.swift | 17 ++++++++++++----- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 7ebbf924..488eaa01 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -27,7 +27,12 @@ extension Lambda { private let configuration: Configuration private let factory: HandlerFactory - private var state = State.idle + private var state = State.idle { + willSet { + assert(self.eventLoop.inEventLoop, "State may only be changed on the `Lifecycle`'s `eventLoop`") + precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)") + } + } /// Create a new `Lifecycle`. /// @@ -62,8 +67,12 @@ extension Lambda { /// Start the `Lifecycle`. /// - /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created and initiliazed, and a first run has been schduled. + /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created and initiliazed, and a first run has been scheduled. + /// + /// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with. public func start() -> EventLoopFuture { + assert(self.eventLoop.inEventLoop, "Start must be called on the `EventLoop` the `Lifecycle` has been initialized with.") + logger.info("lambda lifecycle starting with \(self.configuration)") self.state = .initializing // triggered when the Lambda has finished its last run diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 454535a5..6489c04f 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -77,8 +77,9 @@ extension Lambda { /// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane) /// only needed for debugging purposes. func cancelWaitingForNextInvocation() { - guard self.isGettingNextInvocation else { return } - self.runtimeClient.cancel() + if self.isGettingNextInvocation { + self.runtimeClient.cancel() + } } } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift index 764f7bcf..11f2dc75 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift @@ -145,12 +145,19 @@ class LambdaTest: XCTestCase { } let result = Lambda.run(configuration: configuration, factory: { $0.makeSucceededFuture(EchoHandler()) }) - guard case .success(let invocationCount) = result else { - return XCTFail("expected to have not failed") + switch result { + case .success(let invocationCount): + // this is an allowed outcome. the lambda was cancelled while it was processing a task. + // therefore: it was shut down before asking for a new invocation + XCTAssertGreaterThan(invocationCount, 0, "should have stopped before any request made") + XCTAssertLessThan(invocationCount, maxTimes, "should have stopped before \(maxTimes)") + case .failure(HTTPClient.Errors.cancelled): + // this is an allowed outcome. while the lambda was asking for a new invocation it was + // cancelled. for this reason the get new invocation request failed with .cancelled. + break + case .failure(let error): + XCTFail("Unexpected error: \(error)") } - - XCTAssertGreaterThan(invocationCount, 0, "should have stopped before any request made") - XCTAssertLessThan(invocationCount, maxTimes, "should have stopped before \(maxTimes)") } func testTimeout() { From 7fddbb48e72a22f5eb1a1ed283152e997dcb0b5f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Sat, 16 May 2020 00:09:08 +0200 Subject: [PATCH 14/14] final round of code review. --- Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift | 8 ++++++++ Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift | 6 ------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 488eaa01..b5f4554a 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -124,6 +124,14 @@ extension Lambda { case .success: // recursive! per aws lambda runtime spec the polling requests are to be done one at a time _run(count + 1) + case .failure(HTTPClient.Errors.cancelled): + if case .shuttingdown = self.state { + // if we ware shutting down, we expect to that the get next + // invocation request might have been cancelled. For this reason we + // succeed the promise here. + return promise.succeed(count) + } + promise.fail(HTTPClient.Errors.cancelled) case .failure(let error): promise.fail(error) } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift index 11f2dc75..91289691 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift @@ -147,14 +147,8 @@ class LambdaTest: XCTestCase { switch result { case .success(let invocationCount): - // this is an allowed outcome. the lambda was cancelled while it was processing a task. - // therefore: it was shut down before asking for a new invocation XCTAssertGreaterThan(invocationCount, 0, "should have stopped before any request made") XCTAssertLessThan(invocationCount, maxTimes, "should have stopped before \(maxTimes)") - case .failure(HTTPClient.Errors.cancelled): - // this is an allowed outcome. while the lambda was asking for a new invocation it was - // cancelled. for this reason the get new invocation request failed with .cancelled. - break case .failure(let error): XCTFail("Unexpected error: \(error)") }