diff --git a/Package.swift b/Package.swift index 42e4c3ed..1d56806e 100644 --- a/Package.swift +++ b/Package.swift @@ -35,6 +35,8 @@ let package = Package( ]), .testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [ .byName(name: "AWSLambdaRuntimeCore"), + .product(name: "NIOTestUtils", package: "swift-nio"), + .product(name: "NIOFoundationCompat", package: "swift-nio"), ]), .testTarget(name: "AWSLambdaRuntimeTests", dependencies: [ .byName(name: "AWSLambdaRuntimeCore"), diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift index f3d073d3..fcd2a450 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift @@ -27,27 +27,25 @@ internal final class HTTPClient { private var state = State.disconnected private var executing = false - private static let headers = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")]) - init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) { self.eventLoop = eventLoop self.configuration = configuration self.targetHost = "\(self.configuration.ip):\(self.configuration.port)" } - func get(url: String, timeout: TimeAmount? = nil) -> EventLoopFuture { + func get(url: String, headers: HTTPHeaders, timeout: TimeAmount? = nil) -> EventLoopFuture { self.execute(Request(targetHost: self.targetHost, url: url, method: .GET, - headers: HTTPClient.headers, + headers: headers, timeout: timeout ?? self.configuration.requestTimeout)) } - func post(url: String, body: ByteBuffer?, timeout: TimeAmount? = nil) -> EventLoopFuture { + func post(url: String, headers: HTTPHeaders, body: ByteBuffer?, timeout: TimeAmount? = nil) -> EventLoopFuture { self.execute(Request(targetHost: self.targetHost, url: url, method: .POST, - headers: HTTPClient.headers, + headers: headers, body: body, timeout: timeout ?? self.configuration.requestTimeout)) } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift b/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift index 44352d2f..9b9ec8fb 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift @@ -67,8 +67,8 @@ extension Lambda { let keepAlive: Bool let requestTimeout: TimeAmount? - init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) { - let ipPort = env("AWS_LAMBDA_RUNTIME_API")?.split(separator: ":") ?? ["127.0.0.1", "7000"] + init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) { + let ipPort = (address ?? env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"] guard ipPort.count == 2, let port = Int(ipPort[1]) else { preconditionFailure("invalid ip+port configuration \(ipPort)") } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index 2976a8c2..fe43ac0a 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -36,7 +36,7 @@ extension Lambda { func getNextInvocation(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> { let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix logger.debug("requesting work from lambda runtime engine using \(url)") - return self.httpClient.get(url: url).flatMapThrowing { response in + return self.httpClient.get(url: url, headers: RuntimeClient.defaultHeaders).flatMapThrowing { response in guard response.status == .ok else { throw RuntimeError.badStatusCode(response.status) } @@ -61,19 +61,23 @@ extension Lambda { func reportResults(logger: Logger, invocation: Invocation, result: Result) -> EventLoopFuture { var url = Consts.invocationURLPrefix + "/" + invocation.requestID var body: ByteBuffer? + let headers: HTTPHeaders + switch result { case .success(let buffer): url += Consts.postResponseURLSuffix body = buffer + headers = RuntimeClient.defaultHeaders case .failure(let error): url += Consts.postErrorURLSuffix let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)") let bytes = errorResponse.toJSONBytes() body = self.allocator.buffer(capacity: bytes.count) body!.writeBytes(bytes) + headers = RuntimeClient.errorHeaders } logger.debug("reporting results to lambda runtime engine using \(url)") - return self.httpClient.post(url: url, body: body).flatMapThrowing { response in + return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in guard response.status == .accepted else { throw RuntimeError.badStatusCode(response.status) } @@ -98,7 +102,7 @@ extension Lambda { var body = self.allocator.buffer(capacity: bytes.count) body.writeBytes(bytes) logger.warning("reporting initialization error to lambda runtime engine using \(url)") - return self.httpClient.post(url: url, body: body).flatMapThrowing { response in + return self.httpClient.post(url: url, headers: RuntimeClient.errorHeaders, body: body).flatMapThrowing { response in guard response.status == .accepted else { throw RuntimeError.badStatusCode(response.status) } @@ -186,3 +190,13 @@ extension Lambda { } } } + +extension Lambda.RuntimeClient { + internal static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")]) + + /// These headers must be sent along an invocation or initialization error report + internal static let errorHeaders = HTTPHeaders([ + ("user-agent", "Swift-Lambda/Unknown"), + ("lambda-runtime-function-error-type", "Unhandled"), + ]) +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift index 31ec5cbc..728672e6 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift @@ -13,6 +13,11 @@ //===----------------------------------------------------------------------===// @testable import AWSLambdaRuntimeCore +import Logging +import NIO +import NIOFoundationCompat +import NIOHTTP1 +import NIOTestUtils import XCTest class LambdaRuntimeClientTest: XCTestCase { @@ -209,6 +214,110 @@ class LambdaRuntimeClientTest: XCTestCase { XCTAssertEqual(#"{"errorType":"error","errorMessage":"๐Ÿฅ‘๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ง๐Ÿ‘ฉโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ง๐Ÿ‘จโ€๐Ÿ‘จโ€๐Ÿ‘ง"}"#, String(decoding: emojiBytes, as: Unicode.UTF8.self)) } + func testInitializationErrorReport() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let server = NIOHTTP1TestServer(group: eventLoopGroup) + defer { XCTAssertNoThrow(try server.stop()) } + + let logger = Logger(label: "TestLogger") + let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)")) + let result = client.reportInitializationError(logger: logger, error: TestError("boom")) + + var inboundHeader: HTTPServerRequestPart? + XCTAssertNoThrow(inboundHeader = try server.readInbound()) + guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return } + XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"]) + XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + + var inboundBody: HTTPServerRequestPart? + XCTAssertNoThrow(inboundBody = try server.readInbound()) + guard case .body(let body) = try? XCTUnwrap(inboundBody) else { XCTFail("Expected body after head"); return } + XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom") + + XCTAssertEqual(try server.readInbound(), .end(nil)) + + XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted)))) + XCTAssertNoThrow(try server.writeOutbound(.end(nil))) + XCTAssertNoThrow(try result.wait()) + } + + func testInvocationErrorReport() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let server = NIOHTTP1TestServer(group: eventLoopGroup) + defer { XCTAssertNoThrow(try server.stop()) } + + let logger = Logger(label: "TestLogger") + let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)")) + + let header = HTTPHeaders([ + (AmazonHeaders.requestID, "test"), + (AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"), + (AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"), + ]) + var inv: Lambda.Invocation? + XCTAssertNoThrow(inv = try Lambda.Invocation(headers: header)) + guard let invocation = inv else { return } + + let result = client.reportResults(logger: logger, invocation: invocation, result: Result.failure(TestError("boom"))) + + var inboundHeader: HTTPServerRequestPart? + XCTAssertNoThrow(inboundHeader = try server.readInbound()) + guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return } + XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"]) + XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + + var inboundBody: HTTPServerRequestPart? + XCTAssertNoThrow(inboundBody = try server.readInbound()) + guard case .body(let body) = try? XCTUnwrap(inboundBody) else { XCTFail("Expected body after head"); return } + XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom") + + XCTAssertEqual(try server.readInbound(), .end(nil)) + + XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted)))) + XCTAssertNoThrow(try server.writeOutbound(.end(nil))) + XCTAssertNoThrow(try result.wait()) + } + + func testInvocationSuccessResponse() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let server = NIOHTTP1TestServer(group: eventLoopGroup) + defer { XCTAssertNoThrow(try server.stop()) } + + let logger = Logger(label: "TestLogger") + let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)")) + + let header = HTTPHeaders([ + (AmazonHeaders.requestID, "test"), + (AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)), + (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"), + (AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"), + ]) + var inv: Lambda.Invocation? + XCTAssertNoThrow(inv = try Lambda.Invocation(headers: header)) + guard let invocation = inv else { return } + + let result = client.reportResults(logger: logger, invocation: invocation, result: Result.success(nil)) + + var inboundHeader: HTTPServerRequestPart? + XCTAssertNoThrow(inboundHeader = try server.readInbound()) + guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return } + XCTAssertFalse(head.headers.contains(name: "lambda-runtime-function-error-type")) + XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + + XCTAssertEqual(try server.readInbound(), .end(nil)) + + XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted)))) + XCTAssertNoThrow(try server.writeOutbound(.end(nil))) + XCTAssertNoThrow(try result.wait()) + } + class Behavior: LambdaServerBehavior { var state = 0