-
Notifications
You must be signed in to change notification settings - Fork 113
RFC: Added ServiceLifecycle dependency and started integration #141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import Darwin.C | |
|
||
import Backtrace | ||
import Logging | ||
import Lifecycle | ||
import NIO | ||
|
||
public enum Lambda { | ||
|
@@ -101,13 +102,15 @@ public enum Lambda { | |
// for testing and internal use | ||
internal static func run(configuration: Configuration = .init(), factory: @escaping HandlerFactory) -> Result<Int, Error> { | ||
let _run = { (configuration: Configuration, factory: @escaping HandlerFactory) -> Result<Int, Error> in | ||
Backtrace.install() | ||
var logger = Logger(label: "Lambda") | ||
logger.logLevel = configuration.general.logLevel | ||
|
||
// we don't intercept the shutdown signal here yet. | ||
let serviceLifecycle = ServiceLifecycle(configuration: .init(logger: logger, shutdownSignal: [], installBacktrace: true)) | ||
|
||
var result: Result<Int, Error>! | ||
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in | ||
let lifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory) | ||
let lifecycle = Lifecycle(eventLoop: eventLoop, serviceLifecycle: serviceLifecycle, logger: logger, configuration: configuration, factory: factory) | ||
#if DEBUG | ||
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in | ||
logger.info("intercepted signal: \(signal)") | ||
|
@@ -121,11 +124,19 @@ public enum Lambda { | |
#if DEBUG | ||
signalSource.cancel() | ||
#endif | ||
eventLoop.shutdownGracefully { error in | ||
|
||
serviceLifecycle.shutdown { (error) in | ||
if let error = error { | ||
preconditionFailure("Failed to shutdown eventloop: \(error)") | ||
preconditionFailure("Failed to shutdown service: \(error)") | ||
} | ||
|
||
eventLoop.shutdownGracefully { error in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't that also be triggered by the lifecycle shutdown as a lifecycle task? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think thats a good idea: register the event loop in L110 then just call serviceLifecycle.shutdown |
||
if let error = error { | ||
preconditionFailure("Failed to shutdown eventloop: \(error)") | ||
} | ||
} | ||
} | ||
|
||
result = lifecycleResult | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
//===----------------------------------------------------------------------===// | ||
|
||
import Dispatch | ||
import Lifecycle | ||
import Logging | ||
import NIO | ||
|
||
|
@@ -32,13 +33,17 @@ extension Lambda { | |
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. | ||
/// Most importantly the `EventLoop` must never be blocked. | ||
public let eventLoop: EventLoop | ||
|
||
/// `ServiceLifecycle` to register services with | ||
public let serviceLifecycle: ServiceLifecycle | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe simplify name to "lifecycle"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if just |
||
|
||
/// `ByteBufferAllocator` to allocate `ByteBuffer` | ||
public let allocator: ByteBufferAllocator | ||
|
||
internal init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) { | ||
internal init(logger: Logger, eventLoop: EventLoop, serviceLifecycle: ServiceLifecycle, allocator: ByteBufferAllocator) { | ||
self.eventLoop = eventLoop | ||
self.logger = logger | ||
self.serviceLifecycle = serviceLifecycle | ||
self.allocator = allocator | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
//===----------------------------------------------------------------------===// | ||
|
||
import Logging | ||
import Lifecycle | ||
import NIO | ||
import NIOConcurrencyHelpers | ||
|
||
|
@@ -22,6 +23,7 @@ extension Lambda { | |
/// - note: It is intended to be used within a single `EventLoop`. For this reason this class is not thread safe. | ||
public final class Lifecycle { | ||
private let eventLoop: EventLoop | ||
private let serviceLifecycle: ServiceLifecycle | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto on name |
||
private let shutdownPromise: EventLoopPromise<Int> | ||
private let logger: Logger | ||
private let configuration: Configuration | ||
|
@@ -40,12 +42,13 @@ extension Lambda { | |
/// - eventLoop: An `EventLoop` to run the Lambda on. | ||
/// - logger: A `Logger` to log the Lambda events. | ||
/// - factory: A `LambdaHandlerFactory` to create the concrete Lambda handler. | ||
public convenience init(eventLoop: EventLoop, logger: Logger, factory: @escaping HandlerFactory) { | ||
self.init(eventLoop: eventLoop, logger: logger, configuration: .init(), factory: factory) | ||
public convenience init(eventLoop: EventLoop, serviceLifecycle: ServiceLifecycle, logger: Logger, factory: @escaping HandlerFactory) { | ||
self.init(eventLoop: eventLoop, serviceLifecycle: serviceLifecycle, logger: logger, configuration: .init(), factory: factory) | ||
} | ||
|
||
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, factory: @escaping HandlerFactory) { | ||
init(eventLoop: EventLoop, serviceLifecycle: ServiceLifecycle, logger: Logger, configuration: Configuration, factory: @escaping HandlerFactory) { | ||
self.eventLoop = eventLoop | ||
self.serviceLifecycle = serviceLifecycle | ||
self.shutdownPromise = eventLoop.makePromise(of: Int.self) | ||
self.logger = logger | ||
self.configuration = configuration | ||
|
@@ -80,7 +83,7 @@ extension Lambda { | |
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id) | ||
let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration) | ||
|
||
let startupFuture = runner.initialize(logger: logger, factory: self.factory) | ||
let startupFuture = runner.initialize(serviceLifecycle: self.serviceLifecycle, logger: logger, factory: self.factory) | ||
startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result<Int, Error>)> in | ||
// after the startup future has succeeded, we have a handler that we can use | ||
// to `run` the lambda. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
|
||
import Dispatch | ||
import Logging | ||
import Lifecycle | ||
import NIO | ||
|
||
extension Lambda { | ||
|
@@ -34,19 +35,34 @@ extension Lambda { | |
/// Run the user provided initializer. This *must* only be called once. | ||
/// | ||
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization. | ||
func initialize(logger: Logger, factory: @escaping HandlerFactory) -> EventLoopFuture<Handler> { | ||
func initialize(serviceLifecycle: ServiceLifecycle, logger: Logger, factory: @escaping HandlerFactory) -> EventLoopFuture<Handler> { | ||
logger.debug("initializing lambda") | ||
// 1. create the handler from the factory | ||
// 2. report initialization error if one occured | ||
let context = InitializationContext(logger: logger, | ||
eventLoop: self.eventLoop, | ||
serviceLifecycle: serviceLifecycle, | ||
allocator: self.allocator) | ||
return factory(context) | ||
// Hopping back to "our" EventLoop is important in case the factory returns a future | ||
// that originated from a foreign EventLoop/EventLoopGroup. | ||
// This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops | ||
// for whatever reason and returns a future that originated from that foreign EventLoop. | ||
.hop(to: self.eventLoop) | ||
.flatMap { (handler) in | ||
let promise = self.eventLoop.makePromise(of: ByteBufferLambdaHandler.self) | ||
// after we have created the LambdaHandler we must now start the services. | ||
// in order to not have to map once our success case returns the handler. | ||
serviceLifecycle.start { (error) in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should add sugar in the lifecycle NIO compact module to make this kind of integration easier. wdyt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if let error = error { | ||
promise.fail(error) | ||
} | ||
else { | ||
promise.succeed(handler) | ||
} | ||
} | ||
return promise.futureResult | ||
} | ||
.peekError { error in | ||
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in | ||
// We're going to bail out because the init failed, so there's not a lot we can do other than log | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
|
||
@testable import AWSLambdaRuntimeCore | ||
import Logging | ||
import Lifecycle | ||
import NIO | ||
import NIOHTTP1 | ||
import XCTest | ||
|
@@ -25,11 +26,16 @@ class LambdaLifecycleTest: XCTestCase { | |
defer { XCTAssertNoThrow(try server.stop().wait()) } | ||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) | ||
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } | ||
|
||
|
||
let serviceLifecycle = ServiceLifecycle(configuration: .init(shutdownSignal: [], installBacktrace: false)) | ||
defer { | ||
serviceLifecycle.shutdown() | ||
serviceLifecycle.wait() | ||
} | ||
let eventLoop = eventLoopGroup.next() | ||
let logger = Logger(label: "TestLogger") | ||
let testError = TestError("kaboom") | ||
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { | ||
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, serviceLifecycle: serviceLifecycle, logger: logger, factory: { | ||
$0.eventLoop.makeFailedFuture(testError) | ||
}) | ||
|
||
|
@@ -68,6 +74,12 @@ class LambdaLifecycleTest: XCTestCase { | |
defer { XCTAssertNoThrow(try server.stop().wait()) } | ||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) | ||
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } | ||
|
||
let serviceLifecycle = ServiceLifecycle(configuration: .init(shutdownSignal: [], installBacktrace: false)) | ||
defer { | ||
serviceLifecycle.shutdown() | ||
serviceLifecycle.wait() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we can now simplify the shutdown logic a bit by relying more on the users using lifecycle |
||
|
||
var count = 0 | ||
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in | ||
|
@@ -77,7 +89,7 @@ class LambdaLifecycleTest: XCTestCase { | |
|
||
let eventLoop = eventLoopGroup.next() | ||
let logger = Logger(label: "TestLogger") | ||
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { | ||
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, serviceLifecycle: serviceLifecycle, logger: logger, factory: { | ||
$0.eventLoop.makeSucceededFuture(handler) | ||
}) | ||
|
||
|
@@ -94,6 +106,12 @@ class LambdaLifecycleTest: XCTestCase { | |
defer { XCTAssertNoThrow(try server.stop().wait()) } | ||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) | ||
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } | ||
|
||
let serviceLifecycle = ServiceLifecycle(configuration: .init(shutdownSignal: [], installBacktrace: false)) | ||
defer { | ||
serviceLifecycle.shutdown() | ||
serviceLifecycle.wait() | ||
} | ||
|
||
var count = 0 | ||
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in | ||
|
@@ -103,7 +121,7 @@ class LambdaLifecycleTest: XCTestCase { | |
|
||
let eventLoop = eventLoopGroup.next() | ||
let logger = Logger(label: "TestLogger") | ||
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { | ||
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, serviceLifecycle: serviceLifecycle, logger: logger, factory: { | ||
$0.eventLoop.makeSucceededFuture(handler) | ||
}) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
|
||
@testable import AWSLambdaRuntimeCore | ||
import Logging | ||
import Lifecycle | ||
import NIO | ||
import XCTest | ||
|
||
|
@@ -29,7 +30,14 @@ func runLambda(behavior: LambdaServerBehavior, factory: @escaping Lambda.Handler | |
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration) | ||
let server = try MockLambdaServer(behavior: behavior).start().wait() | ||
defer { XCTAssertNoThrow(try server.stop().wait()) } | ||
try runner.initialize(logger: logger, factory: factory).flatMap { handler in | ||
|
||
let serviceLifecycle = ServiceLifecycle(configuration: .init(shutdownSignal: [], installBacktrace: false)) | ||
defer { | ||
serviceLifecycle.shutdown() | ||
serviceLifecycle.wait() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we can register the server with lifecycle but maybe will make the test harder to reason about |
||
|
||
try runner.initialize(serviceLifecycle: serviceLifecycle, logger: logger, factory: factory).flatMap { handler in | ||
runner.run(logger: logger, handler: handler) | ||
}.wait() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why
(error)
?