From 8199dbf4f91289e67fa135bafc39597c4a432295 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 12 Jun 2020 10:48:10 +0200 Subject: [PATCH 1/3] Added `syncShutdown() throws` to the `ByteBufferLambdaHandler` --- .../AWSLambdaRuntimeCore/LambdaHandler.swift | 11 ++ .../LambdaLifecycle.swift | 33 ++++-- .../LambdaLifecycleTest.swift | 110 ++++++++++++++++++ 3 files changed, 145 insertions(+), 9 deletions(-) create mode 100644 Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift index f14fb787..2b905729 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift @@ -164,6 +164,17 @@ public protocol ByteBufferLambdaHandler { /// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine. /// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error` func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture + + /// The method to clean up your resources. + /// Concrete Lambda handlers implement this method to shutdown their `HTTPClient`s and database connections. + /// + /// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method + /// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`. + func syncShutdown() throws +} + +public extension ByteBufferLambdaHandler { + func syncShutdown() throws {} } private enum CodecError: Error { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 0c96a05f..396dbd30 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -29,7 +29,7 @@ extension Lambda { private var state = State.idle { willSet { - assert(self.eventLoop.inEventLoop, "State may only be changed on the `Lifecycle`'s `eventLoop`") + self.eventLoop.assertInEventLoop() precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)") } } @@ -71,22 +71,37 @@ extension Lambda { /// /// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with. public func start() -> EventLoopFuture { - assert(self.eventLoop.inEventLoop, "Start must be called on the `EventLoop` the `Lifecycle` has been initialized with.") + self.eventLoop.assertInEventLoop() logger.info("lambda lifecycle starting with \(self.configuration)") self.state = .initializing - // triggered when the Lambda has finished its last run - let finishedPromise = self.eventLoop.makePromise(of: Int.self) - finishedPromise.futureResult.always { _ in - self.markShutdown() - }.cascade(to: self.shutdownPromise) + var logger = self.logger logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id) let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration) - return runner.initialize(logger: logger, factory: self.factory).map { handler in + + let startupFuture = runner.initialize(logger: logger, factory: self.factory) + startupFuture.flatMap { handler in + // after the startup future has succeeded, we have a handler that we can use + // to `run` the lambda. + let finishedPromise = self.eventLoop.makePromise(of: Int.self) self.state = .active(runner, handler) self.run(promise: finishedPromise) - } + return finishedPromise.futureResult.always { _ in + // If the lambda is terminated (e.g. LocalServer shutdown), we make sure + // developers have the chance to cleanup their resources. + do { + try handler.syncShutdown() + } catch { + logger.error("Error shutting down handler: \(error)") + } + } + }.always { _ in + // triggered when the Lambda has finished its last run or has a startup failure. + self.markShutdown() + }.cascade(to: self.shutdownPromise) + + return startupFuture.map { _ in } } // MARK: - Private diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift new file mode 100644 index 00000000..28b62866 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift @@ -0,0 +1,110 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import Logging +import NIO +import NIOHTTP1 +import XCTest + +class LambdaLifecycleTest: XCTestCase { + func testShutdownFutureIsFulfilledWithStartUpError() { + let server = MockLambdaServer(behavior: FailedBootstrapBehavior()) + XCTAssertNoThrow(try server.start().wait()) + defer { XCTAssertNoThrow(try server.stop().wait()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let eventLoop = eventLoopGroup.next() + let logger = Logger(label: "TestLogger") + let testError = TestError("kaboom") + let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { + $0.makeFailedFuture(testError) + }) + + // eventLoop.submit in this case returns an EventLoopFuture> + // which is why we need `wait().wait()` + XCTAssertThrowsError(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) { error in + XCTAssertEqual(testError, error as? TestError) + } + + XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in + XCTAssertEqual(testError, error as? TestError) + } + } + + func testSyncShutdownIsCalledWhenLambdaShutsdown() { + struct CallbackLambdaHandler: ByteBufferLambdaHandler { + let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture) + let shutdown: () throws -> Void + + init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture), shutdown: @escaping () throws -> Void) { + self.handler = handler + self.shutdown = shutdown + } + + func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture { + self.handler(context, event) + } + + func syncShutdown() throws { + try self.shutdown() + } + } + + let server = MockLambdaServer(behavior: BadBehavior()) + XCTAssertNoThrow(try server.start().wait()) + defer { XCTAssertNoThrow(try server.stop().wait()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + var count = 0 + let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { + count += 1 + } + + let eventLoop = eventLoopGroup.next() + let logger = Logger(label: "TestLogger") + let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { + $0.makeSucceededFuture(handler) + }) + + XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) + XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in + XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), error as? Lambda.RuntimeError) + } + XCTAssertEqual(count, 1) + } +} + +struct BadBehavior: LambdaServerBehavior { + func getInvocation() -> GetInvocationResult { + .failure(.internalServerError) + } + + func processResponse(requestId: String, response: String?) -> Result { + XCTFail("should not report a response") + return .failure(.internalServerError) + } + + func processError(requestId: String, error: ErrorResponse) -> Result { + XCTFail("should not report an error") + return .failure(.internalServerError) + } + + func processInitError(error: ErrorResponse) -> Result { + XCTFail("should not report an error") + return .failure(.internalServerError) + } +} From 1629c5a97631dbf482cc6c9a48dd24104e4cf01f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 15 Jun 2020 10:55:23 +0200 Subject: [PATCH 2/3] Updated to use a `ShutdownContext` --- .../AWSLambdaRuntimeCore/LambdaContext.swift | 24 +++++++++++++++++++ .../AWSLambdaRuntimeCore/LambdaHandler.swift | 6 +++-- .../LambdaLifecycle.swift | 18 +++++++------- .../AWSLambdaRuntimeCore/LambdaRunner.swift | 2 +- .../LambdaLifecycleTest.swift | 11 +++++---- 5 files changed, 43 insertions(+), 18 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index a5ce528a..ab30dd7b 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -122,3 +122,27 @@ extension Lambda { } } } + +// MARK: - ShutdownContext + +extension Lambda { + /// Lambda runtime shutdown context. + /// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument. + public final class ShutdownContext { + /// `Logger` to log with + /// + /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. + public let logger: Logger + + /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. + /// + /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. + /// Most importantly the `EventLoop` must never be blocked. + public let eventLoop: EventLoop + + internal init(logger: Logger, eventLoop: EventLoop) { + self.eventLoop = eventLoop + self.logger = logger + } + } +} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift index 2b905729..f7559218 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift @@ -170,11 +170,13 @@ public protocol ByteBufferLambdaHandler { /// /// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method /// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`. - func syncShutdown() throws + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture } public extension ByteBufferLambdaHandler { - func syncShutdown() throws {} + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { + context.eventLoop.makeSucceededFuture(Void()) + } } private enum CodecError: Error { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 396dbd30..9639e179 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -81,21 +81,19 @@ extension Lambda { let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration) let startupFuture = runner.initialize(logger: logger, factory: self.factory) - startupFuture.flatMap { handler in + startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result)> in // after the startup future has succeeded, we have a handler that we can use // to `run` the lambda. let finishedPromise = self.eventLoop.makePromise(of: Int.self) self.state = .active(runner, handler) self.run(promise: finishedPromise) - return finishedPromise.futureResult.always { _ in - // If the lambda is terminated (e.g. LocalServer shutdown), we make sure - // developers have the chance to cleanup their resources. - do { - try handler.syncShutdown() - } catch { - logger.error("Error shutting down handler: \(error)") - } - } + return finishedPromise.futureResult.mapResult { (handler, $0) } + } + .flatMap { (handler, result) -> EventLoopFuture in + let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop) + return handler.shutdown(context: shutdownContext).recover { error in + logger.error("Error shutting down handler: \(error)") + }.flatMapResult { _ in result } }.always { _ in // triggered when the Lambda has finished its last run or has a startup failure. self.markShutdown() diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 08aacd75..8fc22de3 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -115,7 +115,7 @@ private extension Lambda.Context { } // TODO: move to nio? -private extension EventLoopFuture { +extension EventLoopFuture { // callback does not have side effects, failing with original result func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture { self.flatMapError { error in diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift index 28b62866..152bc08a 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift @@ -47,9 +47,9 @@ class LambdaLifecycleTest: XCTestCase { func testSyncShutdownIsCalledWhenLambdaShutsdown() { struct CallbackLambdaHandler: ByteBufferLambdaHandler { let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture) - let shutdown: () throws -> Void + let shutdown: (Lambda.ShutdownContext) -> EventLoopFuture - init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture), shutdown: @escaping () throws -> Void) { + init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture), shutdown: @escaping (Lambda.ShutdownContext) -> EventLoopFuture) { self.handler = handler self.shutdown = shutdown } @@ -58,8 +58,8 @@ class LambdaLifecycleTest: XCTestCase { self.handler(context, event) } - func syncShutdown() throws { - try self.shutdown() + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { + self.shutdown(context) } } @@ -70,8 +70,9 @@ class LambdaLifecycleTest: XCTestCase { defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } var count = 0 - let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { + let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in count += 1 + return context.eventLoop.makeSucceededFuture(Void()) } let eventLoop = eventLoopGroup.next() From ed7d57a5a435911f03bae1859c3b34e395f04dff Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 17 Jun 2020 11:03:36 +0200 Subject: [PATCH 3/3] Review comments addressed. --- .../LambdaLifecycle.swift | 14 ++++- .../LambdaRuntimeClient.swift | 1 + .../LambdaLifecycleTest.swift | 63 ++++++++++++++----- 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 9639e179..ec609901 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -89,11 +89,19 @@ extension Lambda { self.run(promise: finishedPromise) return finishedPromise.futureResult.mapResult { (handler, $0) } } - .flatMap { (handler, result) -> EventLoopFuture in + .flatMap { (handler, runnerResult) -> EventLoopFuture in + // after the lambda finishPromise has succeeded or failed we need to + // shutdown the handler let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop) - return handler.shutdown(context: shutdownContext).recover { error in + return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in + // if, we had an error shuting down the lambda, we want to concatenate it with + // the runner result logger.error("Error shutting down handler: \(error)") - }.flatMapResult { _ in result } + throw RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult) + }.flatMapResult { (_) -> Result in + // we had no error shutting down the lambda. let's return the runner's result + runnerResult + } }.always { _ in // triggered when the Lambda has finished its last run or has a startup failure. self.markShutdown() diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index fe43ac0a..5e9e6aea 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -133,6 +133,7 @@ internal extension Lambda { case invocationMissingHeader(String) case noBody case json(Error) + case shutdownError(shutdownError: Error, runnerResult: Result) } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift index 152bc08a..a485530d 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift @@ -30,7 +30,7 @@ class LambdaLifecycleTest: XCTestCase { let logger = Logger(label: "TestLogger") let testError = TestError("kaboom") let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { - $0.makeFailedFuture(testError) + $0.eventLoop.makeFailedFuture(testError) }) // eventLoop.submit in this case returns an EventLoopFuture> @@ -44,25 +44,25 @@ class LambdaLifecycleTest: XCTestCase { } } - func testSyncShutdownIsCalledWhenLambdaShutsdown() { - struct CallbackLambdaHandler: ByteBufferLambdaHandler { - let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture) - let shutdown: (Lambda.ShutdownContext) -> EventLoopFuture + struct CallbackLambdaHandler: ByteBufferLambdaHandler { + let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture) + let shutdown: (Lambda.ShutdownContext) -> EventLoopFuture - init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture), shutdown: @escaping (Lambda.ShutdownContext) -> EventLoopFuture) { - self.handler = handler - self.shutdown = shutdown - } + init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture), shutdown: @escaping (Lambda.ShutdownContext) -> EventLoopFuture) { + self.handler = handler + self.shutdown = shutdown + } - func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture { - self.handler(context, event) - } + func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture { + self.handler(context, event) + } - func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { - self.shutdown(context) - } + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { + self.shutdown(context) } + } + func testShutdownIsCalledWhenLambdaShutsdown() { let server = MockLambdaServer(behavior: BadBehavior()) XCTAssertNoThrow(try server.start().wait()) defer { XCTAssertNoThrow(try server.stop().wait()) } @@ -78,7 +78,7 @@ class LambdaLifecycleTest: XCTestCase { let eventLoop = eventLoopGroup.next() let logger = Logger(label: "TestLogger") let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { - $0.makeSucceededFuture(handler) + $0.eventLoop.makeSucceededFuture(handler) }) XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) @@ -87,6 +87,37 @@ class LambdaLifecycleTest: XCTestCase { } XCTAssertEqual(count, 1) } + + func testLambdaResultIfShutsdownIsUnclean() { + let server = MockLambdaServer(behavior: BadBehavior()) + XCTAssertNoThrow(try server.start().wait()) + defer { XCTAssertNoThrow(try server.stop().wait()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + var count = 0 + let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in + count += 1 + return context.eventLoop.makeFailedFuture(TestError("kaboom")) + } + + let eventLoop = eventLoopGroup.next() + let logger = Logger(label: "TestLogger") + let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { + $0.eventLoop.makeSucceededFuture(handler) + }) + + XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) + XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in + guard case Lambda.RuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else { + XCTFail("Unexpected error"); return + } + + XCTAssertEqual(shutdownError as? TestError, TestError("kaboom")) + XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError)) + } + XCTAssertEqual(count, 1) + } } struct BadBehavior: LambdaServerBehavior {