Skip to content

Refactor init #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/SwiftAwsLambda/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ extension Lambda {

// for testing
internal static func run<In: Decodable, Out: Encodable>(configuration: Configuration = .init(), closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/SwiftAwsLambda/Lambda+String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extension Lambda {

// for testing
internal static func run(configuration: Configuration = .init(), _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}
}

Expand Down
99 changes: 65 additions & 34 deletions Sources/SwiftAwsLambda/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,47 @@ public enum Lambda {
self.run(handler: handler)
}

/// Run a Lambda defined by implementing the `LambdaHandler` protocol.
///
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
@inlinable
public static func run(_ provider: @escaping LambdaHandlerProvider) {
self.run(provider: provider)
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(handler: LambdaHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
internal static func run(configuration: Configuration = .init(), handler: LambdaHandler) -> LambdaLifecycleResult {
return self.run(configuration: configuration, provider: { _ in handler })
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), provider: @escaping LambdaHandlerProvider) -> LambdaLifecycleResult {
do {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) // only need one thread, will improve performance
defer { try! eventLoopGroup.syncShutdownGracefully() }
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, handler: handler, configuration: configuration).wait()
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, configuration: configuration, provider: provider).wait()
return .success(result)
} catch {
return .failure(error)
}
}

internal static func runAsync(eventLoopGroup: EventLoopGroup, handler: LambdaHandler, configuration: Configuration) -> EventLoopFuture<Int> {
internal static func runAsync(eventLoopGroup: EventLoopGroup, configuration: Configuration, provider: @escaping LambdaHandlerProvider) -> EventLoopFuture<Int> {
Backtrace.install()
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, handler: handler)
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, provider: provider)
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
lifecycle.stop()
Expand Down Expand Up @@ -113,31 +128,33 @@ public enum Lambda {
private let eventLoop: EventLoop
private let logger: Logger
private let configuration: Configuration
private let handler: LambdaHandler
private let provider: LambdaHandlerProvider

private var _state = LifecycleState.idle
private var _state = State.idle
private let stateLock = Lock()

init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, handler: LambdaHandler) {
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, provider: @escaping LambdaHandlerProvider) {
self.eventLoop = eventLoop
self.logger = logger
self.configuration = configuration
self.handler = handler
self.provider = provider
}

deinit {
precondition(self.state == .shutdown, "invalid state \(self.state)")
guard case .shutdown = self.state else {
preconditionFailure("invalid state \(self.state)")
}
}

private var state: LifecycleState {
private var state: State {
get {
return self.stateLock.withLock {
self._state
}
}
set {
self.stateLock.withLockVoid {
precondition(newValue.rawValue > _state.rawValue, "invalid state \(newValue) after \(self._state)")
precondition(newValue.order > _state.order, "invalid state \(newValue) after \(self._state)")
self._state = newValue
}
}
Expand All @@ -148,10 +165,10 @@ public enum Lambda {
self.state = .initializing
var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration, lambdaHandler: self.handler)
return runner.initialize(logger: logger).flatMap { _ in
self.state = .active
return self.run(runner: runner)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, provider: self.provider).flatMap { handler in
self.state = .active(runner, handler)
return self.run()
}
}

Expand All @@ -166,18 +183,18 @@ public enum Lambda {
}

@inline(__always)
private func run(runner: LambdaRunner) -> EventLoopFuture<Int> {
private func run() -> EventLoopFuture<Int> {
let promise = self.eventLoop.makePromise(of: Int.self)

func _run(_ count: Int) {
switch self.state {
case .active:
case .active(let runner, let handler):
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
return promise.succeed(count)
}
var logger = self.logger
logger[metadataKey: "lifecycleIteration"] = "\(count)"
runner.run(logger: logger).whenComplete { result in
runner.run(logger: logger, handler: handler).whenComplete { result in
switch result {
case .success:
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
Expand All @@ -197,6 +214,29 @@ public enum Lambda {

return promise.futureResult
}

private enum State {
case idle
case initializing
case active(LambdaRunner, LambdaHandler)
case stopping
case shutdown

internal var order: Int {
switch self {
case .idle:
return 0
case .initializing:
return 1
case .active:
return 2
case .stopping:
return 3
case .shutdown:
return 4
}
}
}
}

@usableFromInline
Expand Down Expand Up @@ -274,14 +314,6 @@ public enum Lambda {
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}

private enum LifecycleState: Int {
case idle
case initializing
case active
case stopping
case shutdown
}
}

/// A result type for a Lambda that returns a `[UInt8]`.
Expand All @@ -298,18 +330,17 @@ public typealias LambdaInitResult = Result<Void, Error>
/// A callback to provide the result of Lambda initialization.
public typealias LambdaInitCallBack = (LambdaInitResult) -> Void

public typealias LambdaHandlerProvider = (EventLoop) throws -> LambdaHandler
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this a lot! What do you think about making this:

  • async with a promise?
  • providing the EventLoopGroup? As I stated before: with more than 1700mb of ram one get's access to two cores, and I would really like to use both 😉
  • I think, if we make this async we don't need the bootstrap callback, and we can all call it a day? With this we have one callback that initialises a struct where we can set all properties.

Copy link
Contributor Author

@tomerd tomerd Mar 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @fabianfett

async with a promise?
I think, if we make this async we don't need the bootstrap callback, and we can all call it a day? With this we have one callback that initialises a struct where we can set all properties.

the idea of separating this into a init (provider) and a bootstrap phases, is that we can make the provider blocking+throwing and the bootstrap async. this means you can do things like Lambda.run(MyLambda.init) in which you initialize some database/http client with the event loop that will run the lambda but dont have to worry calling a callback or completing a promise. we could change the provider API to also take a callback/promise and then create invariants of the user facing API that call-the-callback/complete-the-promise on the user's behalf to keep the user facing API simpler. I will play with that idea and post another version if it turns out okay

providing the EventLoopGroup? As I stated before: with more than 1700mb of ram one get's access to two cores, and I would really like to use both 😉

interesting point. the goal for exposing the underlying eventLoop to the be able to share it with a database/http client (for example) that also uses swift-nio. further, the eventLoopGroup we are using in the library only has 1 thread, since it optimizing for the lambda runtime use case and ensuring 1 thread is good for nio's performance. of course, the user can always create their own EventLoopGroup and use that if they want to do things in parallel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabianfett @glbrntt here is the other alternative: #28

it works but a bit complicated to reason about so not sure which I prefer


/// A processing protocol for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
public protocol LambdaHandler {
/// Initializes the `LambdaHandler`.
func initialize(callback: @escaping LambdaInitCallBack)
/// Handles the Lambda request.
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback)
}

extension LambdaHandler {
@inlinable
public func initialize(callback: @escaping LambdaInitCallBack) {
callback(.success(()))
}
public protocol BootstrappedLambdaHandler: LambdaHandler {
/// Bootstraps the `LambdaHandler`.
func bootstrap(callback: @escaping LambdaInitCallBack)
}

@usableFromInline
Expand Down
59 changes: 37 additions & 22 deletions Sources/SwiftAwsLambda/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,41 @@ import NIO
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
internal struct LambdaRunner {
private let runtimeClient: LambdaRuntimeClient
private let lambdaHandler: LambdaHandler
private let eventLoop: EventLoop
private let lifecycleId: String
private let offload: Bool

init(eventLoop: EventLoop, configuration: Lambda.Configuration, lambdaHandler: LambdaHandler) {
init(eventLoop: EventLoop, configuration: Lambda.Configuration) {
self.eventLoop = eventLoop
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.lambdaHandler = lambdaHandler
self.lifecycleId = configuration.lifecycle.id
self.offload = configuration.runtimeEngine.offload
}

/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<Void>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger) -> EventLoopFuture<Void> {
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger, provider: @escaping LambdaHandlerProvider) -> EventLoopFuture<LambdaHandler> {
logger.debug("initializing lambda")
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
return self.lambdaHandler.initialize(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload).peekError { error in

let future: EventLoopFuture<LambdaHandler>
do {
// 1. craete the handler from the provider
let handler = try provider(self.eventLoop)
// 2. bootstrap if needed
if let handler = handler as? BootstrappedLambdaHandler {
future = handler.bootstrap(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload).map { handler }
} else {
future = self.eventLoop.makeSucceededFuture(handler)
}
} catch {
future = self.eventLoop.makeFailedFuture(error)
}

// 3. report initialization error if one occured
return future.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
Expand All @@ -49,24 +62,24 @@ internal struct LambdaRunner {
}
}

func run(logger: Logger) -> EventLoopFuture<Void> {
func run(logger: Logger, handler: LambdaHandler) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request work from lambda runtime engine
return self.runtimeClient.requestWork(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, payload in
// 2. send work to handler
let context = Lambda.Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
logger.debug("sending work to lambda handler \(self.lambdaHandler)")
logger.debug("sending work to lambda handler \(handler)")

// TODO: This is just for now, so that we can work with ByteBuffers only
// in the LambdaRuntimeClient
let bytes = [UInt8](payload.readableBytesView)
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: bytes)
return handler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: bytes)
.map {
// TODO: This mapping shall be removed as soon as the LambdaHandler protocol
// works with ByteBuffer? instead of [UInt8]
Expand All @@ -93,24 +106,26 @@ internal struct LambdaRunner {
}
}

private extension LambdaHandler {
func initialize(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
// offloading so user code never blocks the eventloop
private extension BootstrappedLambdaHandler {
func bootstrap(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)
if offload {
// offloading so user code never blocks the eventloop
DispatchQueue(label: "lambda-\(lifecycleId)").async {
self.initialize { promise.completeWith($0) }
self.bootstrap { promise.completeWith($0) }
}
} else {
self.initialize { promise.completeWith($0) }
self.bootstrap { promise.completeWith($0) }
}
return promise.futureResult
}
}

private extension LambdaHandler {
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: Lambda.Context, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
// offloading so user code never blocks the eventloop
let promise = eventLoop.makePromise(of: LambdaResult.self)
if offload {
// offloading so user code never blocks the eventloop
DispatchQueue(label: "lambda-\(lifecycleId)").async {
self.handle(context: context, payload: payload) { result in
promise.succeed(result)
Expand Down
6 changes: 4 additions & 2 deletions Tests/SwiftAwsLambdaTests/Lambda+CodeableTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class CodableLambdaTest: XCTestCase {
callback(.success(Response(requestId: payload.requestId)))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand All @@ -43,9 +44,10 @@ class CodableLambdaTest: XCTestCase {
callback(.failure(TestError("boom")))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand Down
6 changes: 4 additions & 2 deletions Tests/SwiftAwsLambdaTests/Lambda+StringTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class StringLambdaTest: XCTestCase {
callback(.success(payload))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand All @@ -43,9 +44,10 @@ class StringLambdaTest: XCTestCase {
callback(.failure(TestError("boom")))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ extension LambdaRuntimeClientTest {
return [
("testSuccess", testSuccess),
("testFailure", testFailure),
("testProviderFailure", testProviderFailure),
("testBootstrapFailure", testBootstrapFailure),
("testGetWorkServerInternalError", testGetWorkServerInternalError),
("testGetWorkServerNoBodyError", testGetWorkServerNoBodyError),
("testGetWorkServerMissingHeaderRequestIDError", testGetWorkServerMissingHeaderRequestIDError),
("testProcessResponseInternalServerError", testProcessResponseInternalServerError),
("testProcessErrorInternalServerError", testProcessErrorInternalServerError),
("testProcessInitErrorOnProviderFailure", testProcessInitErrorOnProviderFailure),
("testProcessInitErrorOnBootstrapFailure", testProcessInitErrorOnBootstrapFailure),
]
}
Expand Down
Loading