diff --git a/Examples/Echo/Lambda.swift b/Examples/Echo/Lambda.swift index 2b2b5763..5d3953ac 100644 --- a/Examples/Echo/Lambda.swift +++ b/Examples/Echo/Lambda.swift @@ -25,7 +25,7 @@ struct MyLambda: LambdaHandler { // setup your resources that you want to reuse for every invocation here. } - func handle(_ input: String, context: LambdaContext) async throws -> String { + func handle(_ input: String, context: Context) async throws -> String { // as an example, respond with the input's reversed String(input.reversed()) } diff --git a/Examples/JSON/Lambda.swift b/Examples/JSON/Lambda.swift index 91b8af7b..2b871cca 100644 --- a/Examples/JSON/Lambda.swift +++ b/Examples/JSON/Lambda.swift @@ -34,7 +34,7 @@ struct MyLambda: LambdaHandler { // setup your resources that you want to reuse for every invocation here. } - func handle(_ event: Request, context: LambdaContext) async throws -> Response { + func handle(_ event: Request, context: Context) async throws -> Response { // as an example, respond with the input event's reversed body Response(body: String(event.body.reversed())) } diff --git a/Package.swift b/Package.swift index ca0db60e..8dbc5c5b 100644 --- a/Package.swift +++ b/Package.swift @@ -7,8 +7,10 @@ let package = Package( products: [ // this library exports `AWSLambdaRuntimeCore` and adds Foundation convenience methods .library(name: "AWSLambdaRuntime", targets: ["AWSLambdaRuntime"]), - // this has all the main functionality for lambda and it does not link Foundation + // this has all the main functionality for AWS Lambda and it does not link Foundation .library(name: "AWSLambdaRuntimeCore", targets: ["AWSLambdaRuntimeCore"]), +// // this is the supporting library for any AWS-like lambda runtime +// .library(name: "LambdaRuntimeCore", targets: ["LambdaRuntimeCore"]), // for testing only .library(name: "AWSLambdaTesting", targets: ["AWSLambdaTesting"]), ], @@ -24,21 +26,26 @@ let package = Package( .product(name: "NIOFoundationCompat", package: "swift-nio"), ]), .target(name: "AWSLambdaRuntimeCore", dependencies: [ + .byName(name: "LambdaRuntimeCore"), + .product(name: "NIOHTTP1", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + ]), + .target(name: "LambdaRuntimeCore", dependencies: [ .product(name: "Logging", package: "swift-log"), .product(name: "Backtrace", package: "swift-backtrace"), .product(name: "NIOHTTP1", package: "swift-nio"), .product(name: "NIOCore", package: "swift-nio"), - .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), ]), - .testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [ - .byName(name: "AWSLambdaRuntimeCore"), - .product(name: "NIOTestUtils", package: "swift-nio"), - .product(name: "NIOFoundationCompat", package: "swift-nio"), - ]), +// .testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [ +// .byName(name: "AWSLambdaRuntimeCore"), +// .product(name: "NIOTestUtils", package: "swift-nio"), +// .product(name: "NIOFoundationCompat", package: "swift-nio"), +// ]), .testTarget(name: "AWSLambdaRuntimeTests", dependencies: [ - .byName(name: "AWSLambdaRuntimeCore"), .byName(name: "AWSLambdaRuntime"), + .byName(name: "AWSLambdaRuntimeCore"), + .byName(name: "LambdaRuntimeCore"), ]), // testing helper .target(name: "AWSLambdaTesting", dependencies: [ diff --git a/Sources/AWSLambdaRuntime/Context+Foundation.swift b/Sources/AWSLambdaRuntime/Context+Foundation.swift index 780e1509..1e7f0969 100644 --- a/Sources/AWSLambdaRuntime/Context+Foundation.swift +++ b/Sources/AWSLambdaRuntime/Context+Foundation.swift @@ -12,10 +12,10 @@ // //===----------------------------------------------------------------------===// -import AWSLambdaRuntimeCore +@_spi(Lambda) import AWSLambdaRuntimeCore import struct Foundation.Date -extension LambdaContext { +extension AWSLambda.Context { var deadlineDate: Date { let secondsSinceEpoch = Double(Int64(bitPattern: self.deadline.rawValue)) / -1_000_000_000 return Date(timeIntervalSince1970: secondsSinceEpoch) diff --git a/Sources/AWSLambdaRuntimeCore/AWSLambda.swift b/Sources/AWSLambdaRuntimeCore/AWSLambda.swift new file mode 100644 index 00000000..62eb30aa --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/AWSLambda.swift @@ -0,0 +1,27 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_exported import LambdaRuntimeCore + +public enum AWSLambda {} + +extension AWSLambda: LambdaProvider { + public static var runtimeEngineAddress: String? { + Lambda.env("AWS_LAMBDA_RUNTIME_API") + } +} + +extension ByteBufferLambdaHandler where Provider == AWSLambda { + public typealias Provider = AWSLambda +} diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift index 14c5f2a7..53282efc 100644 --- a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2021-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,68 +12,59 @@ // //===----------------------------------------------------------------------===// -import NIOCore +@_spi(Lambda) import LambdaRuntimeCore import NIOHTTP1 -enum ControlPlaneRequest: Hashable { - case next - case invocationResponse(String, ByteBuffer?) - case invocationError(String, ErrorResponse) - case initializationError(ErrorResponse) -} - -enum ControlPlaneResponse: Hashable { - case next(Invocation, ByteBuffer) - case accepted - case error(ErrorResponse) -} +extension AWSLambda { + public struct Invocation: LambdaInvocation { + @_spi(Lambda) public var requestID: String + @_spi(Lambda) public var deadlineInMillisSinceEpoch: Int64 + @_spi(Lambda) public var invokedFunctionARN: String + @_spi(Lambda) public var traceID: String + @_spi(Lambda) public var clientContext: String? + @_spi(Lambda) public var cognitoIdentity: String? -struct Invocation: Hashable { - let requestID: String - let deadlineInMillisSinceEpoch: Int64 - let invokedFunctionARN: String - let traceID: String - let clientContext: String? - let cognitoIdentity: String? - - init(headers: HTTPHeaders) throws { - guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else { - throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.requestID) + init(requestID: String, + deadlineInMillisSinceEpoch: Int64, + invokedFunctionARN: String, + traceID: String, + clientContext: String?, + cognitoIdentity: String?) { + self.requestID = requestID + self.deadlineInMillisSinceEpoch = deadlineInMillisSinceEpoch + self.invokedFunctionARN = invokedFunctionARN + self.traceID = traceID + self.clientContext = clientContext + self.cognitoIdentity = cognitoIdentity } - guard let deadline = headers.first(name: AmazonHeaders.deadline), - let unixTimeInMilliseconds = Int64(deadline) - else { - throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.deadline) - } + @_spi(Lambda) + public init(headers: HTTPHeaders) throws { + guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else { + throw LambdaRuntimeError.invocationHeadMissingRequestID + } - guard let invokedFunctionARN = headers.first(name: AmazonHeaders.invokedFunctionARN) else { - throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN) - } + guard let deadline = headers.first(name: AmazonHeaders.deadline) else { + throw LambdaRuntimeError.invocationHeadMissingDeadlineInMillisSinceEpoch + } + guard let unixTimeInMilliseconds = Int64(deadline) else { + throw LambdaRuntimeError.responseHeadInvalidDeadlineValue + } - self.requestID = requestID - self.deadlineInMillisSinceEpoch = unixTimeInMilliseconds - self.invokedFunctionARN = invokedFunctionARN - self.traceID = headers.first(name: AmazonHeaders.traceID) ?? "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=0" - self.clientContext = headers["Lambda-Runtime-Client-Context"].first - self.cognitoIdentity = headers["Lambda-Runtime-Cognito-Identity"].first - } -} + guard let invokedFunctionARN = headers.first(name: AmazonHeaders.invokedFunctionARN) else { + throw LambdaRuntimeError.invocationHeadMissingFunctionARN + } -struct ErrorResponse: Hashable, Codable { - var errorType: String - var errorMessage: String -} + let traceID = headers.first(name: AmazonHeaders.traceID) ?? "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=0" -extension ErrorResponse { - internal func toJSONBytes() -> [UInt8] { - var bytes = [UInt8]() - bytes.append(UInt8(ascii: "{")) - bytes.append(contentsOf: #""errorType":"#.utf8) - self.errorType.encodeAsJSONString(into: &bytes) - bytes.append(contentsOf: #","errorMessage":"#.utf8) - self.errorMessage.encodeAsJSONString(into: &bytes) - bytes.append(UInt8(ascii: "}")) - return bytes + self.init( + requestID: requestID, + deadlineInMillisSinceEpoch: unixTimeInMilliseconds, + invokedFunctionARN: invokedFunctionARN, + traceID: traceID, + clientContext: headers["Lambda-Runtime-Client-Context"].first, + cognitoIdentity: headers["Lambda-Runtime-Cognito-Identity"].first + ) + } } } diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift index a91e1e44..cbf580ec 100644 --- a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2021-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,103 +12,105 @@ // //===----------------------------------------------------------------------===// +@_spi(Lambda) import LambdaRuntimeCore import NIOCore -struct ControlPlaneRequestEncoder: _EmittingChannelHandler { - typealias OutboundOut = ByteBuffer +@_spi(Lambda) +extension AWSLambda { + public struct RequestEncoder: ControlPlaneRequestEncoder { + private var host: String + private var byteBuffer: ByteBuffer! - private var host: String - private var byteBuffer: ByteBuffer! + public init(host: String) { + self.host = host + } - init(host: String) { - self.host = host - } + public mutating func writeRequest(_ request: ControlPlaneRequest, context: ChannelHandlerContext, promise: EventLoopPromise?) { + self.byteBuffer.clear(minimumCapacity: self.byteBuffer.storageCapacity) - mutating func writeRequest(_ request: ControlPlaneRequest, context: ChannelHandlerContext, promise: EventLoopPromise?) { - self.byteBuffer.clear(minimumCapacity: self.byteBuffer.storageCapacity) - - switch request { - case .next: - self.byteBuffer.writeString(.nextInvocationRequestLine) - self.byteBuffer.writeHostHeader(host: self.host) - self.byteBuffer.writeString(.userAgentHeader) - self.byteBuffer.writeString(.CRLF) // end of head - context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) - context.flush() - - case .invocationResponse(let requestID, let payload): - let contentLength = payload?.readableBytes ?? 0 - self.byteBuffer.writeInvocationResultRequestLine(requestID) - self.byteBuffer.writeHostHeader(host: self.host) - self.byteBuffer.writeString(.userAgentHeader) - self.byteBuffer.writeContentLengthHeader(length: contentLength) - self.byteBuffer.writeString(.CRLF) // end of head - if let payload = payload, contentLength > 0 { - context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil) - context.write(self.wrapOutboundOut(payload), promise: promise) - } else { + switch request { + case .next: + self.byteBuffer.writeString(.nextInvocationRequestLine) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeString(.CRLF) // end of head + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + context.flush() + + case .invocationResponse(let requestID, let payload): + let contentLength = payload?.readableBytes ?? 0 + self.byteBuffer.writeInvocationResultRequestLine(requestID) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeContentLengthHeader(length: contentLength) + self.byteBuffer.writeString(.CRLF) // end of head + if let payload = payload, contentLength > 0 { + context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil) + context.write(self.wrapOutboundOut(payload), promise: promise) + } else { + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + } + context.flush() + + case .invocationError(let requestID, let errorMessage): + let payload = errorMessage.toJSONBytes() + self.byteBuffer.writeInvocationErrorRequestLine(requestID) + self.byteBuffer.writeContentLengthHeader(length: payload.count) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeString(.unhandledErrorHeader) + self.byteBuffer.writeString(.CRLF) // end of head + self.byteBuffer.writeBytes(payload) + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + context.flush() + + case .initializationError(let errorMessage): + let payload = errorMessage.toJSONBytes() + self.byteBuffer.writeString(.runtimeInitErrorRequestLine) + self.byteBuffer.writeContentLengthHeader(length: payload.count) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeString(.unhandledErrorHeader) + self.byteBuffer.writeString(.CRLF) // end of head + self.byteBuffer.writeBytes(payload) context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + context.flush() } - context.flush() - - case .invocationError(let requestID, let errorMessage): - let payload = errorMessage.toJSONBytes() - self.byteBuffer.writeInvocationErrorRequestLine(requestID) - self.byteBuffer.writeContentLengthHeader(length: payload.count) - self.byteBuffer.writeHostHeader(host: self.host) - self.byteBuffer.writeString(.userAgentHeader) - self.byteBuffer.writeString(.unhandledErrorHeader) - self.byteBuffer.writeString(.CRLF) // end of head - self.byteBuffer.writeBytes(payload) - context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) - context.flush() - - case .initializationError(let errorMessage): - let payload = errorMessage.toJSONBytes() - self.byteBuffer.writeString(.runtimeInitErrorRequestLine) - self.byteBuffer.writeContentLengthHeader(length: payload.count) - self.byteBuffer.writeHostHeader(host: self.host) - self.byteBuffer.writeString(.userAgentHeader) - self.byteBuffer.writeString(.unhandledErrorHeader) - self.byteBuffer.writeString(.CRLF) // end of head - self.byteBuffer.writeBytes(payload) - context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) - context.flush() } - } - mutating func writerAdded(context: ChannelHandlerContext) { - self.byteBuffer = context.channel.allocator.buffer(capacity: 256) - } + public mutating func writerAdded(context: ChannelHandlerContext) { + self.byteBuffer = context.channel.allocator.buffer(capacity: 256) + } - mutating func writerRemoved(context: ChannelHandlerContext) { - self.byteBuffer = nil + public mutating func writerRemoved(context: ChannelHandlerContext) { + self.byteBuffer = nil + } } } extension String { - static let CRLF: String = "\r\n" + fileprivate static let CRLF: String = "\r\n" - static let userAgentHeader: String = "user-agent: Swift-Lambda/Unknown\r\n" - static let unhandledErrorHeader: String = "lambda-runtime-function-error-type: Unhandled\r\n" + fileprivate static let userAgentHeader: String = "user-agent: Swift-Lambda/Unknown\r\n" + fileprivate static let unhandledErrorHeader: String = "lambda-runtime-function-error-type: Unhandled\r\n" - static let nextInvocationRequestLine: String = + fileprivate static let nextInvocationRequestLine: String = "GET /2018-06-01/runtime/invocation/next HTTP/1.1\r\n" - static let runtimeInitErrorRequestLine: String = + fileprivate static let runtimeInitErrorRequestLine: String = "POST /2018-06-01/runtime/init/error HTTP/1.1\r\n" } extension ByteBuffer { - fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) { + fileprivate mutating func writeInvocationResultRequestLine(_ requestID: LambdaRequestID) { self.writeString("POST /2018-06-01/runtime/invocation/") - self.writeString(requestID) + self.writeRequestID(requestID) self.writeString("/response HTTP/1.1\r\n") } - fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) { + fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: LambdaRequestID) { self.writeString("POST /2018-06-01/runtime/invocation/") - self.writeString(requestID) + self.writeRequestID(requestID) self.writeString("/error HTTP/1.1\r\n") } diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneResponseDecoder.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneResponseDecoder.swift new file mode 100644 index 00000000..dc364bb1 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneResponseDecoder.swift @@ -0,0 +1,530 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_spi(Lambda) import LambdaRuntimeCore +import NIOCore + +@_spi(Lambda) +extension AWSLambda { + public struct ResponseDecoder: ControlPlaneResponseDecoder { + public typealias Invocation = AWSLambda.Invocation + public typealias InboundOut = ControlPlaneResponse + + private enum State { + case waitingForNewResponse + case parsingHead(PartialHead) + case waitingForBody(PartialHead) + case receivingBody(PartialHead, ByteBuffer) + } + + private var state: State + + public init() { + self.state = .waitingForNewResponse + } + + public mutating func decode(buffer: inout ByteBuffer) throws -> Response? { + switch self.state { + case .waitingForNewResponse: + guard case .decoded(let head) = try self.decodeResponseHead(from: &buffer) else { + return nil + } + + guard case .decoded(let body) = try self.decodeBody(from: &buffer) else { + return nil + } + + return try self.decodeResponse(head: head, body: body) + + case .parsingHead: + guard case .decoded(let head) = try self.decodeHeaderLines(from: &buffer) else { + return nil + } + + guard case .decoded(let body) = try self.decodeBody(from: &buffer) else { + return nil + } + + return try self.decodeResponse(head: head, body: body) + + case .waitingForBody(let head), .receivingBody(let head, _): + guard case .decoded(let body) = try self.decodeBody(from: &buffer) else { + return nil + } + + return try self.decodeResponse(head: head, body: body) + } + } + + public mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> Response? { + try self.decode(buffer: &buffer) + } + + // MARK: - Private Methods - + + private enum DecodeResult { + case needMoreData + case decoded(T) + } + + private mutating func decodeResponseHead(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard case .decoded = try self.decodeResponseStatusLine(from: &buffer) else { + return .needMoreData + } + + return try self.decodeHeaderLines(from: &buffer) + } + + private mutating func decodeResponseStatusLine(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard case .waitingForNewResponse = self.state else { + preconditionFailure("Invalid state: \(self.state)") + } + + guard case .decoded(var lineBuffer) = try self.decodeCRLFTerminatedLine(from: &buffer) else { + return .needMoreData + } + + let statusCode = try self.decodeStatusLine(from: &lineBuffer) + self.state = .parsingHead(.init(statusCode: statusCode)) + return .decoded(statusCode) + } + + private mutating func decodeHeaderLines(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard case .parsingHead(var head) = self.state else { + preconditionFailure("Invalid state: \(self.state)") + } + + while true { + guard case .decoded(var nextLine) = try self.decodeCRLFTerminatedLine(from: &buffer) else { + self.state = .parsingHead(head) + return .needMoreData + } + + switch try self.decodeHeaderLine(from: &nextLine) { + case .headerEnd: + self.state = .waitingForBody(head) + return .decoded(head) + + case .contentLength(let length): + head.contentLength = length // TODO: This can crash + + case .contentType: + break // switch + + case .requestID(let requestID): + head.requestID = requestID + + case .traceID(let traceID): + head.traceID = traceID + + case .functionARN(let arn): + head.invokedFunctionARN = arn + + case .cognitoIdentity(let cognitoIdentity): + head.cognitoIdentity = cognitoIdentity + + case .deadlineMS(let deadline): + head.deadlineInMillisSinceEpoch = deadline + + case .ignore: + break // switch + } + } + } + + enum BodyEncoding { + case chunked + case plain(length: Int) + case none + } + + private mutating func decodeBody(from buffer: inout ByteBuffer) throws -> DecodeResult { + switch self.state { + case .waitingForBody(let partialHead): + switch partialHead.contentLength { + case .none: + return .decoded(nil) + case .some(let length): + if let slice = buffer.readSlice(length: length) { + self.state = .waitingForNewResponse + return .decoded(slice) + } + return .needMoreData + } + + case .waitingForNewResponse, .parsingHead, .receivingBody: + preconditionFailure("Invalid state: \(self.state)") + } + } + + private mutating func decodeResponse(head: PartialHead, body: ByteBuffer?) throws -> Response { + switch head.statusCode { + case 200: + guard let body = body else { + preconditionFailure("TODO: implement") + } + return .next(try Invocation(head: head), body) + case 202: + return .accepted + case 400 ..< 600: + preconditionFailure("TODO: implement") + + default: + throw LambdaRuntimeError.unexpectedStatusCode + } + } + + mutating func decodeStatusLine(from buffer: inout ByteBuffer) throws -> Int { + guard buffer.readableBytes >= 11 else { + throw LambdaRuntimeError.responseHeadInvalidStatusLine + } + + guard buffer.readString("HTTP/1.1 ") else { + throw LambdaRuntimeError.responseHeadInvalidStatusLine + } + + let statusAsString = buffer.readString(length: 3)! + guard let status = Int(statusAsString) else { + throw LambdaRuntimeError.responseHeadInvalidStatusLine + } + + return status + } + + private mutating func decodeCRLFTerminatedLine(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard let crIndex = buffer.readableBytesView.firstIndex(of: UInt8(ascii: "\r")) else { + if buffer.readableBytes > 256 { + throw LambdaRuntimeError.responseHeadMoreThan256BytesBeforeCRLF + } + return .needMoreData + } + let lfIndex = buffer.readableBytesView.index(after: crIndex) + guard lfIndex < buffer.readableBytesView.endIndex else { + // the buffer is split exactly after the \r and \n. Let's wait for more data + return .needMoreData + } + + guard buffer.readableBytesView[lfIndex] == UInt8(ascii: "\n") else { + throw LambdaRuntimeError.responseHeadInvalidHeader + } + + let slice = buffer.readSlice(length: crIndex - buffer.readerIndex)! + buffer.moveReaderIndex(forwardBy: 2) // move over \r\n + return .decoded(slice) + } + + private enum HeaderLineContent: Equatable { + case traceID(String) + case contentType + case contentLength(Int) + case cognitoIdentity(String) + case deadlineMS(Int) + case functionARN(String) + case requestID(LambdaRequestID) + + case ignore + case headerEnd + } + + private mutating func decodeHeaderLine(from buffer: inout ByteBuffer) throws -> HeaderLineContent { + guard let colonIndex = buffer.readableBytesView.firstIndex(of: UInt8(ascii: ":")) else { + if buffer.readableBytes == 0 { + return .headerEnd + } + throw LambdaRuntimeError.responseHeadHeaderMissingColon + } + + // based on colonIndex we can already make some good guesses... + // 4: Date + // 12: Content-Type + // 14: Content-Length + // 17: Transfer-Encoding + // 23: Lambda-Runtime-Trace-Id + // 26: Lambda-Runtime-Deadline-Ms + // 29: Lambda-Runtime-Aws-Request-Id + // Lambda-Runtime-Client-Context + // 31: Lambda-Runtime-Cognito-Identity + // 35: Lambda-Runtime-Invoked-Function-Arn + + switch colonIndex { + case 4: + if buffer.readHeaderName("date") { + return .ignore + } + + case 12: + if buffer.readHeaderName("content-type") { + return .ignore + } + + case 14: + if buffer.readHeaderName("content-length") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let length = buffer.readIntegerFromHeader() else { + throw LambdaRuntimeError.responseHeadInvalidContentLengthValue + } + return .contentLength(length) + } + + case 17: + if buffer.readHeaderName("transfer-encoding") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let length = buffer.readIntegerFromHeader() else { + throw LambdaRuntimeError.responseHeadInvalidDeadlineValue + } + return .contentLength(length) + } + + case 23: + if buffer.readHeaderName("lambda-runtime-trace-id") { + buffer.moveReaderIndex(forwardBy: 1) + guard let string = try self.decodeHeaderValue(from: &buffer) else { + throw LambdaRuntimeError.responseHeadInvalidTraceIDValue + } + return .traceID(string) + } + + case 26: + if buffer.readHeaderName("lambda-runtime-deadline-ms") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let deadline = buffer.readIntegerFromHeader() else { + throw LambdaRuntimeError.responseHeadInvalidContentLengthValue + } + return .deadlineMS(deadline) + } + + case 29: + if buffer.readHeaderName("lambda-runtime-aws-request-id") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let requestID = buffer.readRequestID() else { + throw LambdaRuntimeError.responseHeadInvalidRequestIDValue + } + return .requestID(requestID) + } + if buffer.readHeaderName("lambda-runtime-client-context") { + return .ignore + } + + case 31: + if buffer.readHeaderName("lambda-runtime-cognito-identity") { + return .ignore + } + + case 35: + if buffer.readHeaderName("lambda-runtime-invoked-function-arn") { + buffer.moveReaderIndex(forwardBy: 1) + guard let string = try self.decodeHeaderValue(from: &buffer) else { + throw LambdaRuntimeError.responseHeadInvalidTraceIDValue + } + return .functionARN(string) + } + + default: + // Ensure we received a valid http header: + break // fallthrough + } + + // We received a header we didn't expect, let's ensure it is valid. + let satisfy = buffer.readableBytesView[0 ..< colonIndex].allSatisfy { char -> Bool in + switch char { + case UInt8(ascii: "a") ... UInt8(ascii: "z"), + UInt8(ascii: "A") ... UInt8(ascii: "Z"), + UInt8(ascii: "0") ... UInt8(ascii: "9"), + UInt8(ascii: "!"), + UInt8(ascii: "#"), + UInt8(ascii: "$"), + UInt8(ascii: "%"), + UInt8(ascii: "&"), + UInt8(ascii: "'"), + UInt8(ascii: "*"), + UInt8(ascii: "+"), + UInt8(ascii: "-"), + UInt8(ascii: "."), + UInt8(ascii: "^"), + UInt8(ascii: "_"), + UInt8(ascii: "`"), + UInt8(ascii: "|"), + UInt8(ascii: "~"): + return true + default: + return false + } + } + + guard satisfy else { + throw LambdaRuntimeError.responseHeadHeaderInvalidCharacter + } + + return .ignore + } + + @discardableResult + mutating func decodeOptionalWhiteSpaceBeforeFieldValue(from buffer: inout ByteBuffer) throws -> Int { + let startIndex = buffer.readerIndex + guard let index = buffer.readableBytesView.firstIndex(where: { $0 != UInt8(ascii: " ") && $0 != UInt8(ascii: "\t") }) else { + throw LambdaRuntimeError.responseHeadHeaderMissingFieldValue + } + buffer.moveReaderIndex(to: index) + return index - startIndex + } + + private func decodeHeaderValue(from buffer: inout ByteBuffer) throws -> String? { + func isNotOptionalWhiteSpace(_ val: UInt8) -> Bool { + val != UInt8(ascii: " ") && val != UInt8(ascii: "\t") + } + + guard let firstCharacterIndex = buffer.readableBytesView.firstIndex(where: isNotOptionalWhiteSpace), + let lastCharacterIndex = buffer.readableBytesView.lastIndex(where: isNotOptionalWhiteSpace) + else { + throw LambdaRuntimeError.responseHeadHeaderMissingFieldValue + } + + let string = buffer.getString(at: firstCharacterIndex, length: lastCharacterIndex + 1 - firstCharacterIndex) + buffer.moveReaderIndex(to: buffer.writerIndex) + return string + } + } +} + +extension AWSLambda.ResponseDecoder { + fileprivate struct PartialHead { + var statusCode: Int + var contentLength: Int? + + var requestID: LambdaRequestID? + var deadlineInMillisSinceEpoch: Int? + var invokedFunctionARN: String? + var traceID: String? + var clientContext: String? + var cognitoIdentity: String? + + init(statusCode: Int) { + self.statusCode = statusCode + self.contentLength = nil + + self.requestID = nil + self.deadlineInMillisSinceEpoch = nil + self.invokedFunctionARN = nil + self.traceID = nil + self.clientContext = nil + self.cognitoIdentity = nil + } + } +} + +extension AWSLambda.Invocation { + fileprivate init(head: AWSLambda.ResponseDecoder.PartialHead) throws { + guard let requestID = head.requestID else { + throw LambdaRuntimeError.invocationHeadMissingRequestID + } + + guard let deadlineInMillisSinceEpoch = head.deadlineInMillisSinceEpoch else { + throw LambdaRuntimeError.invocationHeadMissingDeadlineInMillisSinceEpoch + } + + guard let invokedFunctionARN = head.invokedFunctionARN else { + throw LambdaRuntimeError.invocationHeadMissingFunctionARN + } + + guard let traceID = head.traceID else { + throw LambdaRuntimeError.invocationHeadMissingTraceID + } + + self = .init( + requestID: requestID.lowercased, + deadlineInMillisSinceEpoch: Int64(deadlineInMillisSinceEpoch), + invokedFunctionARN: invokedFunctionARN, + traceID: traceID, + clientContext: head.clientContext, + cognitoIdentity: head.cognitoIdentity + ) + } +} + +extension ByteBuffer { + fileprivate mutating func readString(_ string: String) -> Bool { + let result = self.withUnsafeReadableBytes { inputBuffer in + string.utf8.withContiguousStorageIfAvailable { validateBuffer -> Bool in + assert(inputBuffer.count >= validateBuffer.count) + + for idx in 0 ..< validateBuffer.count { + if inputBuffer[idx] != validateBuffer[idx] { + return false + } + } + return true + } + }! + + if result { + self.moveReaderIndex(forwardBy: string.utf8.count) + return true + } + + return false + } + + fileprivate mutating func readHeaderName(_ name: String) -> Bool { + let result = self.withUnsafeReadableBytes { inputBuffer in + name.utf8.withContiguousStorageIfAvailable { nameBuffer -> Bool in + assert(inputBuffer.count >= nameBuffer.count) + + for idx in 0 ..< nameBuffer.count { + // let's hope this gets vectorised ;) + if inputBuffer[idx] & 0xDF != nameBuffer[idx] & 0xDF { + return false + } + } + return true + } + }! + + if result { + self.moveReaderIndex(forwardBy: name.utf8.count) + return true + } + + return false + } + + mutating func readIntegerFromHeader() -> Int? { + guard let ascii = self.readInteger(as: UInt8.self), UInt8(ascii: "0") <= ascii && ascii <= UInt8(ascii: "9") else { + return nil + } + var value = Int(ascii - UInt8(ascii: "0")) + loop: while let ascii = self.readInteger(as: UInt8.self) { + switch ascii { + case UInt8(ascii: "0") ... UInt8(ascii: "9"): + value = value * 10 + value += Int(ascii - UInt8(ascii: "0")) + + case UInt8(ascii: " "), UInt8(ascii: "\t"): + // verify that all following characters are also whitespace + guard self.readableBytesView.allSatisfy({ $0 == UInt8(ascii: " ") || $0 == UInt8(ascii: "\t") }) else { + return nil + } + return value + + default: + return nil + } + } + + return value + } +} diff --git a/Sources/AWSLambdaRuntimeCore/Lambda.swift b/Sources/AWSLambdaRuntimeCore/Lambda.swift deleted file mode 100644 index 1bf4f0dc..00000000 --- a/Sources/AWSLambdaRuntimeCore/Lambda.swift +++ /dev/null @@ -1,99 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftAWSLambdaRuntime open source project -// -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if os(Linux) -import Glibc -#else -import Darwin.C -#endif - -import Backtrace -import Logging -import NIOCore -import NIOPosix - -public enum Lambda { - /// Utility to access/read environment variables - public static func env(_ name: String) -> String? { - guard let value = getenv(name) else { - return nil - } - return String(cString: value) - } - - /// Run a Lambda defined by implementing the ``ByteBufferLambdaHandler`` protocol. - /// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the - /// ``ByteBufferLambdaHandler/makeHandler(context:)`` to create a new Handler. - /// - /// - parameters: - /// - configuration: A Lambda runtime configuration object - /// - handlerType: The Handler to create and invoke. - /// - /// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine. - internal static func run( - configuration: Configuration = .init(), - handlerType: Handler.Type - ) -> Result { - let _run = { (configuration: Configuration) -> Result in - Backtrace.install() - var logger = Logger(label: "Lambda") - logger.logLevel = configuration.general.logLevel - - var result: Result! - MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in - let runtime = LambdaRuntime(eventLoop: eventLoop, logger: logger, configuration: configuration) - #if DEBUG - let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in - logger.info("intercepted signal: \(signal)") - runtime.shutdown() - } - #endif - - runtime.start().flatMap { - runtime.shutdownFuture - }.whenComplete { lifecycleResult in - #if DEBUG - signalSource.cancel() - #endif - eventLoop.shutdownGracefully { error in - if let error = error { - preconditionFailure("Failed to shutdown eventloop: \(error)") - } - } - result = lifecycleResult - } - } - - logger.info("shutdown completed") - return result - } - - // start local server for debugging in DEBUG mode only - #if DEBUG - if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false { - do { - return try Lambda.withLocalServer { - _run(configuration) - } - } catch { - return .failure(error) - } - } else { - return _run(configuration) - } - #else - return _run(configuration) - #endif - } -} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index 39e12439..058d3ae0 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -13,219 +13,162 @@ //===----------------------------------------------------------------------===// import Dispatch +@_spi(Lambda) import LambdaRuntimeCore import Logging import NIOCore -// MARK: - InitializationContext +extension AWSLambda { + /// Lambda runtime context. + /// The Lambda runtime generates and passes the `Context` to the Lambda handler as an argument. + public struct Context { + final class _Storage { + var requestID: String + var traceID: String + var invokedFunctionARN: String + var deadline: DispatchWallTime + var cognitoIdentity: String? + var clientContext: String? + var logger: Logger + var eventLoop: EventLoop + var allocator: ByteBufferAllocator + + init( + requestID: String, + traceID: String, + invokedFunctionARN: String, + deadline: DispatchWallTime, + cognitoIdentity: String?, + clientContext: String?, + logger: Logger, + eventLoop: EventLoop, + allocator: ByteBufferAllocator + ) { + self.requestID = requestID + self.traceID = traceID + self.invokedFunctionARN = invokedFunctionARN + self.deadline = deadline + self.cognitoIdentity = cognitoIdentity + self.clientContext = clientContext + self.logger = logger + self.eventLoop = eventLoop + self.allocator = allocator + } + } + + private var storage: _Storage + + /// The request ID, which identifies the request that triggered the function invocation. + public var requestID: String { + self.storage.requestID + } + + /// The AWS X-Ray tracing header. + public var traceID: String { + self.storage.traceID + } + + /// The ARN of the Lambda function, version, or alias that's specified in the invocation. + public var invokedFunctionARN: String { + self.storage.invokedFunctionARN + } + + /// The timestamp that the function times out + public var deadline: DispatchWallTime { + self.storage.deadline + } + + /// For invocations from the AWS Mobile SDK, data about the Amazon Cognito identity provider. + public var cognitoIdentity: String? { + self.storage.cognitoIdentity + } + + /// For invocations from the AWS Mobile SDK, data about the client application and device. + public var clientContext: String? { + self.storage.clientContext + } -extension Lambda { - /// Lambda runtime initialization context. - /// The Lambda runtime generates and passes the `InitializationContext` to the Handlers - /// ``ByteBufferLambdaHandler/makeHandler(context:)`` or ``LambdaHandler/init(context:)`` - /// as an argument. - public struct InitializationContext { /// `Logger` to log with /// /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. - public let logger: Logger + public var logger: Logger { + self.storage.logger + } /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. + /// This is useful when implementing the `EventLoopLambdaHandler` protocol. /// /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. /// Most importantly the `EventLoop` must never be blocked. - public let eventLoop: EventLoop + public var eventLoop: EventLoop { + self.storage.eventLoop + } /// `ByteBufferAllocator` to allocate `ByteBuffer` - public let allocator: ByteBufferAllocator - - init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) { - self.eventLoop = eventLoop - self.logger = logger - self.allocator = allocator + /// This is useful when implementing `EventLoopLambdaHandler` + public var allocator: ByteBufferAllocator { + self.storage.allocator } - /// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning. - public static func __forTestsOnly( - logger: Logger, - eventLoop: EventLoop - ) -> InitializationContext { - InitializationContext( + init(requestID: String, + traceID: String, + invokedFunctionARN: String, + deadline: DispatchWallTime, + cognitoIdentity: String? = nil, + clientContext: String? = nil, + logger: Logger, + eventLoop: EventLoop, + allocator: ByteBufferAllocator) { + self.storage = _Storage( + requestID: requestID, + traceID: traceID, + invokedFunctionARN: invokedFunctionARN, + deadline: deadline, + cognitoIdentity: cognitoIdentity, + clientContext: clientContext, logger: logger, eventLoop: eventLoop, - allocator: ByteBufferAllocator() + allocator: allocator ) } - } -} -// MARK: - Context - -/// Lambda runtime context. -/// The Lambda runtime generates and passes the `Context` to the Lambda handler as an argument. -public struct LambdaContext: CustomDebugStringConvertible { - final class _Storage { - var requestID: String - var traceID: String - var invokedFunctionARN: String - var deadline: DispatchWallTime - var cognitoIdentity: String? - var clientContext: String? - var logger: Logger - var eventLoop: EventLoop - var allocator: ByteBufferAllocator - - init( + public func getRemainingTime() -> TimeAmount { + let deadline = self.deadline.millisSinceEpoch + let now = DispatchWallTime.now().millisSinceEpoch + + let remaining = deadline - now + return .milliseconds(remaining) + } + + public var debugDescription: String { + "\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))" + } + + /// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning. + public static func __forTestsOnly( requestID: String, traceID: String, invokedFunctionARN: String, - deadline: DispatchWallTime, - cognitoIdentity: String?, - clientContext: String?, + timeout: DispatchTimeInterval, logger: Logger, - eventLoop: EventLoop, - allocator: ByteBufferAllocator - ) { - self.requestID = requestID - self.traceID = traceID - self.invokedFunctionARN = invokedFunctionARN - self.deadline = deadline - self.cognitoIdentity = cognitoIdentity - self.clientContext = clientContext - self.logger = logger - self.eventLoop = eventLoop - self.allocator = allocator + eventLoop: EventLoop + ) -> Context { + Context( + requestID: requestID, + traceID: traceID, + invokedFunctionARN: invokedFunctionARN, + deadline: .now() + timeout, + logger: logger, + eventLoop: eventLoop, + allocator: ByteBufferAllocator() + ) } } - - private var storage: _Storage - - /// The request ID, which identifies the request that triggered the function invocation. - public var requestID: String { - self.storage.requestID - } - - /// The AWS X-Ray tracing header. - public var traceID: String { - self.storage.traceID - } - - /// The ARN of the Lambda function, version, or alias that's specified in the invocation. - public var invokedFunctionARN: String { - self.storage.invokedFunctionARN - } - - /// The timestamp that the function times out - public var deadline: DispatchWallTime { - self.storage.deadline - } - - /// For invocations from the AWS Mobile SDK, data about the Amazon Cognito identity provider. - public var cognitoIdentity: String? { - self.storage.cognitoIdentity - } - - /// For invocations from the AWS Mobile SDK, data about the client application and device. - public var clientContext: String? { - self.storage.clientContext - } - - /// `Logger` to log with - /// - /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. - public var logger: Logger { - self.storage.logger - } - - /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. - /// This is useful when implementing the `EventLoopLambdaHandler` protocol. - /// - /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. - /// Most importantly the `EventLoop` must never be blocked. - public var eventLoop: EventLoop { - self.storage.eventLoop - } - - /// `ByteBufferAllocator` to allocate `ByteBuffer` - /// This is useful when implementing `EventLoopLambdaHandler` - public var allocator: ByteBufferAllocator { - self.storage.allocator - } - - init(requestID: String, - traceID: String, - invokedFunctionARN: String, - deadline: DispatchWallTime, - cognitoIdentity: String? = nil, - clientContext: String? = nil, - logger: Logger, - eventLoop: EventLoop, - allocator: ByteBufferAllocator) { - self.storage = _Storage( - requestID: requestID, - traceID: traceID, - invokedFunctionARN: invokedFunctionARN, - deadline: deadline, - cognitoIdentity: cognitoIdentity, - clientContext: clientContext, - logger: logger, - eventLoop: eventLoop, - allocator: allocator - ) - } - - public func getRemainingTime() -> TimeAmount { - let deadline = self.deadline.millisSinceEpoch - let now = DispatchWallTime.now().millisSinceEpoch - - let remaining = deadline - now - return .milliseconds(remaining) - } - - public var debugDescription: String { - "\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))" - } - - /// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning. - public static func __forTestsOnly( - requestID: String, - traceID: String, - invokedFunctionARN: String, - timeout: DispatchTimeInterval, - logger: Logger, - eventLoop: EventLoop - ) -> LambdaContext { - LambdaContext( - requestID: requestID, - traceID: traceID, - invokedFunctionARN: invokedFunctionARN, - deadline: .now() + timeout, - logger: logger, - eventLoop: eventLoop, - allocator: ByteBufferAllocator() - ) - } } -// MARK: - ShutdownContext - -extension Lambda { - /// Lambda runtime shutdown context. - /// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument. - public final class ShutdownContext { - /// `Logger` to log with - /// - /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. - public let logger: Logger +extension AWSLambda.Context: LambdaContext { + public typealias Invocation = AWSLambda.Invocation - /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. - /// - /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. - /// Most importantly the `EventLoop` must never be blocked. - public let eventLoop: EventLoop - - internal init(logger: Logger, eventLoop: EventLoop) { - self.eventLoop = eventLoop - self.logger = logger - } + public init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, invocation: Invocation) { + self.init(requestID: invocation.requestID, traceID: invocation.traceID, invokedFunctionARN: invocation.invokedFunctionARN, deadline: DispatchWallTime(millisSinceEpoch: invocation.deadlineInMillisSinceEpoch), logger: logger, eventLoop: eventLoop, allocator: allocator) } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift deleted file mode 100644 index 38499a05..00000000 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ /dev/null @@ -1,158 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftAWSLambdaRuntime open source project -// -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch -import Logging -import NIOCore - -extension Lambda { - /// LambdaRunner manages the Lambda runtime workflow, or business logic. - internal final class Runner { - private let runtimeClient: RuntimeClient - private let eventLoop: EventLoop - private let allocator: ByteBufferAllocator - - private var isGettingNextInvocation = false - - init(eventLoop: EventLoop, configuration: Configuration) { - self.eventLoop = eventLoop - self.runtimeClient = RuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine) - self.allocator = ByteBufferAllocator() - } - - /// Run the user provided initializer. This *must* only be called once. - /// - /// - Returns: An `EventLoopFuture` fulfilled with the outcome of the initialization. - func initialize(logger: Logger, handlerType: Handler.Type) -> EventLoopFuture { - logger.debug("initializing lambda") - // 1. create the handler from the factory - // 2. report initialization error if one occured - let context = InitializationContext(logger: logger, - eventLoop: self.eventLoop, - allocator: self.allocator) - return Handler.makeHandler(context: context) - // Hopping back to "our" EventLoop is important in case the factory returns a future - // that originated from a foreign EventLoop/EventLoopGroup. - // This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops - // for whatever reason and returns a future that originated from that foreign EventLoop. - .hop(to: self.eventLoop) - .peekError { error in - self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in - // We're going to bail out because the init failed, so there's not a lot we can do other than log - // that we couldn't report this error back to the runtime. - logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)") - } - } - } - - func run(logger: Logger, handler: Handler) -> EventLoopFuture { - logger.debug("lambda invocation sequence starting") - // 1. request invocation from lambda runtime engine - self.isGettingNextInvocation = true - return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in - logger.error("could not fetch work from lambda runtime engine: \(error)") - }.flatMap { invocation, bytes in - // 2. send invocation to handler - self.isGettingNextInvocation = false - let context = LambdaContext( - logger: logger, - eventLoop: self.eventLoop, - allocator: self.allocator, - invocation: invocation - ) - logger.debug("sending invocation to lambda handler \(handler)") - return handler.handle(bytes, context: context) - // Hopping back to "our" EventLoop is important in case the handler returns a future that - // originiated from a foreign EventLoop/EventLoopGroup. - // This can happen if the handler uses a library (lets say a DB client) that manages its own threads/loops - // for whatever reason and returns a future that originated from that foreign EventLoop. - .hop(to: self.eventLoop) - .mapResult { result in - if case .failure(let error) = result { - logger.warning("lambda handler returned an error: \(error)") - } - return (invocation, result) - } - }.flatMap { invocation, result in - // 3. report results to runtime engine - self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in - logger.error("could not report results to lambda runtime engine: \(error)") - } - } - } - - /// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane) - /// only needed for debugging purposes. - func cancelWaitingForNextInvocation() { - if self.isGettingNextInvocation { - self.runtimeClient.cancel() - } - } - } -} - -extension LambdaContext { - init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, invocation: Invocation) { - self.init(requestID: invocation.requestID, - traceID: invocation.traceID, - invokedFunctionARN: invocation.invokedFunctionARN, - deadline: DispatchWallTime(millisSinceEpoch: invocation.deadlineInMillisSinceEpoch), - cognitoIdentity: invocation.cognitoIdentity, - clientContext: invocation.clientContext, - logger: logger, - eventLoop: eventLoop, - allocator: allocator) - } -} - -// TODO: move to nio? -extension EventLoopFuture { - // callback does not have side effects, failing with original result - func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture { - self.flatMapError { error in - callback(error) - return self - } - } - - // callback does not have side effects, failing with original result - func peekError(_ callback: @escaping (Error) -> EventLoopFuture) -> EventLoopFuture { - self.flatMapError { error in - let promise = self.eventLoop.makePromise(of: Value.self) - callback(error).whenComplete { _ in - promise.completeWith(self) - } - return promise.futureResult - } - } - - func mapResult(_ callback: @escaping (Result) -> NewValue) -> EventLoopFuture { - self.map { value in - callback(.success(value)) - }.flatMapErrorThrowing { error in - callback(.failure(error)) - } - } -} - -extension Result { - private var successful: Bool { - switch self { - case .success: - return true - case .failure: - return false - } - } -} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift deleted file mode 100644 index 46e73d1b..00000000 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift +++ /dev/null @@ -1,192 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftAWSLambdaRuntime open source project -// -// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import NIOConcurrencyHelpers -import NIOCore - -/// `LambdaRuntime` manages the Lambda process lifecycle. -/// -/// - note: It is intended to be used within a single `EventLoop`. For this reason this class is not thread safe. -public final class LambdaRuntime { - private let eventLoop: EventLoop - private let shutdownPromise: EventLoopPromise - private let logger: Logger - private let configuration: Lambda.Configuration - - private var state = State.idle { - willSet { - self.eventLoop.assertInEventLoop() - precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)") - } - } - - /// Create a new `LambdaRuntime`. - /// - /// - parameters: - /// - eventLoop: An `EventLoop` to run the Lambda on. - /// - logger: A `Logger` to log the Lambda events. - public convenience init(eventLoop: EventLoop, logger: Logger) { - self.init(eventLoop: eventLoop, logger: logger, configuration: .init()) - } - - init(eventLoop: EventLoop, logger: Logger, configuration: Lambda.Configuration) { - self.eventLoop = eventLoop - self.shutdownPromise = eventLoop.makePromise(of: Int.self) - self.logger = logger - self.configuration = configuration - } - - deinit { - guard case .shutdown = self.state else { - preconditionFailure("invalid state \(self.state)") - } - } - - /// The `Lifecycle` shutdown future. - /// - /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda lifecycle has fully shutdown. - public var shutdownFuture: EventLoopFuture { - self.shutdownPromise.futureResult - } - - /// Start the `LambdaRuntime`. - /// - /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created and initiliazed, and a first run has been scheduled. - /// - /// - note: This method must be called on the `EventLoop` the `LambdaRuntime` has been initialized with. - public func start() -> EventLoopFuture { - self.eventLoop.assertInEventLoop() - - logger.info("lambda runtime starting with \(self.configuration)") - self.state = .initializing - - var logger = self.logger - logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id) - let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration) - - let startupFuture = runner.initialize(logger: logger, handlerType: Handler.self) - startupFuture.flatMap { handler -> EventLoopFuture<(Handler, Result)> in - // after the startup future has succeeded, we have a handler that we can use - // to `run` the lambda. - let finishedPromise = self.eventLoop.makePromise(of: Int.self) - self.state = .active(runner, handler) - self.run(promise: finishedPromise) - return finishedPromise.futureResult.mapResult { (handler, $0) } - } - .flatMap { handler, runnerResult -> EventLoopFuture in - // after the lambda finishPromise has succeeded or failed we need to - // shutdown the handler - let shutdownContext = Lambda.ShutdownContext(logger: logger, eventLoop: self.eventLoop) - return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in - // if, we had an error shuting down the lambda, we want to concatenate it with - // the runner result - logger.error("Error shutting down handler: \(error)") - throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult) - }.flatMapResult { _ -> Result in - // we had no error shutting down the lambda. let's return the runner's result - runnerResult - } - }.always { _ in - // triggered when the Lambda has finished its last run or has a startup failure. - self.markShutdown() - }.cascade(to: self.shutdownPromise) - - return startupFuture.map { _ in } - } - - // MARK: - Private - - #if DEBUG - /// Begin the `LambdaRuntime` shutdown. Only needed for debugging purposes, hence behind a `DEBUG` flag. - public func shutdown() { - // make this method thread safe by dispatching onto the eventloop - self.eventLoop.execute { - let oldState = self.state - self.state = .shuttingdown - if case .active(let runner, _) = oldState { - runner.cancelWaitingForNextInvocation() - } - } - } - #endif - - private func markShutdown() { - self.state = .shutdown - } - - @inline(__always) - private func run(promise: EventLoopPromise) { - func _run(_ count: Int) { - switch self.state { - case .active(let runner, let handler): - if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes { - return promise.succeed(count) - } - var logger = self.logger - logger[metadataKey: "lifecycleIteration"] = "\(count)" - runner.run(logger: logger, handler: handler).whenComplete { result in - switch result { - case .success: - logger.log(level: .debug, "lambda invocation sequence completed successfully") - // recursive! per aws lambda runtime spec the polling requests are to be done one at a time - _run(count + 1) - case .failure(HTTPClient.Errors.cancelled): - if case .shuttingdown = self.state { - // if we ware shutting down, we expect to that the get next - // invocation request might have been cancelled. For this reason we - // succeed the promise here. - logger.log(level: .info, "lambda invocation sequence has been cancelled for shutdown") - return promise.succeed(count) - } - logger.log(level: .error, "lambda invocation sequence has been cancelled unexpectedly") - promise.fail(HTTPClient.Errors.cancelled) - case .failure(let error): - logger.log(level: .error, "lambda invocation sequence completed with error: \(error)") - promise.fail(error) - } - } - case .shuttingdown: - promise.succeed(count) - default: - preconditionFailure("invalid run state: \(self.state)") - } - } - - _run(0) - } - - private enum State { - case idle - case initializing - case active(Lambda.Runner, Handler) - case shuttingdown - case shutdown - - internal var order: Int { - switch self { - case .idle: - return 0 - case .initializing: - return 1 - case .active: - return 2 - case .shuttingdown: - return 3 - case .shutdown: - return 4 - } - } - } -} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift deleted file mode 100644 index 7303ef1c..00000000 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ /dev/null @@ -1,148 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftAWSLambdaRuntime open source project -// -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import NIOCore -import NIOHTTP1 - -/// An HTTP based client for AWS Runtime Engine. This encapsulates the RESTful methods exposed by the Runtime Engine: -/// * /runtime/invocation/next -/// * /runtime/invocation/response -/// * /runtime/invocation/error -/// * /runtime/init/error -extension Lambda { - internal struct RuntimeClient { - private let eventLoop: EventLoop - private let allocator = ByteBufferAllocator() - private let httpClient: HTTPClient - - init(eventLoop: EventLoop, configuration: Configuration.RuntimeEngine) { - self.eventLoop = eventLoop - self.httpClient = HTTPClient(eventLoop: eventLoop, configuration: configuration) - } - - /// Requests invocation from the control plane. - func getNextInvocation(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> { - let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix - logger.debug("requesting work from lambda runtime engine using \(url)") - return self.httpClient.get(url: url, headers: RuntimeClient.defaultHeaders).flatMapThrowing { response in - guard response.status == .ok else { - throw RuntimeError.badStatusCode(response.status) - } - let invocation = try Invocation(headers: response.headers) - guard let event = response.body else { - throw RuntimeError.noBody - } - return (invocation, event) - }.flatMapErrorThrowing { error in - switch error { - case HTTPClient.Errors.timeout: - throw RuntimeError.upstreamError("timeout") - case HTTPClient.Errors.connectionResetByPeer: - throw RuntimeError.upstreamError("connectionResetByPeer") - default: - throw error - } - } - } - - /// Reports a result to the Runtime Engine. - func reportResults(logger: Logger, invocation: Invocation, result: Result) -> EventLoopFuture { - var url = Consts.invocationURLPrefix + "/" + invocation.requestID - var body: ByteBuffer? - let headers: HTTPHeaders - - switch result { - case .success(let buffer): - url += Consts.postResponseURLSuffix - body = buffer - headers = RuntimeClient.defaultHeaders - case .failure(let error): - url += Consts.postErrorURLSuffix - let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)") - let bytes = errorResponse.toJSONBytes() - body = self.allocator.buffer(capacity: bytes.count) - body!.writeBytes(bytes) - headers = RuntimeClient.errorHeaders - } - logger.debug("reporting results to lambda runtime engine using \(url)") - return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in - guard response.status == .accepted else { - throw RuntimeError.badStatusCode(response.status) - } - return () - }.flatMapErrorThrowing { error in - switch error { - case HTTPClient.Errors.timeout: - throw RuntimeError.upstreamError("timeout") - case HTTPClient.Errors.connectionResetByPeer: - throw RuntimeError.upstreamError("connectionResetByPeer") - default: - throw error - } - } - } - - /// Reports an initialization error to the Runtime Engine. - func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture { - let url = Consts.postInitErrorURL - let errorResponse = ErrorResponse(errorType: Consts.initializationError, errorMessage: "\(error)") - let bytes = errorResponse.toJSONBytes() - var body = self.allocator.buffer(capacity: bytes.count) - body.writeBytes(bytes) - logger.warning("reporting initialization error to lambda runtime engine using \(url)") - return self.httpClient.post(url: url, headers: RuntimeClient.errorHeaders, body: body).flatMapThrowing { response in - guard response.status == .accepted else { - throw RuntimeError.badStatusCode(response.status) - } - return () - }.flatMapErrorThrowing { error in - switch error { - case HTTPClient.Errors.timeout: - throw RuntimeError.upstreamError("timeout") - case HTTPClient.Errors.connectionResetByPeer: - throw RuntimeError.upstreamError("connectionResetByPeer") - default: - throw error - } - } - } - - /// Cancels the current request, if one is running. Only needed for debugging purposes - func cancel() { - self.httpClient.cancel() - } - } -} - -extension Lambda { - internal enum RuntimeError: Error { - case badStatusCode(HTTPResponseStatus) - case upstreamError(String) - case invocationMissingHeader(String) - case noBody - case json(Error) - case shutdownError(shutdownError: Error, runnerResult: Result) - } -} - -extension Lambda.RuntimeClient { - internal static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")]) - - /// These headers must be sent along an invocation or initialization error report - internal static let errorHeaders = HTTPHeaders([ - ("user-agent", "Swift-Lambda/Unknown"), - ("lambda-runtime-function-error-type", "Unhandled"), - ]) -} diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/LocalServer.swift similarity index 97% rename from Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift rename to Sources/AWSLambdaRuntimeCore/LocalServer.swift index 1e09d867..ef2551ca 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/LocalServer.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2020-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -14,8 +14,8 @@ #if DEBUG import Dispatch +@_spi(Lambda) import LambdaRuntimeCore import Logging -import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import NIOPosix @@ -28,7 +28,8 @@ import NIOPosix // callback(.success("Hello, \(event)!")) // } // } -extension Lambda { +@_spi(Lambda) +extension AWSLambda { /// Execute code in the context of a mock Lambda server. /// /// - parameters: @@ -36,7 +37,7 @@ extension Lambda { /// - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call. /// /// - note: This API is designed stricly for local testing and is behind a DEBUG flag - internal static func withLocalServer(invocationEndpoint: String? = nil, _ body: @escaping () -> Value) throws -> Value { + public static func withLocalServer(invocationEndpoint: String?, _ body: @escaping () -> Value) throws -> Value { let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint) try server.start().wait() defer { try! server.stop() } @@ -131,7 +132,7 @@ private enum LocalLambda { guard let work = request.body else { return self.writeResponse(context: context, response: .init(status: .badRequest)) } - let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: + let requestID = LambdaRequestID().lowercased let promise = context.eventLoop.makePromise(of: Response.self) promise.futureResult.whenComplete { result in switch result { diff --git a/Sources/AWSLambdaRuntimeCore/Utils.swift b/Sources/AWSLambdaRuntimeCore/Utils.swift index 9924a05b..9ba3a470 100644 --- a/Sources/AWSLambdaRuntimeCore/Utils.swift +++ b/Sources/AWSLambdaRuntimeCore/Utils.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Dispatch +@_spi(Lambda) import LambdaRuntimeCore import NIOPosix internal enum Consts { @@ -36,75 +37,6 @@ internal enum AmazonHeaders { static let invokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn" } -/// Helper function to trap signals -internal func trap(signal sig: Signal, handler: @escaping (Signal) -> Void) -> DispatchSourceSignal { - let signalSource = DispatchSource.makeSignalSource(signal: sig.rawValue, queue: DispatchQueue.global()) - signal(sig.rawValue, SIG_IGN) - signalSource.setEventHandler(handler: { - signalSource.cancel() - handler(sig) - }) - signalSource.resume() - return signalSource -} - -internal enum Signal: Int32 { - case HUP = 1 - case INT = 2 - case QUIT = 3 - case ABRT = 6 - case KILL = 9 - case ALRM = 14 - case TERM = 15 -} - -extension DispatchWallTime { - internal init(millisSinceEpoch: Int64) { - let nanoSinceEpoch = UInt64(millisSinceEpoch) * 1_000_000 - let seconds = UInt64(nanoSinceEpoch / 1_000_000_000) - let nanoseconds = nanoSinceEpoch - (seconds * 1_000_000_000) - self.init(timespec: timespec(tv_sec: Int(seconds), tv_nsec: Int(nanoseconds))) - } - - internal var millisSinceEpoch: Int64 { - Int64(bitPattern: self.rawValue) / -1_000_000 - } -} - -extension String { - func encodeAsJSONString(into bytes: inout [UInt8]) { - bytes.append(UInt8(ascii: "\"")) - let stringBytes = self.utf8 - var startCopyIndex = stringBytes.startIndex - var nextIndex = startCopyIndex - - while nextIndex != stringBytes.endIndex { - switch stringBytes[nextIndex] { - case 0 ..< 32, UInt8(ascii: "\""), UInt8(ascii: "\\"): - // All Unicode characters may be placed within the - // quotation marks, except for the characters that MUST be escaped: - // quotation mark, reverse solidus, and the control characters (U+0000 - // through U+001F). - // https://tools.ietf.org/html/rfc7159#section-7 - - // copy the current range over - bytes.append(contentsOf: stringBytes[startCopyIndex ..< nextIndex]) - bytes.append(UInt8(ascii: "\\")) - bytes.append(stringBytes[nextIndex]) - - nextIndex = stringBytes.index(after: nextIndex) - startCopyIndex = nextIndex - default: - nextIndex = stringBytes.index(after: nextIndex) - } - } - - // copy everything, that hasn't been copied yet - bytes.append(contentsOf: stringBytes[startCopyIndex ..< nextIndex]) - bytes.append(UInt8(ascii: "\"")) - } -} - extension AmazonHeaders { /// Generates (X-Ray) trace ID. /// # Trace ID Format diff --git a/Sources/AWSLambdaTesting/Lambda+Testing.swift b/Sources/AWSLambdaTesting/Lambda+Testing.swift index f514f38f..fc564652 100644 --- a/Sources/AWSLambdaTesting/Lambda+Testing.swift +++ b/Sources/AWSLambdaTesting/Lambda+Testing.swift @@ -51,8 +51,7 @@ extension Lambda { public init(requestID: String = "\(DispatchTime.now().uptimeNanoseconds)", traceID: String = "Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1", invokedFunctionARN: String = "arn:aws:lambda:us-west-1:\(DispatchTime.now().uptimeNanoseconds):function:custom-runtime", - timeout: DispatchTimeInterval = .seconds(5)) - { + timeout: DispatchTimeInterval = .seconds(5)) { self.requestID = requestID self.traceID = traceID self.invokedFunctionARN = invokedFunctionARN @@ -64,7 +63,7 @@ extension Lambda { _ handlerType: Handler.Type, with event: Handler.Event, using config: TestConfig = .init() - ) throws -> Handler.Output { + ) throws -> Handler.Output where Handler.Provider.Context == AWSLambda.Context { let logger = Logger(label: "test") let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { @@ -78,7 +77,7 @@ extension Lambda { eventLoop: eventLoop ) - let context = LambdaContext.__forTestsOnly( + let context = AWSLambda.Context.__forTestsOnly( requestID: config.requestID, traceID: config.traceID, invokedFunctionARN: config.invokedFunctionARN, diff --git a/Sources/LambdaRuntimeCore/ControlPlaneRequest.swift b/Sources/LambdaRuntimeCore/ControlPlaneRequest.swift new file mode 100644 index 00000000..f062fc4d --- /dev/null +++ b/Sources/LambdaRuntimeCore/ControlPlaneRequest.swift @@ -0,0 +1,56 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2021-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOHTTP1 + +public protocol LambdaInvocation: Hashable { + @_spi(Lambda) var requestID: String { get } + @_spi(Lambda) init(headers: HTTPHeaders) throws +} + +@_spi(Lambda) +public enum ControlPlaneRequest: Hashable { + case next + case invocationResponse(LambdaRequestID, ByteBuffer?) + case invocationError(LambdaRequestID, ErrorResponse) + case initializationError(ErrorResponse) +} + +@_spi(Lambda) +public enum ControlPlaneResponse: Hashable { + case next(Invocation, ByteBuffer) + case accepted + case error(ErrorResponse) +} + +@_spi(Lambda) +public struct ErrorResponse: Hashable, Codable { + public var errorType: String + public var errorMessage: String +} + +@_spi(Lambda) +extension ErrorResponse { + public func toJSONBytes() -> [UInt8] { + var bytes = [UInt8]() + bytes.append(UInt8(ascii: "{")) + bytes.append(contentsOf: #""errorType":"#.utf8) + self.errorType.encodeAsJSONString(into: &bytes) + bytes.append(contentsOf: #","errorMessage":"#.utf8) + self.errorMessage.encodeAsJSONString(into: &bytes) + bytes.append(UInt8(ascii: "}")) + return bytes + } +} diff --git a/Sources/LambdaRuntimeCore/ControlPlaneRequestEncoder.swift b/Sources/LambdaRuntimeCore/ControlPlaneRequestEncoder.swift new file mode 100644 index 00000000..d58b8786 --- /dev/null +++ b/Sources/LambdaRuntimeCore/ControlPlaneRequestEncoder.swift @@ -0,0 +1,22 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2021-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +public protocol ControlPlaneRequestEncoder: _EmittingChannelHandler where OutboundOut == ByteBuffer { + @_spi(Lambda) init(host: String) + @_spi(Lambda) mutating func writeRequest(_ request: ControlPlaneRequest, context: ChannelHandlerContext, promise: EventLoopPromise?) + @_spi(Lambda) mutating func writerAdded(context: ChannelHandlerContext) + @_spi(Lambda) mutating func writerRemoved(context: ChannelHandlerContext) +} diff --git a/Sources/LambdaRuntimeCore/ControlPlaneResponseDecoder.swift b/Sources/LambdaRuntimeCore/ControlPlaneResponseDecoder.swift new file mode 100644 index 00000000..1f13da44 --- /dev/null +++ b/Sources/LambdaRuntimeCore/ControlPlaneResponseDecoder.swift @@ -0,0 +1,25 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +public protocol ControlPlaneResponseDecoder: NIOSingleStepByteToMessageDecoder { + associatedtype Invocation: LambdaInvocation + @_spi(Lambda) init() +} + +@_spi(Lambda) +extension ControlPlaneResponseDecoder where InboundOut == ControlPlaneResponse { + public typealias Response = ControlPlaneResponse +} diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/LambdaRuntimeCore/HTTPClient.swift similarity index 99% rename from Sources/AWSLambdaRuntimeCore/HTTPClient.swift rename to Sources/LambdaRuntimeCore/HTTPClient.swift index 045cd968..9921e2f3 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/LambdaRuntimeCore/HTTPClient.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import NIOPosix diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+String.swift b/Sources/LambdaRuntimeCore/Lambda+String.swift similarity index 100% rename from Sources/AWSLambdaRuntimeCore/Lambda+String.swift rename to Sources/LambdaRuntimeCore/Lambda+String.swift diff --git a/Sources/LambdaRuntimeCore/Lambda.swift b/Sources/LambdaRuntimeCore/Lambda.swift new file mode 100644 index 00000000..9d39485d --- /dev/null +++ b/Sources/LambdaRuntimeCore/Lambda.swift @@ -0,0 +1,34 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if os(Linux) +import Glibc +#else +import Darwin.C +#endif + +import Backtrace +import Logging +import NIOCore +import NIOPosix + +public enum Lambda { + /// Utility to access/read environment variables + public static func env(_ name: String) -> String? { + guard let value = getenv(name) else { + return nil + } + return String(cString: value) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift b/Sources/LambdaRuntimeCore/LambdaConfiguration.swift similarity index 70% rename from Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift rename to Sources/LambdaRuntimeCore/LambdaConfiguration.swift index c2615a9a..6bb61aa4 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift +++ b/Sources/LambdaRuntimeCore/LambdaConfiguration.swift @@ -17,36 +17,37 @@ import Logging import NIOCore extension Lambda { - internal struct Configuration: CustomStringConvertible { - let general: General - let lifecycle: Lifecycle - let runtimeEngine: RuntimeEngine + @_spi(Lambda) + public struct Configuration: CustomStringConvertible { + public let general: General + public let lifecycle: Lifecycle + public let runtimeEngine: RuntimeEngine - init() { - self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init()) + public init(runtimeEngine address: String? = nil) { + self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init(address: address)) } - init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) { + public init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) { self.general = general ?? General() self.lifecycle = lifecycle ?? Lifecycle() self.runtimeEngine = runtimeEngine ?? RuntimeEngine() } - struct General: CustomStringConvertible { - let logLevel: Logger.Level + public struct General: CustomStringConvertible { + public let logLevel: Logger.Level init(logLevel: Logger.Level? = nil) { self.logLevel = logLevel ?? env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info } - var description: String { + public var description: String { "\(General.self)(logLevel: \(self.logLevel))" } } - struct Lifecycle: CustomStringConvertible { - let id: String - let maxTimes: Int + public struct Lifecycle: CustomStringConvertible { + public let id: String + public let maxTimes: Int let stopSignal: Signal init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) { @@ -56,18 +57,18 @@ extension Lambda { precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0") } - var description: String { + public var description: String { "\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))" } } - struct RuntimeEngine: CustomStringConvertible { - let ip: String - let port: Int - let requestTimeout: TimeAmount? + public struct RuntimeEngine: CustomStringConvertible { + public let ip: String + public let port: Int + public let requestTimeout: TimeAmount? init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) { - let ipPort = (address ?? env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"] + let ipPort = address?.split(separator: ":") ?? ["127.0.0.1", "7000"] guard ipPort.count == 2, let port = Int(ipPort[1]) else { preconditionFailure("invalid ip+port configuration \(ipPort)") } @@ -76,12 +77,12 @@ extension Lambda { self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) } } - var description: String { + public var description: String { "\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))" } } - var description: String { + public var description: String { "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)" } } diff --git a/Sources/LambdaRuntimeCore/LambdaContext.swift b/Sources/LambdaRuntimeCore/LambdaContext.swift new file mode 100644 index 00000000..acb902d9 --- /dev/null +++ b/Sources/LambdaRuntimeCore/LambdaContext.swift @@ -0,0 +1,96 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import Logging +import NIOCore + +// MARK: - InitializationContext + +extension Lambda { + /// Lambda runtime initialization context. + /// The Lambda runtime generates and passes the `InitializationContext` to the Handlers + /// ``ByteBufferLambdaHandler/makeHandler(context:)`` or ``LambdaHandler/init(context:)`` + /// as an argument. + public struct InitializationContext { + /// `Logger` to log with + /// + /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. + public let logger: Logger + + /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. + /// + /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. + /// Most importantly the `EventLoop` must never be blocked. + public let eventLoop: EventLoop + + /// `ByteBufferAllocator` to allocate `ByteBuffer` + public let allocator: ByteBufferAllocator + + init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) { + self.eventLoop = eventLoop + self.logger = logger + self.allocator = allocator + } + + /// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning. + public static func __forTestsOnly( + logger: Logger, + eventLoop: EventLoop + ) -> InitializationContext { + InitializationContext( + logger: logger, + eventLoop: eventLoop, + allocator: ByteBufferAllocator() + ) + } + } +} + +// MARK: - Context + +public protocol LambdaContext: CustomDebugStringConvertible { + associatedtype Invocation: LambdaInvocation + + var requestID: String { get } + var logger: Logger { get } + var eventLoop: EventLoop { get } + var allocator: ByteBufferAllocator { get } + + init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, invocation: Invocation) +} + +// MARK: - ShutdownContext + +extension Lambda { + /// Lambda runtime shutdown context. + /// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument. + public final class ShutdownContext { + /// `Logger` to log with + /// + /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. + public let logger: Logger + + /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. + /// + /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. + /// Most importantly the `EventLoop` must never be blocked. + public let eventLoop: EventLoop + + internal init(logger: Logger, eventLoop: EventLoop) { + self.eventLoop = eventLoop + self.logger = logger + } + } +} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/LambdaRuntimeCore/LambdaHandler.swift similarity index 78% rename from Sources/AWSLambdaRuntimeCore/LambdaHandler.swift rename to Sources/LambdaRuntimeCore/LambdaHandler.swift index 3c2697ff..87239ab6 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/LambdaRuntimeCore/LambdaHandler.swift @@ -14,10 +14,44 @@ import Dispatch import NIOCore +#if canImport(Darwin) +import Darwin +#else +import Glibc +#endif // MARK: - LambdaHandler #if compiler(>=5.5) && canImport(_Concurrency) +#if compiler(>=5.7) +/// Strongly typed, processing protocol for a Lambda that takes a user defined +/// ``EventLoopLambdaHandler/Event`` and returns a user defined +/// ``EventLoopLambdaHandler/Output`` asynchronously. +/// +/// - note: Most users should implement this protocol instead of the lower +/// level protocols ``EventLoopLambdaHandler`` and +/// ``ByteBufferLambdaHandler``. +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +public protocol LambdaHandler: EventLoopLambdaHandler { + /// The Lambda initialization method + /// Use this method to initialize resources that will be used in every request. + /// + /// Examples for this can be HTTP or database clients. + /// - parameters: + /// - context: Runtime `InitializationContext`. + init(context: Lambda.InitializationContext) async throws + + /// The Lambda handling method + /// Concrete Lambda handlers implement this method to provide the Lambda functionality. + /// + /// - parameters: + /// - event: Event of type `Event` representing the event or request. + /// - context: Runtime `Context`. + /// + /// - Returns: A Lambda result ot type `Output`. + func handle(_ event: Event, context: Context) async throws -> Output +} +#else /// Strongly typed, processing protocol for a Lambda that takes a user defined /// ``EventLoopLambdaHandler/Event`` and returns a user defined /// ``EventLoopLambdaHandler/Output`` asynchronously. @@ -43,9 +77,11 @@ public protocol LambdaHandler: EventLoopLambdaHandler { /// - context: Runtime `Context`. /// /// - Returns: A Lambda result ot type `Output`. - func handle(_ event: Event, context: LambdaContext) async throws -> Output + func handle(_ event: Event, context: Context) async throws -> Output } +#endif +// @_spi(Lambda) @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) extension LambdaHandler { public static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture { @@ -56,7 +92,7 @@ extension LambdaHandler { return promise.futureResult } - public func handle(_ event: Event, context: LambdaContext) -> EventLoopFuture { + public func handle(_ event: Event, context: Context) -> EventLoopFuture { let promise = context.eventLoop.makePromise(of: Output.self) promise.completeWithTask { try await self.handle(event, context: context) @@ -100,7 +136,7 @@ public protocol EventLoopLambdaHandler: ByteBufferLambdaHandler { /// /// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine. /// The `EventLoopFuture` should be completed with either a response of type `Output` or an `Error` - func handle(_ event: Event, context: LambdaContext) -> EventLoopFuture + func handle(_ event: Event, context: Context) -> EventLoopFuture /// Encode a response of type `Output` to `ByteBuffer` /// Concrete Lambda handlers implement this method to provide coding functionality. @@ -124,7 +160,7 @@ public protocol EventLoopLambdaHandler: ByteBufferLambdaHandler { extension EventLoopLambdaHandler { /// Driver for `ByteBuffer` -> `Event` decoding and `Output` -> `ByteBuffer` encoding @inlinable - public func handle(_ event: ByteBuffer, context: LambdaContext) -> EventLoopFuture { + public func handle(_ event: ByteBuffer, context: Context) -> EventLoopFuture { let input: Event do { input = try self.decode(buffer: event) @@ -158,6 +194,8 @@ extension EventLoopLambdaHandler where Output == Void { /// ``LambdaHandler`` based APIs. /// Most users are not expected to use this protocol. public protocol ByteBufferLambdaHandler { + associatedtype Provider: LambdaProvider + /// Create your Lambda handler for the runtime. /// /// Use this to initialize all your resources that you want to cache between invocations. This could be database @@ -175,7 +213,7 @@ public protocol ByteBufferLambdaHandler { /// /// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine. /// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error` - func handle(_ event: ByteBuffer, context: LambdaContext) -> EventLoopFuture + func handle(_ event: ByteBuffer, context: Provider.Context) -> EventLoopFuture /// Clean up the Lambda resources asynchronously. /// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections. @@ -185,6 +223,10 @@ public protocol ByteBufferLambdaHandler { func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture } +extension ByteBufferLambdaHandler { + public typealias Context = Provider.Context +} + extension ByteBufferLambdaHandler { public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { context.eventLoop.makeSucceededFuture(()) @@ -201,7 +243,22 @@ extension ByteBufferLambdaHandler { /// The lambda runtime provides a default implementation of the method that manages the launch /// process. public static func main() { - _ = Lambda.run(configuration: .init(), handlerType: Self.self) + #if DEBUG + if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false { + do { + return try Provider.withLocalServer { + NewLambdaRuntime.run(handlerType: Self.self) + } + } catch { + print(error) + exit(1) + } + } else { + NewLambdaRuntime.run(handlerType: Self.self) + } + #else + NewLambdaRuntime.run(handlerType: Self.self) + #endif } } diff --git a/Sources/LambdaRuntimeCore/LambdaProvider.swift b/Sources/LambdaRuntimeCore/LambdaProvider.swift new file mode 100644 index 00000000..b3bfca4e --- /dev/null +++ b/Sources/LambdaRuntimeCore/LambdaProvider.swift @@ -0,0 +1,35 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +public protocol LambdaProvider { + associatedtype Invocation + associatedtype RequestEncoder: ControlPlaneRequestEncoder + associatedtype Context: LambdaContext where Context.Invocation == Self.Invocation + associatedtype ResponseDecoder: ControlPlaneResponseDecoder where ResponseDecoder.Invocation == Self.Invocation + + static var runtimeEngineAddress: String? { get } + #if DEBUG + @_spi(Lambda) + static func withLocalServer(invocationEndpoint: String?, _ body: @escaping () -> Value) throws -> Value + #endif +} + +#if DEBUG +@_spi(Lambda) +extension LambdaProvider { + public static func withLocalServer(_ body: @escaping () -> Value) throws -> Value { + try self.withLocalServer(invocationEndpoint: nil, body) + } +} +#endif diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift b/Sources/LambdaRuntimeCore/LambdaRequestID.swift similarity index 93% rename from Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift rename to Sources/LambdaRuntimeCore/LambdaRequestID.swift index 86178ff4..6e812cbd 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift +++ b/Sources/LambdaRuntimeCore/LambdaRequestID.swift @@ -17,7 +17,8 @@ import NIOCore // This is heavily inspired by: // https://github.com/swift-extras/swift-extras-uuid -struct LambdaRequestID { +@_spi(Lambda) +public struct LambdaRequestID { typealias uuid_t = (UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8) var uuid: uuid_t { @@ -27,11 +28,11 @@ struct LambdaRequestID { static let null: uuid_t = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) /// Creates a random [v4](https://tools.ietf.org/html/rfc4122#section-4.1.3) UUID. - init() { + public init() { self = Self.generateRandom() } - init?(uuidString: String) { + public init?(uuidString: String) { guard uuidString.utf8.count == 36 else { return nil } @@ -64,13 +65,8 @@ struct LambdaRequestID { private let _uuid: uuid_t - /// Returns a string representation for the `LambdaRequestID`, such as "E621E1F8-C36C-495A-93FC-0C247A3E6E5F" - var uuidString: String { - self.uppercased - } - /// Returns a lowercase string representation for the `LambdaRequestID`, such as "e621e1f8-c36c-495a-93fc-0c247a3e6e5f" - var lowercased: String { + public var lowercased: String { var bytes = self.toAsciiBytesOnStack(characters: Self.lowercaseLookup) return withUnsafeBytes(of: &bytes) { String(decoding: $0, as: Unicode.UTF8.self) @@ -78,7 +74,7 @@ struct LambdaRequestID { } /// Returns an uppercase string representation for the `LambdaRequestID`, such as "E621E1F8-C36C-495A-93FC-0C247A3E6E5F" - var uppercased: String { + public var uppercased: String { var bytes = self.toAsciiBytesOnStack(characters: Self.uppercaseLookup) return withUnsafeBytes(of: &bytes) { String(decoding: $0, as: Unicode.UTF8.self) @@ -113,7 +109,7 @@ struct LambdaRequestID { extension LambdaRequestID: Equatable { // sadly no auto conformance from the compiler - static func == (lhs: Self, rhs: Self) -> Bool { + public static func == (lhs: Self, rhs: Self) -> Bool { lhs._uuid.0 == rhs._uuid.0 && lhs._uuid.1 == rhs._uuid.1 && lhs._uuid.2 == rhs._uuid.2 && @@ -134,7 +130,7 @@ extension LambdaRequestID: Equatable { } extension LambdaRequestID: Hashable { - func hash(into hasher: inout Hasher) { + public func hash(into hasher: inout Hasher) { var value = self._uuid withUnsafeBytes(of: &value) { ptr in hasher.combine(bytes: ptr) @@ -143,19 +139,19 @@ extension LambdaRequestID: Hashable { } extension LambdaRequestID: CustomStringConvertible { - var description: String { - self.uuidString + public var description: String { + self.lowercased } } extension LambdaRequestID: CustomDebugStringConvertible { - var debugDescription: String { - self.uuidString + public var debugDescription: String { + self.lowercased } } -extension LambdaRequestID: Decodable { - init(from decoder: Decoder) throws { +extension LambdaRequestID: Codable { + public init(from decoder: Decoder) throws { let container = try decoder.singleValueContainer() let uuidString = try container.decode(String.self) @@ -165,12 +161,10 @@ extension LambdaRequestID: Decodable { self = uuid } -} -extension LambdaRequestID: Encodable { - func encode(to encoder: Encoder) throws { + public func encode(to encoder: Encoder) throws { var container = encoder.singleValueContainer() - try container.encode(self.uuidString) + try container.encode(self.lowercased) } } @@ -320,8 +314,9 @@ extension LambdaRequestID { } } +@_spi(Lambda) extension ByteBuffer { - func getRequestID(at index: Int) -> LambdaRequestID? { + public func getRequestID(at index: Int) -> LambdaRequestID? { guard let range = self.rangeWithinReadableBytes(index: index, length: 36) else { return nil } @@ -330,7 +325,7 @@ extension ByteBuffer { } } - mutating func readRequestID() -> LambdaRequestID? { + public mutating func readRequestID() -> LambdaRequestID? { guard let requestID = self.getRequestID(at: self.readerIndex) else { return nil } @@ -339,21 +334,22 @@ extension ByteBuffer { } @discardableResult - mutating func setRequestID(_ requestID: LambdaRequestID, at index: Int) -> Int { + public mutating func setRequestID(_ requestID: LambdaRequestID, at index: Int) -> Int { var localBytes = requestID.toAsciiBytesOnStack(characters: LambdaRequestID.lowercaseLookup) return withUnsafeBytes(of: &localBytes) { self.setBytes($0, at: index) } } - mutating func writeRequestID(_ requestID: LambdaRequestID) -> Int { + @discardableResult + public mutating func writeRequestID(_ requestID: LambdaRequestID) -> Int { let length = self.setRequestID(requestID, at: self.writerIndex) self.moveWriterIndex(forwardBy: length) return length } // copy and pasted from NIOCore - func rangeWithinReadableBytes(index: Int, length: Int) -> Range? { + public func rangeWithinReadableBytes(index: Int, length: Int) -> Range? { guard index >= self.readerIndex && length >= 0 else { return nil } diff --git a/Sources/LambdaRuntimeCore/LambdaRunner.swift b/Sources/LambdaRuntimeCore/LambdaRunner.swift new file mode 100644 index 00000000..2548682d --- /dev/null +++ b/Sources/LambdaRuntimeCore/LambdaRunner.swift @@ -0,0 +1,58 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import Logging +import NIOCore + +// TODO: move to nio? +extension EventLoopFuture { + // callback does not have side effects, failing with original result + func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture { + self.flatMapError { error in + callback(error) + return self + } + } + + // callback does not have side effects, failing with original result + func peekError(_ callback: @escaping (Error) -> EventLoopFuture) -> EventLoopFuture { + self.flatMapError { error in + let promise = self.eventLoop.makePromise(of: Value.self) + callback(error).whenComplete { _ in + promise.completeWith(self) + } + return promise.futureResult + } + } + + func mapResult(_ callback: @escaping (Result) -> NewValue) -> EventLoopFuture { + self.map { value in + callback(.success(value)) + }.flatMapErrorThrowing { error in + callback(.failure(error)) + } + } +} + +extension Result { + private var successful: Bool { + switch self { + case .success: + return true + case .failure: + return false + } + } +} diff --git a/Sources/LambdaRuntimeCore/LambdaRuntimeError.swift b/Sources/LambdaRuntimeCore/LambdaRuntimeError.swift new file mode 100644 index 00000000..481c62fc --- /dev/null +++ b/Sources/LambdaRuntimeCore/LambdaRuntimeError.swift @@ -0,0 +1,70 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_spi(Lambda) +public struct LambdaRuntimeError: Error, Hashable { + enum Base: Hashable { + case unsolicitedResponse + case unexpectedStatusCode + + case responseHeadInvalidStatusLine + case responseHeadMissingContentLengthOrTransferEncodingChunked + case responseHeadMoreThan256BytesBeforeCRLF + case responseHeadHeaderInvalidCharacter + case responseHeadHeaderMissingColon + case responseHeadHeaderMissingFieldValue + case responseHeadInvalidHeader + case responseHeadInvalidContentLengthValue + case responseHeadInvalidRequestIDValue + case responseHeadInvalidTraceIDValue + case responseHeadInvalidDeadlineValue + + case invocationHeadMissingRequestID + case invocationHeadMissingDeadlineInMillisSinceEpoch + case invocationHeadMissingFunctionARN + case invocationHeadMissingTraceID + + case controlPlaneErrorResponse(ErrorResponse) + } + + private let base: Base + + private init(_ base: Base) { + self.base = base + } + + public static var unsolicitedResponse = LambdaRuntimeError(.unsolicitedResponse) + public static var unexpectedStatusCode = LambdaRuntimeError(.unexpectedStatusCode) + public static var responseHeadInvalidStatusLine = LambdaRuntimeError(.responseHeadInvalidStatusLine) + public static var responseHeadMissingContentLengthOrTransferEncodingChunked = + LambdaRuntimeError(.responseHeadMissingContentLengthOrTransferEncodingChunked) + public static var responseHeadMoreThan256BytesBeforeCRLF = LambdaRuntimeError(.responseHeadMoreThan256BytesBeforeCRLF) + public static var responseHeadHeaderInvalidCharacter = LambdaRuntimeError(.responseHeadHeaderInvalidCharacter) + public static var responseHeadHeaderMissingColon = LambdaRuntimeError(.responseHeadHeaderMissingColon) + public static var responseHeadHeaderMissingFieldValue = LambdaRuntimeError(.responseHeadHeaderMissingFieldValue) + public static var responseHeadInvalidHeader = LambdaRuntimeError(.responseHeadInvalidHeader) + public static var responseHeadInvalidContentLengthValue = LambdaRuntimeError(.responseHeadInvalidContentLengthValue) + public static var responseHeadInvalidRequestIDValue = LambdaRuntimeError(.responseHeadInvalidRequestIDValue) + public static var responseHeadInvalidTraceIDValue = LambdaRuntimeError(.responseHeadInvalidTraceIDValue) + public static var responseHeadInvalidDeadlineValue = LambdaRuntimeError(.responseHeadInvalidDeadlineValue) + + public static var invocationHeadMissingRequestID = LambdaRuntimeError(.invocationHeadMissingRequestID) + public static var invocationHeadMissingDeadlineInMillisSinceEpoch = LambdaRuntimeError(.invocationHeadMissingDeadlineInMillisSinceEpoch) + public static var invocationHeadMissingFunctionARN = LambdaRuntimeError(.invocationHeadMissingFunctionARN) + public static var invocationHeadMissingTraceID = LambdaRuntimeError(.invocationHeadMissingTraceID) + + public static func controlPlaneErrorResponse(_ response: ErrorResponse) -> Self { + LambdaRuntimeError(.controlPlaneErrorResponse(response)) + } +} diff --git a/Sources/LambdaRuntimeCore/NewLambdaChannelHandler.swift b/Sources/LambdaRuntimeCore/NewLambdaChannelHandler.swift new file mode 100644 index 00000000..fdba8217 --- /dev/null +++ b/Sources/LambdaRuntimeCore/NewLambdaChannelHandler.swift @@ -0,0 +1,86 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +protocol LambdaChannelHandlerDelegate { + associatedtype Handler: ByteBufferLambdaHandler + + func responseReceived(_: ControlPlaneResponse) + + func errorCaught(_: Error) + + func channelInactive() +} + +final class NewLambdaChannelHandler: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + typealias OutboundOut = ByteBuffer + + private let delegate: Delegate + private var requestsInFlight: CircularBuffer + + private var context: ChannelHandlerContext! + + private var encoder: Delegate.Handler.Provider.RequestEncoder + private var decoder: NIOSingleStepByteToMessageProcessor + + init(delegate: Delegate, host: String) { + precondition(Delegate.Handler.Provider.ResponseDecoder.InboundOut.self == ControlPlaneResponse.self) + + self.delegate = delegate + self.requestsInFlight = CircularBuffer(initialCapacity: 4) + + self.encoder = .init(host: host) + self.decoder = NIOSingleStepByteToMessageProcessor(.init(), maximumBufferSize: 7 * 1024 * 1024) + } + + func sendRequest(_ request: ControlPlaneRequest) { + self.requestsInFlight.append(request) + self.encoder.writeRequest(request, context: self.context, promise: nil) + } + + func handlerAdded(context: ChannelHandlerContext) { + self.context = context + self.encoder.writerAdded(context: context) + } + + func handlerRemoved(context: ChannelHandlerContext) { + self.context = context + self.encoder.writerRemoved(context: context) + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + do { + let buffer = self.unwrapInboundIn(data) + try self.decoder.process(buffer: buffer) { response in + guard self.requestsInFlight.popFirst() != nil else { + throw LambdaRuntimeError.unsolicitedResponse + } + + self.delegate.responseReceived(response as! ControlPlaneResponse) + } + } catch { + self.delegate.errorCaught(error) + } + } + + func channelInactive(context: ChannelHandlerContext) { + self.delegate.channelInactive() + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + self.delegate.errorCaught(error) + } +} diff --git a/Sources/LambdaRuntimeCore/NewLambdaRuntime+StateMachine.swift b/Sources/LambdaRuntimeCore/NewLambdaRuntime+StateMachine.swift new file mode 100644 index 00000000..ab5c7dd6 --- /dev/null +++ b/Sources/LambdaRuntimeCore/NewLambdaRuntime+StateMachine.swift @@ -0,0 +1,295 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension NewLambdaRuntime { + struct Connection { + var channel: Channel + var handler: NewLambdaChannelHandler + } + + struct StateMachine { + enum Action { + case none + case createHandler(andConnection: Bool) + + case requestNextInvocation(NewLambdaChannelHandler, succeedStartPromise: EventLoopPromise?) + + case reportInvocationResult(LambdaRequestID, Result, pipelineNextInvocationRequest: Bool, NewLambdaChannelHandler) + case reportStartupError(Error, NewLambdaChannelHandler) + + case invokeHandler(Handler, Invocation, ByteBuffer) + + case failRuntime(Error, startPomise: EventLoopPromise?) + } + + private enum State { + case initialized + case starting(EventLoopPromise?) + case connected(Connection, EventLoopPromise?) + case handlerCreated(Handler, EventLoopPromise?) + case handlerCreationFailed(Error, EventLoopPromise?) + case reportingStartupError(Connection, Error, EventLoopPromise?) + + case waitingForInvocation(Connection, Handler) + case executingInvocation(Connection, Handler, LambdaRequestID) + case reportingInvocationResult(Connection, Handler, nextInvocationRequestPipelined: Bool) + + case failed(Error) + } + + private var markShutdown: Bool + private var state: State + + init() { + self.markShutdown = false + self.state = .initialized + } + + mutating func start(connection: Connection?, promise: EventLoopPromise?) -> Action { + switch self.state { + case .initialized: + if let connection = connection { + self.state = .connected(connection, promise) + return .createHandler(andConnection: false) + } + + self.state = .starting(promise) + return .createHandler(andConnection: true) + + case .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .reportingStartupError, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .failed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func handlerCreated(_ handler: Handler) -> Action { + switch self.state { + case .initialized, + .handlerCreated, + .handlerCreationFailed, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .handlerCreated(handler, promise) + return .none + + case .connected(let connection, let promise): + self.state = .waitingForInvocation(connection, handler) + return .requestNextInvocation(connection.handler, succeedStartPromise: promise) + + case .failed: + return .none + } + } + + mutating func handlerCreationFailed(_ error: Error) -> Action { + switch self.state { + case .initialized, + .handlerCreated, + .handlerCreationFailed, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .handlerCreationFailed(error, promise) + return .none + + case .connected(let connection, let promise): + self.state = .reportingStartupError(connection, error, promise) + return .reportStartupError(error, connection.handler) + + case .failed: + return .none + } + } + + mutating func httpConnectionCreated( + _ connection: Connection + ) -> Action { + switch self.state { + case .initialized, + .connected, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .connected(connection, promise) + return .none + + case .handlerCreated(let handler, let promise): + self.state = .waitingForInvocation(connection, handler) + return .requestNextInvocation(connection.handler, succeedStartPromise: promise) + + case .handlerCreationFailed(let error, let promise): + self.state = .reportingStartupError(connection, error, promise) + return .reportStartupError(error, connection.handler) + + case .failed: + return .none + } + } + + mutating func httpChannelConnectFailed(_ error: Error) -> Action { + switch self.state { + case .initialized, + .connected, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .handlerCreated(_, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .handlerCreationFailed(let error, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .failed: + return .none + } + } + + mutating func newInvocationReceived(_ invocation: Invocation, _ body: ByteBuffer) -> Action { + switch self.state { + case .initialized, + .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .waitingForInvocation(let connection, let handler): + self.state = .executingInvocation(connection, handler, LambdaRequestID(uuidString: invocation.requestID)!) + return .invokeHandler(handler, invocation, body) + + case .failed: + return .none + } + } + + mutating func acceptedReceived() -> Action { + switch self.state { + case .initialized, + .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .executingInvocation: + preconditionFailure("Invalid state: \(self.state)") + + case .waitingForInvocation: + preconditionFailure("TODO: fixme") + + case .reportingStartupError(_, let error, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .reportingInvocationResult(let connection, let handler, true): + self.state = .waitingForInvocation(connection, handler) + return .none + + case .reportingInvocationResult(let connection, let handler, false): + self.state = .waitingForInvocation(connection, handler) + return .requestNextInvocation(connection.handler, succeedStartPromise: nil) + + case .failed: + return .none + } + } + + mutating func errorResponseReceived(_ errorResponse: ErrorResponse) -> Action { + switch self.state { + case .initialized, + .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .executingInvocation: + preconditionFailure("Invalid state: \(self.state)") + + case .waitingForInvocation: + let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse) + self.state = .failed(error) + return .failRuntime(error, startPomise: nil) + + case .reportingStartupError(_, let error, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .reportingInvocationResult: + let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse) + self.state = .failed(error) + return .failRuntime(error, startPomise: nil) + + case .failed: + return .none + } + } + + mutating func handlerError(_: Error) {} + + mutating func channelInactive() {} + + mutating func invocationFinished(_ result: Result) -> Action { + switch self.state { + case .initialized, + .starting, + .handlerCreated, + .handlerCreationFailed, + .connected, + .waitingForInvocation, + .reportingStartupError, + .reportingInvocationResult: + preconditionFailure("Invalid state: \(self.state)") + + case .failed: + return .none + + case .executingInvocation(let connection, let handler, let requestID): + let pipelining = true + self.state = .reportingInvocationResult(connection, handler, nextInvocationRequestPipelined: pipelining) + return .reportInvocationResult(requestID, result, pipelineNextInvocationRequest: pipelining, connection.handler) + } + } + } +} diff --git a/Sources/LambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/LambdaRuntimeCore/NewLambdaRuntime.swift new file mode 100644 index 00000000..a79cacef --- /dev/null +++ b/Sources/LambdaRuntimeCore/NewLambdaRuntime.swift @@ -0,0 +1,307 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Backtrace +import Logging +import NIOCore +import NIOPosix + +#if canImport(Glibc) +import Glibc +#endif + +/// `LambdaRuntime` manages the Lambda process lifecycle. +/// +/// - note: All state changes are dispatched onto the supplied EventLoop. +final class NewLambdaRuntime { + typealias Context = Handler.Provider.Context + typealias Invocation = Handler.Provider.Invocation + + private let eventLoop: EventLoop + private let shutdownPromise: EventLoopPromise + private let logger: Logger + private let configuration: Lambda.Configuration + + private var state: StateMachine + + init(eventLoop: EventLoop, + logger: Logger, + configuration: Lambda.Configuration, + handlerType: Handler.Type) { + self.state = StateMachine() + self.eventLoop = eventLoop + self.shutdownPromise = eventLoop.makePromise(of: Void.self) + self.logger = logger + self.configuration = configuration + } + + deinit { + // TODO: Verify is shutdown + } + + /// The `Lifecycle` shutdown future. + /// + /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda lifecycle has fully shutdown. + public var shutdownFuture: EventLoopFuture { + self.shutdownPromise.futureResult + } + + /// Start the `LambdaRuntime`. + /// + /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created + /// and initiliazed, and a first run has been scheduled. + public func start() -> EventLoopFuture { + let promise = self.eventLoop.makePromise(of: Void.self) + self.start(promise: promise) + return promise.futureResult + } + + public func start(promise: EventLoopPromise?) { + if self.eventLoop.inEventLoop { + self.start0(promise: promise) + } else { + self.eventLoop.execute { + self.start0(promise: promise) + } + } + } + + public func __testOnly_start(channel: Channel, promise: EventLoopPromise?) { + precondition(channel.eventLoop === self.eventLoop, "Channel must be created on the supplied EventLoop.") + if self.eventLoop.inEventLoop { + self.__testOnly_start0(channel: channel, promise: promise) + } else { + self.eventLoop.execute { + self.__testOnly_start0(channel: channel, promise: promise) + } + } + } + + /// Begin the `LambdaRuntime` shutdown. Only needed for debugging purposes, hence behind a `DEBUG` flag. + public func shutdown(promise: EventLoopPromise?) { + if self.eventLoop.inEventLoop { + self.shutdown0(promise: promise) + } else { + self.eventLoop.execute { + self.shutdown0(promise: promise) + } + } + } + + // MARK: - Private - + + private func start0(promise: EventLoopPromise?) { + self.eventLoop.assertInEventLoop() + + // when starting we want to do thing in parallel: + // 1. start the connection to the control plane + // 2. create the lambda handler + + self.logger.debug("initializing lambda") + + let action = self.state.start(connection: nil, promise: promise) + self.run(action) + } + + private func shutdown0(promise: EventLoopPromise?) {} + + private func __testOnly_start0(channel: Channel, promise: EventLoopPromise?) { + channel.eventLoop.preconditionInEventLoop() + assert(channel.isActive) + + do { + let connection = try self.setupConnection(channel: channel) + let action = self.state.start(connection: connection, promise: promise) + self.run(action) + } catch { + promise?.fail(error) + } + } + + private func run(_ action: StateMachine.Action) { + switch action { + case .createHandler(andConnection: let andConnection): + self.createHandler() + if andConnection { + self.createConnection() + } + + case .invokeHandler(let handler, let invocation, let event): + self.logger.trace("invoking handler") + let context = Context( + logger: self.logger, + eventLoop: self.eventLoop, + allocator: .init(), + invocation: invocation + ) + handler.handle(event, context: context).whenComplete { result in + let action = self.state.invocationFinished(result) + self.run(action) + } + + case .failRuntime(let error, let startPromise): + startPromise?.fail(error) + self.shutdownPromise.fail(error) + + case .requestNextInvocation(let handler, let startPromise): + self.logger.trace("requesting next invocation") + handler.sendRequest(.next) + startPromise?.succeed(()) + + case .reportInvocationResult(let requestID, let result, let pipelineNextInvocationRequest, let handler): + switch result { + case .success(let body): + self.logger.trace("reporting invocation success", metadata: [ + "lambda-request-id": "\(requestID)", + ]) + handler.sendRequest(.invocationResponse(requestID, body)) + + case .failure(let error): + self.logger.trace("reporting invocation failure", metadata: [ + "lambda-request-id": "\(requestID)", + ]) + let errorString = String(describing: error) + let errorResponse = ErrorResponse(errorType: errorString, errorMessage: errorString) + handler.sendRequest(.invocationError(requestID, errorResponse)) + } + + if pipelineNextInvocationRequest { + handler.sendRequest(.next) + } + + case .reportStartupError(let error, let handler): + let errorString = String(describing: error) + handler.sendRequest(.initializationError(.init(errorType: errorString, errorMessage: errorString))) + + case .none: + break + } + } + + private func createConnection() { + let connectFuture = ClientBootstrap(group: self.eventLoop).connect( + host: self.configuration.runtimeEngine.ip, + port: self.configuration.runtimeEngine.port + ) + + connectFuture.whenComplete { result in + let action: StateMachine.Action + switch result { + case .success(let channel): + do { + let connection = try self.setupConnection(channel: channel) + action = self.state.httpConnectionCreated(connection) + } catch { + action = self.state.httpChannelConnectFailed(error) + } + case .failure(let error): + action = self.state.httpChannelConnectFailed(error) + } + self.run(action) + } + } + + private func setupConnection(channel: Channel) throws -> Connection { + let handler = NewLambdaChannelHandler(delegate: self, host: self.configuration.runtimeEngine.ip) + try channel.pipeline.syncOperations.addHandler(handler) + return Connection(channel: channel, handler: handler) + } + + private func createHandler() { + let context = Lambda.InitializationContext( + logger: self.logger, + eventLoop: self.eventLoop, + allocator: ByteBufferAllocator() + ) + + Handler.makeHandler(context: context).hop(to: self.eventLoop).whenComplete { result in + let action: StateMachine.Action + switch result { + case .success(let handler): + action = self.state.handlerCreated(handler) + case .failure(let error): + action = self.state.handlerCreationFailed(error) + } + self.run(action) + } + } +} + +extension NewLambdaRuntime: LambdaChannelHandlerDelegate { + func responseReceived(_ response: ControlPlaneResponse) { + let action: StateMachine.Action + switch response { + case .next(let invocation, let byteBuffer): + action = self.state.newInvocationReceived(invocation, byteBuffer) + + case .accepted: + action = self.state.acceptedReceived() + + case .error(let errorResponse): + action = self.state.errorResponseReceived(errorResponse) + } + + self.run(action) + } + + func errorCaught(_ error: Error) { + self.state.handlerError(error) + } + + func channelInactive() { + self.state.channelInactive() + } +} + +extension NewLambdaRuntime { + static func run(handlerType: Handler.Type) { + Backtrace.install() + + let configuration = Lambda.Configuration(runtimeEngine: Handler.Provider.runtimeEngineAddress) + var logger = Logger(label: "Lambda") + logger.logLevel = configuration.general.logLevel + + MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in + let runtime = NewLambdaRuntime( + eventLoop: eventLoop, + logger: logger, + configuration: configuration, + handlerType: Handler.self + ) + + logger.info("lambda runtime starting with \(configuration)") + + #if DEBUG + let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in + logger.info("intercepted signal: \(signal)") + runtime.shutdown(promise: nil) + } + #endif + + runtime.start().flatMap { + runtime.shutdownFuture + }.whenComplete { _ in + #if DEBUG + signalSource.cancel() + #endif + eventLoop.shutdownGracefully { error in + if let error = error { + preconditionFailure("Failed to shutdown eventloop: \(error)") + } + logger.info("shutdown completed") + } + } + } + } +} diff --git a/Sources/LambdaRuntimeCore/Utils.swift b/Sources/LambdaRuntimeCore/Utils.swift new file mode 100644 index 00000000..4404a95d --- /dev/null +++ b/Sources/LambdaRuntimeCore/Utils.swift @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import NIOPosix + +/// Helper function to trap signals +internal func trap(signal sig: Signal, handler: @escaping (Signal) -> Void) -> DispatchSourceSignal { + let signalSource = DispatchSource.makeSignalSource(signal: sig.rawValue, queue: DispatchQueue.global()) + signal(sig.rawValue, SIG_IGN) + signalSource.setEventHandler(handler: { + signalSource.cancel() + handler(sig) + }) + signalSource.resume() + return signalSource +} + +internal enum Signal: Int32 { + case HUP = 1 + case INT = 2 + case QUIT = 3 + case ABRT = 6 + case KILL = 9 + case ALRM = 14 + case TERM = 15 +} + +@_spi(Lambda) +extension DispatchWallTime { + public init(millisSinceEpoch: Int64) { + let nanoSinceEpoch = UInt64(millisSinceEpoch) * 1_000_000 + let seconds = UInt64(nanoSinceEpoch / 1_000_000_000) + let nanoseconds = nanoSinceEpoch - (seconds * 1_000_000_000) + self.init(timespec: timespec(tv_sec: Int(seconds), tv_nsec: Int(nanoseconds))) + } + + public var millisSinceEpoch: Int64 { + Int64(bitPattern: self.rawValue) / -1_000_000 + } +} + +@_spi(Lambda) +extension String { + public func encodeAsJSONString(into bytes: inout [UInt8]) { + bytes.append(UInt8(ascii: "\"")) + let stringBytes = self.utf8 + var startCopyIndex = stringBytes.startIndex + var nextIndex = startCopyIndex + + while nextIndex != stringBytes.endIndex { + switch stringBytes[nextIndex] { + case 0 ..< 32, UInt8(ascii: "\""), UInt8(ascii: "\\"): + // All Unicode characters may be placed within the + // quotation marks, except for the characters that MUST be escaped: + // quotation mark, reverse solidus, and the control characters (U+0000 + // through U+001F). + // https://tools.ietf.org/html/rfc7159#section-7 + + // copy the current range over + bytes.append(contentsOf: stringBytes[startCopyIndex ..< nextIndex]) + bytes.append(UInt8(ascii: "\\")) + bytes.append(stringBytes[nextIndex]) + + nextIndex = stringBytes.index(after: nextIndex) + startCopyIndex = nextIndex + default: + nextIndex = stringBytes.index(after: nextIndex) + } + } + + // copy everything, that hasn't been copied yet + bytes.append(contentsOf: stringBytes[startCopyIndex ..< nextIndex]) + bytes.append(UInt8(ascii: "\"")) + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift index ac6c0838..2d10094b 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift @@ -54,7 +54,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { } func testPostInvocationSuccessWithoutBody() { - let requestID = UUID().uuidString + let requestID = LambdaRequestID() var request: NIOHTTPServerRequestFull? XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, nil))) @@ -70,7 +70,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { } func testPostInvocationSuccessWithBody() { - let requestID = UUID().uuidString + let requestID = LambdaRequestID() let payload = ByteBuffer(string: "hello swift lambda!") var request: NIOHTTPServerRequestFull? @@ -89,7 +89,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { } func testPostInvocationErrorWithBody() { - let requestID = UUID().uuidString + let requestID = LambdaRequestID() let error = ErrorResponse(errorType: "SomeError", errorMessage: "An error happened") var request: NIOHTTPServerRequestFull? XCTAssertNoThrow(request = try self.sendRequest(.invocationError(requestID, error))) @@ -137,7 +137,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { XCTAssertEqual(nextRequest?.head.method, .GET) XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next") - let requestID = UUID().uuidString + let requestID = LambdaRequestID() let payload = ByteBuffer(string: "hello swift lambda!") var successRequest: NIOHTTPServerRequestFull? XCTAssertNoThrow(successRequest = try self.sendRequest(.invocationResponse(requestID, payload))) diff --git a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneResponseDecoderTests.swift b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneResponseDecoderTests.swift new file mode 100644 index 00000000..27cea84c --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneResponseDecoderTests.swift @@ -0,0 +1,344 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import NIOCore +import NIOTestUtils +import XCTest + +final class ControlPlaneResponseDecoderTests: XCTestCase { + func testNextAndAcceptedResponse() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Aws-Request-Id: 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + let invocation = Invocation( + requestID: "9028dc49-a01b-4b44-8ffe-4912e9dabbbd", + deadlineInMillisSinceEpoch: 1_638_392_696_671, + invokedFunctionARN: "arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x", + traceID: "Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0", + clientContext: nil, + cognitoIdentity: nil + ) + let next: ControlPlaneResponse = .next(invocation, ByteBuffer(string: #"{"name":"Fabian","key2":"value2","key3":"value3"}"#)) + + let acceptedResponse = ByteBuffer(string: """ + HTTP/1.1 202 Accepted\r\n\ + Content-Type: application/json\r\n\ + Date: Sun, 05 Dec 2021 11:53:40 GMT\r\n\ + Content-Length: 16\r\n\ + \r\n\ + {"status":"OK"}\n + """ + ) + + let pairs: [(ByteBuffer, [ControlPlaneResponse])] = [ + (nextResponse, [next]), + (acceptedResponse, [.accepted]), + (nextResponse + acceptedResponse, [next, .accepted]), + ] + + XCTAssertNoThrow(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: pairs, + decoderFactory: { ControlPlaneResponseDecoder() } + )) + } + + func testWhitespaceInHeaderIsRejected() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime Aws-Request-Id: 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadHeaderInvalidCharacter) + } + } + + func testVeryLongHTTPStatusLine() { + let nextResponse = ByteBuffer(repeating: UInt8(ascii: "H"), count: 1024) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadMoreThan256BytesBeforeCRLF) + } + } + + func testVeryLongHTTPHeader() { + let acceptedResponse = ByteBuffer(string: """ + HTTP/1.1 202 Accepted\r\n\ + Content-Type: application/json\r\n\ + Date: Sun, 05 Dec 2021 11:53:40 GMT\r\n\ + Content-Length: 16\r\n + """ + ) + ByteBuffer(repeating: UInt8(ascii: "H"), count: 1024) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(acceptedResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadMoreThan256BytesBeforeCRLF) + } + } + + func testNextResponseWithoutTraceID() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Aws-Request-Id: 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .invocationHeadMissingTraceID) + } + } + + func testNextResponseWithoutRequestID() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .invocationHeadMissingRequestID) + } + } + + func testNextResponseWithInvalidStatusCode() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 20 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidStatusLine) + } + } + + func testNextResponseWithVersionHTTP2() { + let nextResponse = ByteBuffer(string: """ + HTTP/2.0 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidStatusLine) + } + } + + func testNextResponseLeadingAndTrailingWhitespaceHeaders() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \t \t application/json\t \t \r\n\ + Lambda-Runtime-Aws-Request-Id: \t \t 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\t \t \r\n\ + Lambda-Runtime-Deadline-Ms: \t \t 1638392696671\t \t \r\n\ + Lambda-Runtime-Invoked-Function-Arn: \t \t arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\t \t \r\n\ + Lambda-Runtime-Trace-Id: \t \t Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\t \t \r\n\ + Date: \t \t Wed, 01 Dec 2021 21:04:53 GMT\t \t \r\n\ + Content-Length: \t \t 49\t \t \r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + let invocation = Invocation( + requestID: "9028dc49-a01b-4b44-8ffe-4912e9dabbbd", + deadlineInMillisSinceEpoch: 1_638_392_696_671, + invokedFunctionARN: "arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x", + traceID: "Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0", + clientContext: nil, + cognitoIdentity: nil + ) + let next: ControlPlaneResponse = .next(invocation, ByteBuffer(string: #"{"name":"Fabian","key2":"value2","key3":"value3"}"#)) + + XCTAssertNoThrow(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [next])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) + } + + func testContentLengthHasTrailingCharacterSurroundedByWhitespace() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \t \t application/json\t \t \r\n\ + Content-Length: 49 r \r\n\ + Date: \t \t Wed, 01 Dec 2021 21:04:53 GMT\t \t \r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidContentLengthValue) + } + } + + func testInvalidContentLength() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \t \t application/json\t \t \r\n\ + Content-Length: 4u9 \r\n\ + Date: \t \t Wed, 01 Dec 2021 21:04:53 GMT\t \t \r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidContentLengthValue) + } + } + + func testResponseHeaderWithoutColon() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type application/json\r\n\ + Content-Length: 49\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadHeaderMissingColon) + } + } + + func testResponseHeaderWithDoubleCR() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\r\n\ + Content-Length: 49\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidHeader) + } + } + + func testResponseHeaderWithoutValue() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \r\n\ + Content-Length: 49\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + // TODO: This should return an invalid header function error (.responseHeadInvalidHeader) + XCTAssertEqual($0 as? LambdaRuntimeError, .invocationHeadMissingRequestID) + } + } +} + +extension ByteBuffer { + static func + (lhs: Self, rhs: Self) -> ByteBuffer { + var new = lhs + var rhs = rhs + new.writeBuffer(&rhs) + return new + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift index 7849fe09..e3d89dae 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift @@ -24,7 +24,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID = buffer.readRequestID() XCTAssertEqual(buffer.readerIndex, 36) XCTAssertEqual(buffer.readableBytes, 0) - XCTAssertEqual(requestID?.uuidString, UUID(uuidString: string)?.uuidString) + XCTAssertEqual(requestID?.uppercased, UUID(uuidString: string)?.uuidString) XCTAssertEqual(requestID?.uppercased, string) } @@ -35,7 +35,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID = originalBuffer.readRequestID() XCTAssertEqual(originalBuffer.readerIndex, 36) XCTAssertEqual(originalBuffer.readableBytes, 0) - XCTAssertEqual(requestID?.uuidString, UUID(uuidString: string)?.uuidString) + XCTAssertEqual(requestID?.uppercased, UUID(uuidString: string)?.uuidString) XCTAssertEqual(requestID?.lowercased, string) var newBuffer = ByteBuffer() @@ -109,7 +109,7 @@ final class LambdaRequestIDTest: XCTestCase { // achieve this though at the moment // XCTAssertFalse((nsString as String).isContiguousUTF8) let requestID = LambdaRequestID(uuidString: nsString as String) - XCTAssertEqual(requestID?.uuidString, LambdaRequestID(uuidString: nsString as String)?.uuidString) + XCTAssertEqual(requestID?.lowercased, LambdaRequestID(uuidString: nsString as String)?.lowercased) XCTAssertEqual(requestID?.uppercased, nsString as String) } @@ -121,10 +121,10 @@ final class LambdaRequestIDTest: XCTestCase { func testDescription() { let requestID = LambdaRequestID() - let fduuid = UUID(uuid: requestID.uuid) + let uuid = UUID(uuid: requestID.uuid) - XCTAssertEqual(fduuid.description, requestID.description) - XCTAssertEqual(fduuid.debugDescription, requestID.debugDescription) + XCTAssertEqual(uuid.description.lowercased(), requestID.description) + XCTAssertEqual(uuid.debugDescription.lowercased(), requestID.debugDescription) } func testFoundationInteropFromFoundation() { @@ -190,7 +190,7 @@ final class LambdaRequestIDTest: XCTestCase { var data: Data? XCTAssertNoThrow(data = try JSONEncoder().encode(test)) - XCTAssertEqual(try String(decoding: XCTUnwrap(data), as: Unicode.UTF8.self), #"{"requestID":"\#(requestID.uuidString)"}"#) + XCTAssertEqual(try String(decoding: XCTUnwrap(data), as: Unicode.UTF8.self), #"{"requestID":"\#(requestID.lowercased)"}"#) } func testDecodingSuccess() { @@ -198,7 +198,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID: LambdaRequestID } let requestID = LambdaRequestID() - let data = #"{"requestID":"\#(requestID.uuidString)"}"#.data(using: .utf8) + let data = #"{"requestID":"\#(requestID.lowercased)"}"#.data(using: .utf8) var result: Test? XCTAssertNoThrow(result = try JSONDecoder().decode(Test.self, from: XCTUnwrap(data))) @@ -210,7 +210,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID: LambdaRequestID } let requestID = LambdaRequestID() - var requestIDString = requestID.uuidString + var requestIDString = requestID.lowercased _ = requestIDString.removeLast() let data = #"{"requestID":"\#(requestIDString)"}"#.data(using: .utf8) diff --git a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaChannelHandlerTests.swift b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaChannelHandlerTests.swift new file mode 100644 index 00000000..f2c9fc33 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaChannelHandlerTests.swift @@ -0,0 +1,212 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import XCTest + +final class NewLambdaChannelHandlerTests: XCTestCase { + let host = "192.168.0.1" + + var delegate: EmbeddedLambdaChannelHandlerDelegate! + var handler: NewLambdaChannelHandler! + var client: EmbeddedChannel! + var server: EmbeddedChannel! + + override func setUp() { + self.delegate = EmbeddedLambdaChannelHandlerDelegate() + self.handler = NewLambdaChannelHandler(delegate: self.delegate, host: "127.0.0.1") + + self.client = EmbeddedChannel(handler: self.handler) + self.server = EmbeddedChannel(handlers: [ + NIOHTTPServerRequestAggregator(maxContentLength: 1024 * 1024), + ]) + + XCTAssertNoThrow(try self.server.pipeline.syncOperations.configureHTTPServerPipeline(position: .first)) + + XCTAssertNoThrow(try self.server.bind(to: .init(ipAddress: "127.0.0.1", port: 0), promise: nil)) + XCTAssertNoThrow(try self.client.connect(to: .init(ipAddress: "127.0.0.1", port: 0), promise: nil)) + } + + func testPipelineRequests() { + self.handler.sendRequest(.next) + + self.assertInteract() + + var nextRequest: NIOHTTPServerRequestFull? + XCTAssertNoThrow(nextRequest = try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next") + XCTAssertEqual(nextRequest?.head.method, .GET) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + + let requestID = LambdaRequestID() + let traceID = "foo" + let functionARN = "arn" + let deadline = UInt(Date().timeIntervalSince1970 * 1000) + 3000 + let requestBody = ByteBuffer(string: "foo bar") + + XCTAssertNoThrow(try self.server.writeOutboundInvocation( + requestID: requestID, + traceID: traceID, + functionARN: functionARN, + deadline: deadline, + body: requestBody + )) + + self.assertInteract() + + var response: (Invocation, ByteBuffer)? + XCTAssertNoThrow(response = try self.delegate.readNextResponse()) + + XCTAssertEqual(response?.0.requestID, requestID.lowercased) + XCTAssertEqual(response?.0.traceID, traceID) + XCTAssertEqual(response?.0.invokedFunctionARN, functionARN) + XCTAssertEqual(response?.0.deadlineInMillisSinceEpoch, Int64(deadline)) + XCTAssertEqual(response?.1, requestBody) + + let responseBody = ByteBuffer(string: "hello world") + + self.handler.sendRequest(.invocationResponse(requestID, responseBody)) + + self.assertInteract() + + var responseRequest: NIOHTTPServerRequestFull? + XCTAssertNoThrow(responseRequest = try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + XCTAssertEqual(responseRequest?.head.uri, "/2018-06-01/runtime/invocation/\(requestID.lowercased)/response") + XCTAssertEqual(responseRequest?.head.method, .POST) + XCTAssertEqual(responseRequest?.body, responseBody) + } + + func assertInteract(file: StaticString = #file, line: UInt = #line) { + XCTAssertNoThrow(try { + while let clientBuffer = try self.client.readOutbound(as: ByteBuffer.self) { + try self.server.writeInbound(clientBuffer) + } + + while let serverBuffer = try self.server.readOutbound(as: ByteBuffer.self) { + try self.client.writeInbound(serverBuffer) + } + }(), file: file, line: line) + } +} + +final class EmbeddedLambdaChannelHandlerDelegate: LambdaChannelHandlerDelegate { + enum Error: Swift.Error { + case missingEvent + case wrongEventType + case wrongResponseType + } + + private enum Event { + case channelInactive + case error(Swift.Error) + case response(ControlPlaneResponse) + } + + private var events: CircularBuffer + + init() { + self.events = CircularBuffer(initialCapacity: 8) + } + + func channelInactive() { + self.events.append(.channelInactive) + } + + func errorCaught(_ error: Swift.Error) { + self.events.append(.error(error)) + } + + func responseReceived(_ response: ControlPlaneResponse) { + self.events.append(.response(response)) + } + + func readResponse() throws -> ControlPlaneResponse { + guard case .response(let response) = try self.popNextEvent() else { + throw Error.wrongEventType + } + return response + } + + func readNextResponse() throws -> (Invocation, ByteBuffer) { + guard case .next(let invocation, let body) = try self.readResponse() else { + throw Error.wrongResponseType + } + return (invocation, body) + } + + func assertAcceptResponse() throws { + guard case .accepted = try self.readResponse() else { + throw Error.wrongResponseType + } + } + + func readErrorResponse() throws -> ErrorResponse { + guard case .error(let errorResponse) = try self.readResponse() else { + throw Error.wrongResponseType + } + return errorResponse + } + + func readError() throws -> Swift.Error { + guard case .error(let error) = try self.popNextEvent() else { + throw Error.wrongEventType + } + return error + } + + func assertChannelInactive() throws { + guard case .channelInactive = try self.popNextEvent() else { + throw Error.wrongEventType + } + } + + private func popNextEvent() throws -> Event { + guard let event = self.events.popFirst() else { + throw Error.missingEvent + } + return event + } +} + +extension EmbeddedChannel { + func writeOutboundInvocation( + requestID: LambdaRequestID = LambdaRequestID(), + traceID: String = "Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1", + functionARN: String = "", + deadline: UInt = UInt(Date().timeIntervalSince1970 * 1000) + 3000, + body: ByteBuffer? + ) throws { + let head = HTTPResponseHead( + version: .http1_1, + status: .ok, + headers: [ + "content-length": "\(body?.readableBytes ?? 0)", + "lambda-runtime-deadline-ms": "\(deadline)", + "lambda-runtime-trace-id": "\(traceID)", + "lambda-runtime-aws-request-id": "\(requestID)", + "lambda-runtime-invoked-function-arn": "\(functionARN)", + ] + ) + + try self.writeOutbound(HTTPServerResponsePart.head(head)) + if let body = body { + try self.writeOutbound(HTTPServerResponsePart.body(.byteBuffer(body))) + } + try self.writeOutbound(HTTPServerResponsePart.end(nil)) + } +} diff --git a/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift index c11cf005..eb8b955b 100644 --- a/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift +++ b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift @@ -14,6 +14,7 @@ @testable import AWSLambdaRuntime @testable import AWSLambdaRuntimeCore +@testable import LambdaRuntimeCore import Logging import NIOCore import NIOFoundationCompat @@ -47,7 +48,7 @@ class CodableLambdaTest: XCTestCase { context.eventLoop.makeSucceededFuture(Handler()) } - func handle(_ event: Request, context: LambdaContext) -> EventLoopFuture { + func handle(_ event: Request, context: Context) -> EventLoopFuture { XCTAssertEqual(event, self.expected) return context.eventLoop.makeSucceededVoidFuture() } @@ -76,7 +77,7 @@ class CodableLambdaTest: XCTestCase { context.eventLoop.makeSucceededFuture(Handler()) } - func handle(_ event: Request, context: LambdaContext) -> EventLoopFuture { + func handle(_ event: Request, context: Context) -> EventLoopFuture { XCTAssertEqual(event, self.expected) return context.eventLoop.makeSucceededFuture(Response(requestId: event.requestId)) } @@ -101,7 +102,7 @@ class CodableLambdaTest: XCTestCase { init(context: Lambda.InitializationContext) async throws {} - func handle(_ event: Request, context: LambdaContext) async throws { + func handle(_ event: Request, context: Context) async throws { XCTAssertEqual(event, self.expected) } } @@ -130,7 +131,7 @@ class CodableLambdaTest: XCTestCase { init(context: Lambda.InitializationContext) async throws {} - func handle(_ event: Request, context: LambdaContext) async throws -> Response { + func handle(_ event: Request, context: Context) async throws -> Response { XCTAssertEqual(event, self.expected) return Response(requestId: event.requestId) } @@ -154,8 +155,8 @@ class CodableLambdaTest: XCTestCase { #endif // convenience method - func newContext() -> LambdaContext { - LambdaContext( + func newContext() -> AWSLambda.Context { + AWSLambda.Context( requestID: UUID().uuidString, traceID: "abc123", invokedFunctionARN: "aws:arn:", diff --git a/Tests/AWSLambdaTestingTests/Tests.swift b/Tests/AWSLambdaTestingTests/Tests.swift index 5801f605..02db3122 100644 --- a/Tests/AWSLambdaTestingTests/Tests.swift +++ b/Tests/AWSLambdaTestingTests/Tests.swift @@ -35,7 +35,7 @@ class LambdaTestingTests: XCTestCase { init(context: Lambda.InitializationContext) {} - func handle(_ event: Request, context: LambdaContext) async throws -> Response { + func handle(_ event: Request, context: Context) async throws -> Response { Response(message: "echo" + event.name) } } @@ -59,7 +59,7 @@ class LambdaTestingTests: XCTestCase { init(context: Lambda.InitializationContext) {} - func handle(_ event: Request, context: LambdaContext) async throws { + func handle(_ event: Request, context: Context) async throws { LambdaTestingTests.VoidLambdaHandlerInvokeCount += 1 } } @@ -79,7 +79,7 @@ class LambdaTestingTests: XCTestCase { init(context: Lambda.InitializationContext) {} - func handle(_ event: String, context: LambdaContext) async throws { + func handle(_ event: String, context: Context) async throws { throw MyError() } } @@ -96,7 +96,7 @@ class LambdaTestingTests: XCTestCase { init(context: Lambda.InitializationContext) {} - func handle(_ event: String, context: LambdaContext) async throws -> String { + func handle(_ event: String, context: Context) async throws -> String { try await Task.sleep(nanoseconds: 500 * 1000 * 1000) return event } diff --git a/scripts/performance_test.sh b/scripts/performance_test.sh index 77904eca..ed037c15 100755 --- a/scripts/performance_test.sh +++ b/scripts/performance_test.sh @@ -27,6 +27,8 @@ if [[ $(uname -s) == "Linux" ]]; then fi swift build -c release -Xswiftc -g +swift build --package-path Examples/Echo -c release -Xswiftc -g +swift build --package-path Examples/JSON -c release -Xswiftc -g cleanup() { kill -9 $server_pid @@ -58,7 +60,7 @@ cold=() export MAX_REQUESTS=1 for (( i=0; i<$cold_iterations; i++ )); do start=$(gdate +%s%N) - ./.build/release/StringSample + ./Examples/Echo/.build/release/MyLambda end=$(gdate +%s%N) cold+=( $(($end-$start)) ) done @@ -70,7 +72,7 @@ results+=( "$MODE, cold: $avg_cold (ns)" ) echo "running $MODE mode warm test" export MAX_REQUESTS=$warm_iterations start=$(gdate +%s%N) -./.build/release/StringSample +./Examples/Echo/.build/release/MyLambda end=$(gdate +%s%N) sum_warm=$(($end-$start-$avg_cold)) # substract by avg cold since the first call is cold avg_warm=$(($sum_warm/($warm_iterations-1))) # substract since the first call is cold @@ -96,7 +98,7 @@ cold=() export MAX_REQUESTS=1 for (( i=0; i<$cold_iterations; i++ )); do start=$(gdate +%s%N) - ./.build/release/CodableSample + ./Examples/JSON/.build/release/MyLambda end=$(gdate +%s%N) cold+=( $(($end-$start)) ) done @@ -108,7 +110,7 @@ results+=( "$MODE, cold: $avg_cold (ns)" ) echo "running $MODE mode warm test" export MAX_REQUESTS=$warm_iterations start=$(gdate +%s%N) -./.build/release/CodableSample +./Examples/JSON/.build/release/MyLambda end=$(gdate +%s%N) sum_warm=$(($end-$start-$avg_cold)) # substract by avg cold since the first call is cold avg_warm=$(($sum_warm/($warm_iterations-1))) # substract since the first call is cold diff --git a/scripts/soundness.sh b/scripts/soundness.sh index d9145903..d8ea1e34 100755 --- a/scripts/soundness.sh +++ b/scripts/soundness.sh @@ -19,7 +19,7 @@ here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" function replace_acceptable_years() { # this needs to replace all acceptable forms with 'YEARS' - sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2017-2021/YEARS/' -e 's/2020-2021/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/' -e 's/2021/YEARS/' -e 's/2022/YEARS/' + sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2017-2021/YEARS/' -e 's/2017-2022/YEARS/' -e 's/2020-2021/YEARS/' -e 's/2020-2022/YEARS/' -e 's/2021-2022/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/' -e 's/2021/YEARS/' -e 's/2022/YEARS/' } printf "=> Checking for unacceptable language... "