Skip to content

[RFC] Playing with ServiceLifecycle #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.17.0")),
.package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/swift-server/swift-backtrace.git", .upToNextMajor(from: "1.1.0")),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", .upToNextMajor(from: "1.0.0-alpha.7")),
],
targets: [
.target(name: "AWSLambdaRuntime", dependencies: [
Expand All @@ -29,6 +30,8 @@ let package = Package(
.product(name: "Logging", package: "swift-log"),
.product(name: "Backtrace", package: "swift-backtrace"),
.product(name: "NIOHTTP1", package: "swift-nio"),
.product(name: "Lifecycle", package: "swift-service-lifecycle"),
.product(name: "LifecycleNIOCompat", package: "swift-service-lifecycle"),
]),
.testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [
.byName(name: "AWSLambdaRuntimeCore"),
Expand Down
4 changes: 2 additions & 2 deletions Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ extension Lambda {

// MARK: - Local Mock Server

private enum LocalLambda {
enum LocalLambda {
struct Server {
private let logger: Logger
private let group: EventLoopGroup
private let host: String
private let port: Int
private let invocationEndpoint: String

public init(invocationEndpoint: String?) {
init(invocationEndpoint: String?) {
let configuration = Lambda.Configuration()
var logger = Logger(label: "LocalLambdaServer")
logger.logLevel = configuration.general.logLevel
Expand Down
85 changes: 38 additions & 47 deletions Sources/AWSLambdaRuntimeCore/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import Glibc
import Darwin.C
#endif

import Backtrace
import Lifecycle
import LifecycleNIOCompat
import Logging
import NIO

Expand Down Expand Up @@ -100,55 +101,45 @@ public enum Lambda {

// for testing and internal use
internal static func run(configuration: Configuration = .init(), factory: @escaping HandlerFactory) -> Result<Int, Error> {
let _run = { (configuration: Configuration, factory: @escaping HandlerFactory) -> Result<Int, Error> in
Backtrace.install()
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel

var result: Result<Int, Error>!
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in
let lifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory)
#if DEBUG
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
lifecycle.shutdown()
}
#endif

lifecycle.start().flatMap {
lifecycle.shutdownFuture
}.whenComplete { lifecycleResult in
#if DEBUG
signalSource.cancel()
#endif
eventLoop.shutdownGracefully { error in
if let error = error {
preconditionFailure("Failed to shutdown eventloop: \(error)")
}
}
result = lifecycleResult
}
}

logger.info("shutdown completed")
return result
}

// start local server for debugging in DEBUG mode only
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel

let serviceLifecycle = ServiceLifecycle(configuration: .init(label: "lambda.lifecycle", logger: logger, installBacktrace: true))

#if DEBUG
if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false {
do {
return try Lambda.withLocalServer {
_run(configuration, factory)
}
} catch {
return .failure(error)
}
} else {
return _run(configuration, factory)
let server = LocalLambda.Server(invocationEndpoint: nil)
serviceLifecycle.register(
label: "LocalServer",
start: .eventLoopFuture(server.start),
shutdown: .sync(server.stop))
}
#else
return _run(configuration, factory)
#endif

var result: Result<Int, Error>!

MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in

serviceLifecycle.registerShutdown(label: "EventLoop", .async( eventLoop.shutdownGracefully ))

let lambdaLifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory)
serviceLifecycle.register(
label: "LambdaLifecycle",
start: .eventLoopFuture(lambdaLifecycle.start),
shutdown: .async { cb in
lambdaLifecycle.shutdown()
lambdaLifecycle.shutdownFuture.whenComplete { localResult in
result = localResult
cb(nil)
}
})

serviceLifecycle.start { error in }
}

serviceLifecycle.wait()

logger.info("shutdown completed")
return result
}
}
6 changes: 5 additions & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import Dispatch
import Lifecycle
import Logging
import NIO

Expand All @@ -32,13 +33,16 @@ extension Lambda {
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop

public let lifeycle: ComponentLifecycle

/// `ByteBufferAllocator` to allocate `ByteBuffer`
public let allocator: ByteBufferAllocator

internal init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) {
internal init(logger: Logger, eventLoop: EventLoop, lifecycle: ComponentLifecycle, allocator: ByteBufferAllocator) {
self.eventLoop = eventLoop
self.logger = logger
self.lifeycle = lifecycle
self.allocator = allocator
}
}
Expand Down
34 changes: 0 additions & 34 deletions Sources/AWSLambdaRuntimeCore/LambdaHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,6 @@ extension LambdaHandler {
}
}

extension LambdaHandler {
public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
self.offloadQueue.async {
do {
try self.syncShutdown(context: context)
promise.succeed(())
} catch {
promise.fail(error)
}
}
return promise.futureResult
}

/// Clean up the Lambda resources synchronously.
/// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections.
public func syncShutdown(context: Lambda.ShutdownContext) throws {
// noop
}
}

// MARK: - EventLoopLambdaHandler

/// Strongly typed, `EventLoopFuture` based processing protocol for a Lambda that takes a user defined `In` and returns a user defined `Out` asynchronously.
Expand Down Expand Up @@ -185,19 +164,6 @@ public protocol ByteBufferLambdaHandler {
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`
func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?>

/// Clean up the Lambda resources asynchronously.
/// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections.
///
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
}

extension ByteBufferLambdaHandler {
public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(())
}
}

private enum CodecError: Error {
Expand Down
31 changes: 19 additions & 12 deletions Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import Logging
import NIO
import NIOConcurrencyHelpers
import Lifecycle

extension Lambda {
/// `Lifecycle` manages the Lambda process lifecycle.
Expand All @@ -26,6 +27,7 @@ extension Lambda {
private let logger: Logger
private let configuration: Configuration
private let factory: HandlerFactory
private let lifecycle: ComponentLifecycle

private var state = State.idle {
willSet {
Expand All @@ -50,6 +52,7 @@ extension Lambda {
self.logger = logger
self.configuration = configuration
self.factory = factory
self.lifecycle = ComponentLifecycle(label: "User lifecycle", logger: logger)
}

deinit {
Expand All @@ -71,7 +74,11 @@ extension Lambda {
///
/// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with.
public func start() -> EventLoopFuture<Void> {
self.eventLoop.assertInEventLoop()
guard self.eventLoop.inEventLoop else {
return self.eventLoop.flatSubmit {
self.start()
}
}

logger.info("lambda lifecycle starting with \(self.configuration)")
self.state = .initializing
Expand All @@ -80,25 +87,25 @@ extension Lambda {
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration)

let startupFuture = runner.initialize(logger: logger, factory: self.factory)
startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result<Int, Error>)> in
let startupFuture = runner.initialize(logger: logger, lifecycle: lifecycle, factory: self.factory).flatMap { handler in
self.lifecycle.start(on: self.eventLoop).map { _ in handler }
}
startupFuture.flatMap { handler -> EventLoopFuture<Result<Int, Error>> in
// after the startup future has succeeded, we have a handler that we can use
// to `run` the lambda.
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
self.state = .active(runner, handler)
self.run(promise: finishedPromise)
return finishedPromise.futureResult.mapResult { (handler, $0) }
return finishedPromise.futureResult.mapResult { ($0) }
}
.flatMap { (handler, runnerResult) -> EventLoopFuture<Int> in
.flatMap { (runnerResult) -> EventLoopFuture<Int> in
// after the lambda finishPromise has succeeded or failed we need to
// shutdown the handler
let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop)
return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
// if, we had an error shuting down the lambda, we want to concatenate it with
// the runner result
logger.error("Error shutting down handler: \(error)")
throw RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
}.flatMapResult { (_) -> Result<Int, Error> in

let promise = self.eventLoop.makePromise(of: Void.self)
self.lifecycle.shutdown { promise.completeWith($0 != nil ? .failure($0!) : .success(()))}

return promise.futureResult.flatMapResult { (_) -> Result<Int, Error> in
// we had no error shutting down the lambda. let's return the runner's result
runnerResult
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import Dispatch
import Lifecycle
import Logging
import NIO

Expand All @@ -34,12 +35,13 @@ extension Lambda {
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger, factory: @escaping HandlerFactory) -> EventLoopFuture<Handler> {
func initialize(logger: Logger, lifecycle: ComponentLifecycle, factory: @escaping HandlerFactory) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occured
let context = InitializationContext(logger: logger,
eventLoop: self.eventLoop,
lifecycle: lifecycle,
allocator: self.allocator)
return factory(context)
// Hopping back to "our" EventLoop is important in case the factory returns a future
Expand Down
4 changes: 3 additions & 1 deletion Tests/AWSLambdaRuntimeCoreTests/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

@testable import AWSLambdaRuntimeCore
import Logging
import Lifecycle
import NIO
import XCTest

Expand All @@ -27,9 +28,10 @@ func runLambda(behavior: LambdaServerBehavior, factory: @escaping Lambda.Handler
let logger = Logger(label: "TestLogger")
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
let lifecycle = ComponentLifecycle(label: "TestLifecycle")
let server = try MockLambdaServer(behavior: behavior).start().wait()
defer { XCTAssertNoThrow(try server.stop().wait()) }
try runner.initialize(logger: logger, factory: factory).flatMap { handler in
try runner.initialize(logger: logger, lifecycle: lifecycle, factory: factory).flatMap { handler in
runner.run(logger: logger, handler: handler)
}.wait()
}
Expand Down