diff --git a/Sources/AWSLambdaRuntimeCore/Lambda.swift b/Sources/AWSLambdaRuntimeCore/Lambda.swift index 3f4abf34..9f55a007 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda.swift @@ -27,8 +27,8 @@ public enum Lambda { /// `ByteBufferLambdaHandler` factory. /// - /// A function that takes a `EventLoop` and returns an `EventLoopFuture` of a `ByteBufferLambdaHandler` - public typealias HandlerFactory = (EventLoop) -> EventLoopFuture + /// A function that takes a `InitializationContext` and returns an `EventLoopFuture` of a `ByteBufferLambdaHandler` + public typealias HandlerFactory = (InitializationContext) -> EventLoopFuture /// Run a Lambda defined by implementing the `LambdaHandler` protocol. /// @@ -58,7 +58,7 @@ public enum Lambda { /// - factory: A `ByteBufferLambdaHandler` factory. /// /// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine. - public static func run(_ factory: @escaping (EventLoop) throws -> Handler) { + public static func run(_ factory: @escaping (InitializationContext) throws -> Handler) { self.run(factory: factory) } @@ -73,19 +73,19 @@ public enum Lambda { // for testing and internal use @discardableResult internal static func run(configuration: Configuration = .init(), handler: Handler) -> Result { - self.run(configuration: configuration, factory: { $0.makeSucceededFuture(handler) }) + self.run(configuration: configuration, factory: { $0.eventLoop.makeSucceededFuture(handler) }) } // for testing and internal use @discardableResult - internal static func run(configuration: Configuration = .init(), factory: @escaping (EventLoop) throws -> Handler) -> Result { - self.run(configuration: configuration, factory: { eventloop -> EventLoopFuture in - let promise = eventloop.makePromise(of: Handler.self) + internal static func run(configuration: Configuration = .init(), factory: @escaping (InitializationContext) throws -> Handler) -> Result { + self.run(configuration: configuration, factory: { context -> EventLoopFuture in + let promise = context.eventLoop.makePromise(of: Handler.self) // if we have a callback based handler factory, we offload the creation of the handler // onto the default offload queue, to ensure that the eventloop is never blocked. Lambda.defaultOffloadQueue.async { do { - promise.succeed(try factory(eventloop)) + promise.succeed(try factory(context)) } catch { promise.fail(error) } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index f0ca2f29..a5ce528a 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -16,6 +16,36 @@ import Dispatch import Logging import NIO +// MARK: - InitializationContext + +extension Lambda { + /// Lambda runtime initialization context. + /// The Lambda runtime generates and passes the `InitializationContext` to the Lambda factory as an argument. + public final class InitializationContext { + /// `Logger` to log with + /// + /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. + public let logger: Logger + + /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. + /// + /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. + /// Most importantly the `EventLoop` must never be blocked. + public let eventLoop: EventLoop + + /// `ByteBufferAllocator` to allocate `ByteBuffer` + public let allocator: ByteBufferAllocator + + internal init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) { + self.eventLoop = eventLoop + self.logger = logger + self.allocator = allocator + } + } +} + +// MARK: - Context + extension Lambda { /// Lambda runtime context. /// The Lambda runtime generates and passes the `Context` to the Lambda handler as an argument. @@ -61,7 +91,8 @@ extension Lambda { cognitoIdentity: String? = nil, clientContext: String? = nil, logger: Logger, - eventLoop: EventLoop) { + eventLoop: EventLoop, + allocator: ByteBufferAllocator) { self.requestID = requestID self.traceID = traceID self.invokedFunctionARN = invokedFunctionARN @@ -70,7 +101,7 @@ extension Lambda { self.deadline = deadline // utility self.eventLoop = eventLoop - self.allocator = ByteBufferAllocator() + self.allocator = allocator // mutate logger with context var logger = logger logger[metadataKey: "awsRequestID"] = .string(requestID) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index f42ec229..08aacd75 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -21,12 +21,14 @@ extension Lambda { internal final class Runner { private let runtimeClient: RuntimeClient private let eventLoop: EventLoop + private let allocator: ByteBufferAllocator private var isGettingNextInvocation = false init(eventLoop: EventLoop, configuration: Configuration) { self.eventLoop = eventLoop self.runtimeClient = RuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine) + self.allocator = ByteBufferAllocator() } /// Run the user provided initializer. This *must* only be called once. @@ -36,13 +38,22 @@ extension Lambda { logger.debug("initializing lambda") // 1. create the handler from the factory // 2. report initialization error if one occured - return factory(self.eventLoop).hop(to: self.eventLoop).peekError { error in - self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in - // We're going to bail out because the init failed, so there's not a lot we can do other than log - // that we couldn't report this error back to the runtime. - logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)") + let context = InitializationContext(logger: logger, + eventLoop: self.eventLoop, + allocator: self.allocator) + return factory(context) + // Hopping back to "our" EventLoop is importnant in case the factory returns a future + // that originated from a foreign EventLoop/EventLoopGroup. + // This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops + // for whatever reason and returns a future that originated from that foreign EventLoop. + .hop(to: self.eventLoop) + .peekError { error in + self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in + // We're going to bail out because the init failed, so there's not a lot we can do other than log + // that we couldn't report this error back to the runtime. + logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)") + } } - } } func run(logger: Logger, handler: Handler) -> EventLoopFuture { @@ -54,9 +65,17 @@ extension Lambda { }.flatMap { invocation, event in // 2. send invocation to handler self.isGettingNextInvocation = false - let context = Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation) + let context = Context(logger: logger, + eventLoop: self.eventLoop, + allocator: self.allocator, + invocation: invocation) logger.debug("sending invocation to lambda handler \(handler)") return handler.handle(context: context, event: event) + // Hopping back to "our" EventLoop is importnant in case the handler returns a future that + // originiated from a foreign EventLoop/EventLoopGroup. + // This can happen if the handler uses a library (lets say a DB client) that manages its own threads/loops + // for whatever reason and returns a future that originated from that foreign EventLoop. + .hop(to: self.eventLoop) .mapResult { result in if case .failure(let error) = result { logger.warning("lambda handler returned an error: \(error)") @@ -82,7 +101,7 @@ extension Lambda { } private extension Lambda.Context { - convenience init(logger: Logger, eventLoop: EventLoop, invocation: Lambda.Invocation) { + convenience init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, invocation: Lambda.Invocation) { self.init(requestID: invocation.requestID, traceID: invocation.traceID, invokedFunctionARN: invocation.invokedFunctionARN, @@ -90,7 +109,8 @@ private extension Lambda.Context { cognitoIdentity: invocation.cognitoIdentity, clientContext: invocation.clientContext, logger: logger, - eventLoop: eventLoop) + eventLoop: eventLoop, + allocator: allocator) } } diff --git a/Sources/AWSLambdaTesting/Lambda+Testing.swift b/Sources/AWSLambdaTesting/Lambda+Testing.swift index 961ce681..981ca736 100644 --- a/Sources/AWSLambdaTesting/Lambda+Testing.swift +++ b/Sources/AWSLambdaTesting/Lambda+Testing.swift @@ -102,7 +102,8 @@ extension Lambda { invokedFunctionARN: config.invokedFunctionARN, deadline: .now() + config.timeout, logger: logger, - eventLoop: eventLoop) + eventLoop: eventLoop, + allocator: ByteBufferAllocator()) return try eventLoop.flatSubmit { handler.handle(context: context, event: event) diff --git a/Tests/AWSLambdaRuntimeCoreTests/Lambda+StringTest.swift b/Tests/AWSLambdaRuntimeCoreTests/Lambda+StringTest.swift index 9d54e277..8e880296 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Lambda+StringTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Lambda+StringTest.swift @@ -185,7 +185,7 @@ class StringLambdaTest: XCTestCase { typealias In = String typealias Out = String - init(eventLoop: EventLoop) throws { + init(context: Lambda.InitializationContext) throws { throw TestError("kaboom") } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift index 2f2529c4..31ec5cbc 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift @@ -30,7 +30,7 @@ class LambdaRuntimeClientTest: XCTestCase { func testBootstrapFailure() { let behavior = Behavior() - XCTAssertThrowsError(try runLambda(behavior: behavior, factory: { $0.makeFailedFuture(TestError("boom")) })) { error in + XCTAssertThrowsError(try runLambda(behavior: behavior, factory: { $0.eventLoop.makeFailedFuture(TestError("boom")) })) { error in XCTAssertEqual(error as? TestError, TestError("boom")) } XCTAssertEqual(behavior.state, 1) @@ -186,7 +186,7 @@ class LambdaRuntimeClientTest: XCTestCase { .failure(.internalServerError) } } - XCTAssertThrowsError(try runLambda(behavior: Behavior(), factory: { $0.makeFailedFuture(TestError("boom")) })) { error in + XCTAssertThrowsError(try runLambda(behavior: Behavior(), factory: { $0.eventLoop.makeFailedFuture(TestError("boom")) })) { error in XCTAssertEqual(error as? TestError, TestError("boom")) } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift index 536beaa2..30fed618 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift @@ -51,7 +51,7 @@ class LambdaTest: XCTestCase { var initialized = false - init(eventLoop: EventLoop) { + init(context: Lambda.InitializationContext) { XCTAssertFalse(self.initialized) self.initialized = true } @@ -72,7 +72,7 @@ class LambdaTest: XCTestCase { XCTAssertNoThrow(try server.start().wait()) defer { XCTAssertNoThrow(try server.stop().wait()) } - let result = Lambda.run(factory: { $0.makeFailedFuture(TestError("kaboom")) }) + let result = Lambda.run(factory: { $0.eventLoop.makeFailedFuture(TestError("kaboom")) }) assertLambdaLifecycleResult(result, shouldFailWithError: TestError("kaboom")) } @@ -85,7 +85,7 @@ class LambdaTest: XCTestCase { typealias In = String typealias Out = Void - init(eventLoop: EventLoop) throws { + init(context: Lambda.InitializationContext) throws { throw TestError("kaboom") } @@ -124,7 +124,7 @@ class LambdaTest: XCTestCase { XCTAssertNoThrow(try server.start().wait()) defer { XCTAssertNoThrow(try server.stop().wait()) } - let result = Lambda.run(factory: { $0.makeFailedFuture(TestError("kaboom")) }) + let result = Lambda.run(factory: { $0.eventLoop.makeFailedFuture(TestError("kaboom")) }) assertLambdaLifecycleResult(result, shouldFailWithError: TestError("kaboom")) } @@ -143,7 +143,7 @@ class LambdaTest: XCTestCase { usleep(100_000) kill(getpid(), signal.rawValue) } - let result = Lambda.run(configuration: configuration, factory: { $0.makeSucceededFuture(EchoHandler()) }) + let result = Lambda.run(configuration: configuration, factory: { $0.eventLoop.makeSucceededFuture(EchoHandler()) }) switch result { case .success(let invocationCount): @@ -263,7 +263,8 @@ class LambdaTest: XCTestCase { cognitoIdentity: nil, clientContext: nil, logger: Logger(label: "test"), - eventLoop: MultiThreadedEventLoopGroup(numberOfThreads: 1).next()) + eventLoop: MultiThreadedEventLoopGroup(numberOfThreads: 1).next(), + allocator: ByteBufferAllocator()) XCTAssertGreaterThan(context.deadline, .now()) let expiredContext = Lambda.Context(requestID: context.requestID, @@ -273,7 +274,8 @@ class LambdaTest: XCTestCase { cognitoIdentity: context.cognitoIdentity, clientContext: context.clientContext, logger: context.logger, - eventLoop: context.eventLoop) + eventLoop: context.eventLoop, + allocator: context.allocator) XCTAssertLessThan(expiredContext.deadline, .now()) } @@ -285,7 +287,8 @@ class LambdaTest: XCTestCase { cognitoIdentity: nil, clientContext: nil, logger: Logger(label: "test"), - eventLoop: MultiThreadedEventLoopGroup(numberOfThreads: 1).next()) + eventLoop: MultiThreadedEventLoopGroup(numberOfThreads: 1).next(), + allocator: ByteBufferAllocator()) XCTAssertLessThanOrEqual(context.getRemainingTime(), .seconds(1)) XCTAssertGreaterThan(context.getRemainingTime(), .milliseconds(800)) } diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index 409461f5..e7160307 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -18,7 +18,7 @@ import NIO import XCTest func runLambda(behavior: LambdaServerBehavior, handler: Lambda.Handler) throws { - try runLambda(behavior: behavior, factory: { $0.makeSucceededFuture(handler) }) + try runLambda(behavior: behavior, factory: { $0.eventLoop.makeSucceededFuture(handler) }) } func runLambda(behavior: LambdaServerBehavior, factory: @escaping Lambda.HandlerFactory) throws { diff --git a/Tests/AWSLambdaRuntimeTests/Lambda+CodeableTest.swift b/Tests/AWSLambdaRuntimeTests/Lambda+CodeableTest.swift index 5a99dc42..9aa3f72a 100644 --- a/Tests/AWSLambdaRuntimeTests/Lambda+CodeableTest.swift +++ b/Tests/AWSLambdaRuntimeTests/Lambda+CodeableTest.swift @@ -72,7 +72,8 @@ class CodableLambdaTest: XCTestCase { cognitoIdentity: nil, clientContext: nil, logger: Logger(label: "test"), - eventLoop: self.eventLoopGroup.next()) + eventLoop: self.eventLoopGroup.next(), + allocator: ByteBufferAllocator()) } }