From 40d035aa8a9debd32faae8cf88bc9a3db6725444 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 13 Apr 2021 03:11:17 +0200 Subject: [PATCH] [RFC] Playing with ServiceLifecyle --- Package.swift | 3 + .../Lambda+LocalServer.swift | 4 +- Sources/AWSLambdaRuntimeCore/Lambda.swift | 85 +++++++++---------- .../AWSLambdaRuntimeCore/LambdaContext.swift | 6 +- .../AWSLambdaRuntimeCore/LambdaHandler.swift | 34 -------- .../LambdaLifecycle.swift | 31 ++++--- .../AWSLambdaRuntimeCore/LambdaRunner.swift | 4 +- Tests/AWSLambdaRuntimeCoreTests/Utils.swift | 4 +- 8 files changed, 73 insertions(+), 98 deletions(-) diff --git a/Package.swift b/Package.swift index 0e5823f6..dc5a1fc6 100644 --- a/Package.swift +++ b/Package.swift @@ -18,6 +18,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.17.0")), .package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.0.0")), .package(url: "https://github.com/swift-server/swift-backtrace.git", .upToNextMajor(from: "1.1.0")), + .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", .upToNextMajor(from: "1.0.0-alpha.7")), ], targets: [ .target(name: "AWSLambdaRuntime", dependencies: [ @@ -29,6 +30,8 @@ let package = Package( .product(name: "Logging", package: "swift-log"), .product(name: "Backtrace", package: "swift-backtrace"), .product(name: "NIOHTTP1", package: "swift-nio"), + .product(name: "Lifecycle", package: "swift-service-lifecycle"), + .product(name: "LifecycleNIOCompat", package: "swift-service-lifecycle"), ]), .testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [ .byName(name: "AWSLambdaRuntimeCore"), diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index 48cea94e..4aff4d53 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -45,7 +45,7 @@ extension Lambda { // MARK: - Local Mock Server -private enum LocalLambda { +enum LocalLambda { struct Server { private let logger: Logger private let group: EventLoopGroup @@ -53,7 +53,7 @@ private enum LocalLambda { private let port: Int private let invocationEndpoint: String - public init(invocationEndpoint: String?) { + init(invocationEndpoint: String?) { let configuration = Lambda.Configuration() var logger = Logger(label: "LocalLambdaServer") logger.logLevel = configuration.general.logLevel diff --git a/Sources/AWSLambdaRuntimeCore/Lambda.swift b/Sources/AWSLambdaRuntimeCore/Lambda.swift index 5dc27648..71628b12 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda.swift @@ -18,7 +18,8 @@ import Glibc import Darwin.C #endif -import Backtrace +import Lifecycle +import LifecycleNIOCompat import Logging import NIO @@ -100,55 +101,45 @@ public enum Lambda { // for testing and internal use internal static func run(configuration: Configuration = .init(), factory: @escaping HandlerFactory) -> Result { - let _run = { (configuration: Configuration, factory: @escaping HandlerFactory) -> Result in - Backtrace.install() - var logger = Logger(label: "Lambda") - logger.logLevel = configuration.general.logLevel - - var result: Result! - MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in - let lifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory) - #if DEBUG - let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in - logger.info("intercepted signal: \(signal)") - lifecycle.shutdown() - } - #endif - - lifecycle.start().flatMap { - lifecycle.shutdownFuture - }.whenComplete { lifecycleResult in - #if DEBUG - signalSource.cancel() - #endif - eventLoop.shutdownGracefully { error in - if let error = error { - preconditionFailure("Failed to shutdown eventloop: \(error)") - } - } - result = lifecycleResult - } - } - - logger.info("shutdown completed") - return result - } - - // start local server for debugging in DEBUG mode only + var logger = Logger(label: "Lambda") + logger.logLevel = configuration.general.logLevel + + let serviceLifecycle = ServiceLifecycle(configuration: .init(label: "lambda.lifecycle", logger: logger, installBacktrace: true)) + #if DEBUG if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false { - do { - return try Lambda.withLocalServer { - _run(configuration, factory) - } - } catch { - return .failure(error) - } - } else { - return _run(configuration, factory) + let server = LocalLambda.Server(invocationEndpoint: nil) + serviceLifecycle.register( + label: "LocalServer", + start: .eventLoopFuture(server.start), + shutdown: .sync(server.stop)) } - #else - return _run(configuration, factory) #endif + + var result: Result! + + MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in + + serviceLifecycle.registerShutdown(label: "EventLoop", .async( eventLoop.shutdownGracefully )) + + let lambdaLifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory) + serviceLifecycle.register( + label: "LambdaLifecycle", + start: .eventLoopFuture(lambdaLifecycle.start), + shutdown: .async { cb in + lambdaLifecycle.shutdown() + lambdaLifecycle.shutdownFuture.whenComplete { localResult in + result = localResult + cb(nil) + } + }) + + serviceLifecycle.start { error in } + } + + serviceLifecycle.wait() + + logger.info("shutdown completed") + return result } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index 19acc469..48c8cef6 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Dispatch +import Lifecycle import Logging import NIO @@ -32,13 +33,16 @@ extension Lambda { /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. /// Most importantly the `EventLoop` must never be blocked. public let eventLoop: EventLoop + + public let lifeycle: ComponentLifecycle /// `ByteBufferAllocator` to allocate `ByteBuffer` public let allocator: ByteBufferAllocator - internal init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) { + internal init(logger: Logger, eventLoop: EventLoop, lifecycle: ComponentLifecycle, allocator: ByteBufferAllocator) { self.eventLoop = eventLoop self.logger = logger + self.lifeycle = lifecycle self.allocator = allocator } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift index 87c8c102..0079c07a 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift @@ -62,27 +62,6 @@ extension LambdaHandler { } } -extension LambdaHandler { - public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - self.offloadQueue.async { - do { - try self.syncShutdown(context: context) - promise.succeed(()) - } catch { - promise.fail(error) - } - } - return promise.futureResult - } - - /// Clean up the Lambda resources synchronously. - /// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections. - public func syncShutdown(context: Lambda.ShutdownContext) throws { - // noop - } -} - // MARK: - EventLoopLambdaHandler /// Strongly typed, `EventLoopFuture` based processing protocol for a Lambda that takes a user defined `In` and returns a user defined `Out` asynchronously. @@ -185,19 +164,6 @@ public protocol ByteBufferLambdaHandler { /// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine. /// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error` func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture - - /// Clean up the Lambda resources asynchronously. - /// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections. - /// - /// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method - /// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`. - func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture -} - -extension ByteBufferLambdaHandler { - public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { - context.eventLoop.makeSucceededFuture(()) - } } private enum CodecError: Error { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index ec609901..839895c4 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -15,6 +15,7 @@ import Logging import NIO import NIOConcurrencyHelpers +import Lifecycle extension Lambda { /// `Lifecycle` manages the Lambda process lifecycle. @@ -26,6 +27,7 @@ extension Lambda { private let logger: Logger private let configuration: Configuration private let factory: HandlerFactory + private let lifecycle: ComponentLifecycle private var state = State.idle { willSet { @@ -50,6 +52,7 @@ extension Lambda { self.logger = logger self.configuration = configuration self.factory = factory + self.lifecycle = ComponentLifecycle(label: "User lifecycle", logger: logger) } deinit { @@ -71,7 +74,11 @@ extension Lambda { /// /// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with. public func start() -> EventLoopFuture { - self.eventLoop.assertInEventLoop() + guard self.eventLoop.inEventLoop else { + return self.eventLoop.flatSubmit { + self.start() + } + } logger.info("lambda lifecycle starting with \(self.configuration)") self.state = .initializing @@ -80,25 +87,25 @@ extension Lambda { logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id) let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration) - let startupFuture = runner.initialize(logger: logger, factory: self.factory) - startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result)> in + let startupFuture = runner.initialize(logger: logger, lifecycle: lifecycle, factory: self.factory).flatMap { handler in + self.lifecycle.start(on: self.eventLoop).map { _ in handler } + } + startupFuture.flatMap { handler -> EventLoopFuture> in // after the startup future has succeeded, we have a handler that we can use // to `run` the lambda. let finishedPromise = self.eventLoop.makePromise(of: Int.self) self.state = .active(runner, handler) self.run(promise: finishedPromise) - return finishedPromise.futureResult.mapResult { (handler, $0) } + return finishedPromise.futureResult.mapResult { ($0) } } - .flatMap { (handler, runnerResult) -> EventLoopFuture in + .flatMap { (runnerResult) -> EventLoopFuture in // after the lambda finishPromise has succeeded or failed we need to // shutdown the handler - let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop) - return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in - // if, we had an error shuting down the lambda, we want to concatenate it with - // the runner result - logger.error("Error shutting down handler: \(error)") - throw RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult) - }.flatMapResult { (_) -> Result in + + let promise = self.eventLoop.makePromise(of: Void.self) + self.lifecycle.shutdown { promise.completeWith($0 != nil ? .failure($0!) : .success(()))} + + return promise.futureResult.flatMapResult { (_) -> Result in // we had no error shutting down the lambda. let's return the runner's result runnerResult } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index ec5e9898..25da002f 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Dispatch +import Lifecycle import Logging import NIO @@ -34,12 +35,13 @@ extension Lambda { /// Run the user provided initializer. This *must* only be called once. /// /// - Returns: An `EventLoopFuture` fulfilled with the outcome of the initialization. - func initialize(logger: Logger, factory: @escaping HandlerFactory) -> EventLoopFuture { + func initialize(logger: Logger, lifecycle: ComponentLifecycle, factory: @escaping HandlerFactory) -> EventLoopFuture { logger.debug("initializing lambda") // 1. create the handler from the factory // 2. report initialization error if one occured let context = InitializationContext(logger: logger, eventLoop: self.eventLoop, + lifecycle: lifecycle, allocator: self.allocator) return factory(context) // Hopping back to "our" EventLoop is important in case the factory returns a future diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index 7fc9f9ef..ec20d06c 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -14,6 +14,7 @@ @testable import AWSLambdaRuntimeCore import Logging +import Lifecycle import NIO import XCTest @@ -27,9 +28,10 @@ func runLambda(behavior: LambdaServerBehavior, factory: @escaping Lambda.Handler let logger = Logger(label: "TestLogger") let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100))) let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration) + let lifecycle = ComponentLifecycle(label: "TestLifecycle") let server = try MockLambdaServer(behavior: behavior).start().wait() defer { XCTAssertNoThrow(try server.stop().wait()) } - try runner.initialize(logger: logger, factory: factory).flatMap { handler in + try runner.initialize(logger: logger, lifecycle: lifecycle, factory: factory).flatMap { handler in runner.run(logger: logger, handler: handler) }.wait() }