Skip to content

Commit d6c9977

Browse files
committed
refactor init
motivation: make initialization logic more robust, allowing setup at contructor time and also async bootstrap changes: * break apart "initialization" into two parts: * optional throwing constructor (provider) that takes an EventLoop * optional BootstrappedLambdaHandler protocol that takes an EventLoop and returns async * update core API and logic to support new initialization logic * add tests to various initialization flows
1 parent a6af55c commit d6c9977

11 files changed

+271
-93
lines changed

Sources/SwiftAwsLambda/Lambda+Codable.swift

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Foundation // for JSON
16+
import NIO
1617

1718
/// Extension to the `Lambda` companion to enable execution of Lambdas that take and return `Codable` payloads.
1819
/// This is the most common way to use this library in AWS Lambda, since its JSON based.
@@ -31,14 +32,26 @@ extension Lambda {
3132
self.run(handler as LambdaHandler)
3233
}
3334

35+
/// Run a Lambda defined by implementing the `LambdaCodableHandler` protocol, having `In` and `Out` are `Decodable` and `Encodable` respectively.
36+
///
37+
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
38+
public static func run<Handler>(_ provider: @escaping (EventLoop) throws -> Handler) where Handler: LambdaCodableHandler {
39+
self.run { try provider($0) as LambdaHandler }
40+
}
41+
3442
// for testing
3543
internal static func run<In: Decodable, Out: Encodable>(configuration: Configuration = .init(), closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
36-
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
44+
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
45+
}
46+
47+
// for testing
48+
internal static func run<Handler>(configuration: Configuration = .init(), handler: Handler) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
49+
return self.run(configuration: configuration, handler: handler as LambdaHandler)
3750
}
3851

3952
// for testing
40-
internal static func run<Handler>(handler: Handler, configuration: Configuration = .init()) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
41-
return self.run(handler: handler as LambdaHandler, configuration: configuration)
53+
internal static func run<Handler>(configuration: Configuration = .init(), provider: @escaping (EventLoop) throws -> Handler) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
54+
return self.run(configuration: configuration, provider: { try provider($0) as LambdaHandler })
4255
}
4356
}
4457

Sources/SwiftAwsLambda/Lambda+String.swift

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import NIO
16+
1517
/// Extension to the `Lambda` companion to enable execution of Lambdas that take and return `String` payloads.
1618
extension Lambda {
1719
/// Run a Lambda defined by implementing the `LambdaStringClosure` protocol.
@@ -28,14 +30,26 @@ extension Lambda {
2830
self.run(handler as LambdaHandler)
2931
}
3032

33+
/// Run a Lambda defined by implementing the `LambdaStringHandler` protocol.
34+
///
35+
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
36+
public static func run(_ provider: @escaping (EventLoop) throws -> LambdaStringHandler) {
37+
self.run { try provider($0) as LambdaHandler }
38+
}
39+
3140
// for testing
3241
internal static func run(configuration: Configuration = .init(), _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
33-
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
42+
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
43+
}
44+
45+
// for testing
46+
internal static func run(configuration: Configuration = .init(), handler: LambdaStringHandler) -> LambdaLifecycleResult {
47+
return self.run(configuration: configuration, handler: handler as LambdaHandler)
3448
}
3549

3650
// for testing
37-
internal static func run(handler: LambdaStringHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
38-
return self.run(handler: handler as LambdaHandler, configuration: configuration)
51+
internal static func run(configuration: Configuration = .init(), provider: @escaping (EventLoop) throws -> LambdaStringHandler) -> LambdaLifecycleResult {
52+
return self.run(configuration: configuration, provider: { try provider($0) as LambdaHandler })
3953
}
4054
}
4155

Sources/SwiftAwsLambda/Lambda.swift

Lines changed: 65 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,47 @@ public enum Lambda {
4141
self.run(handler: handler)
4242
}
4343

44+
/// Run a Lambda defined by implementing the `LambdaHandler` protocol.
45+
///
46+
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
47+
@inlinable
48+
public static func run(_ provider: @escaping LambdaHandlerProvider) {
49+
self.run(provider: provider)
50+
}
51+
4452
// for testing and internal use
4553
@usableFromInline
4654
@discardableResult
4755
internal static func run(configuration: Configuration = .init(), closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
48-
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
56+
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
4957
}
5058

5159
// for testing and internal use
5260
@usableFromInline
5361
@discardableResult
54-
internal static func run(handler: LambdaHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
62+
internal static func run(configuration: Configuration = .init(), handler: LambdaHandler) -> LambdaLifecycleResult {
63+
return self.run(configuration: configuration, provider: { _ in handler })
64+
}
65+
66+
// for testing and internal use
67+
@usableFromInline
68+
@discardableResult
69+
internal static func run(configuration: Configuration = .init(), provider: @escaping LambdaHandlerProvider) -> LambdaLifecycleResult {
5570
do {
5671
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) // only need one thread, will improve performance
5772
defer { try! eventLoopGroup.syncShutdownGracefully() }
58-
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, handler: handler, configuration: configuration).wait()
73+
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, configuration: configuration, provider: provider).wait()
5974
return .success(result)
6075
} catch {
6176
return .failure(error)
6277
}
6378
}
6479

65-
internal static func runAsync(eventLoopGroup: EventLoopGroup, handler: LambdaHandler, configuration: Configuration) -> EventLoopFuture<Int> {
80+
internal static func runAsync(eventLoopGroup: EventLoopGroup, configuration: Configuration, provider: @escaping LambdaHandlerProvider) -> EventLoopFuture<Int> {
6681
Backtrace.install()
6782
var logger = Logger(label: "Lambda")
6883
logger.logLevel = configuration.general.logLevel
69-
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, handler: handler)
84+
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, provider: provider)
7085
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
7186
logger.info("intercepted signal: \(signal)")
7287
lifecycle.stop()
@@ -113,31 +128,33 @@ public enum Lambda {
113128
private let eventLoop: EventLoop
114129
private let logger: Logger
115130
private let configuration: Configuration
116-
private let handler: LambdaHandler
131+
private let provider: LambdaHandlerProvider
117132

118-
private var _state = LifecycleState.idle
133+
private var _state = State.idle
119134
private let stateLock = Lock()
120135

121-
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, handler: LambdaHandler) {
136+
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, provider: @escaping LambdaHandlerProvider) {
122137
self.eventLoop = eventLoop
123138
self.logger = logger
124139
self.configuration = configuration
125-
self.handler = handler
140+
self.provider = provider
126141
}
127142

128143
deinit {
129-
precondition(self.state == .shutdown, "invalid state \(self.state)")
144+
guard case .shutdown = self.state else {
145+
preconditionFailure("invalid state \(self.state)")
146+
}
130147
}
131148

132-
private var state: LifecycleState {
149+
private var state: State {
133150
get {
134151
return self.stateLock.withLock {
135152
self._state
136153
}
137154
}
138155
set {
139156
self.stateLock.withLockVoid {
140-
precondition(newValue.rawValue > _state.rawValue, "invalid state \(newValue) after \(self._state)")
157+
precondition(newValue.order > _state.order, "invalid state \(newValue) after \(self._state)")
141158
self._state = newValue
142159
}
143160
}
@@ -148,10 +165,10 @@ public enum Lambda {
148165
self.state = .initializing
149166
var logger = self.logger
150167
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
151-
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration, lambdaHandler: self.handler)
152-
return runner.initialize(logger: logger).flatMap { _ in
153-
self.state = .active
154-
return self.run(runner: runner)
168+
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
169+
return runner.initialize(logger: logger, provider: self.provider).flatMap { handler in
170+
self.state = .active(runner, handler)
171+
return self.run()
155172
}
156173
}
157174

@@ -166,18 +183,18 @@ public enum Lambda {
166183
}
167184

168185
@inline(__always)
169-
private func run(runner: LambdaRunner) -> EventLoopFuture<Int> {
186+
private func run() -> EventLoopFuture<Int> {
170187
let promise = self.eventLoop.makePromise(of: Int.self)
171188

172189
func _run(_ count: Int) {
173190
switch self.state {
174-
case .active:
191+
case .active(let runner, let handler):
175192
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
176193
return promise.succeed(count)
177194
}
178195
var logger = self.logger
179196
logger[metadataKey: "lifecycleIteration"] = "\(count)"
180-
runner.run(logger: logger).whenComplete { result in
197+
runner.run(logger: logger, handler: handler).whenComplete { result in
181198
switch result {
182199
case .success:
183200
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
@@ -197,6 +214,29 @@ public enum Lambda {
197214

198215
return promise.futureResult
199216
}
217+
218+
private enum State {
219+
case idle
220+
case initializing
221+
case active(LambdaRunner, LambdaHandler)
222+
case stopping
223+
case shutdown
224+
225+
internal var order: Int {
226+
switch self {
227+
case .idle:
228+
return 0
229+
case .initializing:
230+
return 1
231+
case .active:
232+
return 2
233+
case .stopping:
234+
return 3
235+
case .shutdown:
236+
return 4
237+
}
238+
}
239+
}
200240
}
201241

202242
@usableFromInline
@@ -274,14 +314,6 @@ public enum Lambda {
274314
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
275315
}
276316
}
277-
278-
private enum LifecycleState: Int {
279-
case idle
280-
case initializing
281-
case active
282-
case stopping
283-
case shutdown
284-
}
285317
}
286318

287319
/// A result type for a Lambda that returns a `[UInt8]`.
@@ -298,18 +330,17 @@ public typealias LambdaInitResult = Result<Void, Error>
298330
/// A callback to provide the result of Lambda initialization.
299331
public typealias LambdaInitCallBack = (LambdaInitResult) -> Void
300332

333+
public typealias LambdaHandlerProvider = (EventLoop) throws -> LambdaHandler
334+
301335
/// A processing protocol for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
302336
public protocol LambdaHandler {
303-
/// Initializes the `LambdaHandler`.
304-
func initialize(callback: @escaping LambdaInitCallBack)
337+
/// Handles the Lambda request.
305338
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback)
306339
}
307340

308-
extension LambdaHandler {
309-
@inlinable
310-
public func initialize(callback: @escaping LambdaInitCallBack) {
311-
callback(.success(()))
312-
}
341+
public protocol BootstrappedLambdaHandler: LambdaHandler {
342+
/// Bootstraps the `LambdaHandler`.
343+
func bootstrap(callback: @escaping LambdaInitCallBack)
313344
}
314345

315346
@usableFromInline

Sources/SwiftAwsLambda/LambdaRunner.swift

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,41 @@ import NIO
1919
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
2020
internal struct LambdaRunner {
2121
private let runtimeClient: LambdaRuntimeClient
22-
private let lambdaHandler: LambdaHandler
2322
private let eventLoop: EventLoop
2423
private let lifecycleId: String
2524
private let offload: Bool
2625

27-
init(eventLoop: EventLoop, configuration: Lambda.Configuration, lambdaHandler: LambdaHandler) {
26+
init(eventLoop: EventLoop, configuration: Lambda.Configuration) {
2827
self.eventLoop = eventLoop
2928
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
30-
self.lambdaHandler = lambdaHandler
3129
self.lifecycleId = configuration.lifecycle.id
3230
self.offload = configuration.runtimeEngine.offload
3331
}
3432

3533
/// Run the user provided initializer. This *must* only be called once.
3634
///
37-
/// - Returns: An `EventLoopFuture<Void>` fulfilled with the outcome of the initialization.
38-
func initialize(logger: Logger) -> EventLoopFuture<Void> {
35+
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
36+
func initialize(logger: Logger, provider: @escaping LambdaHandlerProvider) -> EventLoopFuture<LambdaHandler> {
3937
logger.debug("initializing lambda")
40-
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
41-
return self.lambdaHandler.initialize(eventLoop: self.eventLoop,
42-
lifecycleId: self.lifecycleId,
43-
offload: self.offload).peekError { error in
38+
39+
let future: EventLoopFuture<LambdaHandler>
40+
do {
41+
// 1. craete the handler from the provider
42+
let handler = try provider(self.eventLoop)
43+
// 2. bootstrap if needed
44+
if let handler = handler as? BootstrappedLambdaHandler {
45+
future = handler.bootstrap(eventLoop: self.eventLoop,
46+
lifecycleId: self.lifecycleId,
47+
offload: self.offload).map { handler }
48+
} else {
49+
future = self.eventLoop.makeSucceededFuture(handler)
50+
}
51+
} catch {
52+
future = self.eventLoop.makeFailedFuture(error)
53+
}
54+
55+
// 3. report initialization error if one occured
56+
return future.peekError { error in
4457
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
4558
// We're going to bail out because the init failed, so there's not a lot we can do other than log
4659
// that we couldn't report this error back to the runtime.
@@ -49,24 +62,24 @@ internal struct LambdaRunner {
4962
}
5063
}
5164

52-
func run(logger: Logger) -> EventLoopFuture<Void> {
65+
func run(logger: Logger, handler: LambdaHandler) -> EventLoopFuture<Void> {
5366
logger.debug("lambda invocation sequence starting")
5467
// 1. request work from lambda runtime engine
5568
return self.runtimeClient.requestWork(logger: logger).peekError { error in
5669
logger.error("could not fetch work from lambda runtime engine: \(error)")
5770
}.flatMap { invocation, payload in
5871
// 2. send work to handler
5972
let context = Lambda.Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
60-
logger.debug("sending work to lambda handler \(self.lambdaHandler)")
73+
logger.debug("sending work to lambda handler \(handler)")
6174

6275
// TODO: This is just for now, so that we can work with ByteBuffers only
6376
// in the LambdaRuntimeClient
6477
let bytes = [UInt8](payload.readableBytesView)
65-
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
66-
lifecycleId: self.lifecycleId,
67-
offload: self.offload,
68-
context: context,
69-
payload: bytes)
78+
return handler.handle(eventLoop: self.eventLoop,
79+
lifecycleId: self.lifecycleId,
80+
offload: self.offload,
81+
context: context,
82+
payload: bytes)
7083
.map {
7184
// TODO: This mapping shall be removed as soon as the LambdaHandler protocol
7285
// works with ByteBuffer? instead of [UInt8]
@@ -93,24 +106,26 @@ internal struct LambdaRunner {
93106
}
94107
}
95108

96-
private extension LambdaHandler {
97-
func initialize(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
98-
// offloading so user code never blocks the eventloop
109+
private extension BootstrappedLambdaHandler {
110+
func bootstrap(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
99111
let promise = eventLoop.makePromise(of: Void.self)
100112
if offload {
113+
// offloading so user code never blocks the eventloop
101114
DispatchQueue(label: "lambda-\(lifecycleId)").async {
102-
self.initialize { promise.completeWith($0) }
115+
self.bootstrap { promise.completeWith($0) }
103116
}
104117
} else {
105-
self.initialize { promise.completeWith($0) }
118+
self.bootstrap { promise.completeWith($0) }
106119
}
107120
return promise.futureResult
108121
}
122+
}
109123

124+
private extension LambdaHandler {
110125
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: Lambda.Context, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
111-
// offloading so user code never blocks the eventloop
112126
let promise = eventLoop.makePromise(of: LambdaResult.self)
113127
if offload {
128+
// offloading so user code never blocks the eventloop
114129
DispatchQueue(label: "lambda-\(lifecycleId)").async {
115130
self.handle(context: context, payload: payload) { result in
116131
promise.succeed(result)

0 commit comments

Comments
 (0)