Skip to content

Commit 3fcaf66

Browse files
tomerdglbrntt
authored andcommitted
refactor (#12)
motivation: make code simpler to reason about, better use of swift-nio changes: * remove main loop on global queue + wait, replace with recursion * make better use of EvenLoopFuture, instead of result types to signal errors * inject lifecycleId so we can share offloading queue * improve logging * adjust and improve tests * update sanity and generate linux tests script to work better with dates and swiftformat
1 parent fc5e632 commit 3fcaf66

18 files changed

+200
-206
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ let package = Package(
1919
.executable(name: "SwiftAwsLambdaCodableSample", targets: ["SwiftAwsLambdaCodableSample"]),
2020
],
2121
dependencies: [
22-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
22+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.8.0"),
2323
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
2424
.package(url: "https://github.com/ianpartridge/swift-backtrace.git", from: "1.1.0"),
2525
],

Sources/SwiftAwsLambda/Lambda.swift

Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -29,49 +29,49 @@ public enum Lambda {
2929
///
3030
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
3131
public static func run(_ closure: @escaping LambdaClosure) {
32-
let _: LambdaLifecycleResult = self.run(closure)
32+
self.run(closure: closure)
3333
}
3434

3535
/// Run a Lambda defined by implementing the `LambdaHandler` protocol.
3636
///
3737
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
3838
public static func run(_ handler: LambdaHandler) {
39-
let _: LambdaLifecycleResult = self.run(handler: handler)
39+
self.run(handler: handler)
4040
}
4141

42-
// for testing
43-
internal static func run(maxTimes: Int = 0, stopSignal: Signal = .TERM, _ closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
42+
// for testing and internal use
43+
@discardableResult
44+
internal static func run(maxTimes: Int = 0, stopSignal: Signal = .TERM, closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
4445
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes, stopSignal: stopSignal)
4546
}
4647

47-
// for testing
48+
// for testing and internal use
49+
@discardableResult
4850
internal static func run(handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .TERM) -> LambdaLifecycleResult {
4951
do {
50-
return try self.runAsync(handler: handler, maxTimes: maxTimes, stopSignal: stopSignal).wait()
52+
return try self.runAsync(handler: handler, maxTimes: maxTimes, stopSignal: stopSignal).map { .success($0) }.wait()
5153
} catch {
5254
return .failure(error)
5355
}
5456
}
5557

56-
internal static func runAsync(handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .TERM) -> EventLoopFuture<LambdaLifecycleResult> {
58+
internal static func runAsync(handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .TERM) -> EventLoopFuture<Int> {
59+
Backtrace.install()
5760
let logger = Logger(label: "Lambda")
5861
let lifecycle = Lifecycle(logger: logger, handler: handler, maxTimes: maxTimes)
59-
Backtrace.install()
6062
let signalSource = trap(signal: stopSignal) { signal in
6163
logger.info("intercepted signal: \(signal)")
6264
lifecycle.stop()
6365
}
64-
let future = lifecycle.start()
65-
future.whenComplete { _ in
66+
return lifecycle.start().always { _ in
6667
lifecycle.stop()
6768
signalSource.cancel()
6869
}
69-
return future
7070
}
7171

7272
private class Lifecycle {
7373
private let logger: Logger
74-
private let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
74+
private let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
7575
private let handler: LambdaHandler
7676
private let max: Int
7777

@@ -88,7 +88,7 @@ public enum Lambda {
8888

8989
deinit {
9090
self.logger.info("lambda lifecycle deinit")
91-
assert(state == .shutdown)
91+
assert(self.state == .shutdown, "invalid state, expected shutdown")
9292
}
9393

9494
private var state: LifecycleState {
@@ -105,63 +105,60 @@ public enum Lambda {
105105
}
106106
}
107107

108-
func start() -> EventLoopFuture<LambdaLifecycleResult> {
108+
func start() -> EventLoopFuture<Int> {
109109
self.state = .initializing
110+
let lifecycleId = NSUUID().uuidString
111+
let eventLoop = self.eventLoopGroup.next()
110112
var logger = self.logger
111-
logger[metadataKey: "lifecycleId"] = .string(NSUUID().uuidString)
112-
let runner = LambdaRunner(eventLoopGroup: eventLoopGroup, lambdaHandler: handler)
113-
let promise = self.eventLoopGroup.next().makePromise(of: LambdaLifecycleResult.self)
114-
self.logger.info("lambda lifecycle starting")
115-
116-
runner.initialize(logger: logger).whenComplete { result in
117-
switch result {
118-
case .failure(let error):
119-
promise.succeed(.failure(error))
120-
case .success:
121-
DispatchQueue.global().async {
122-
var lastError: Error?
123-
var counter = 0
124-
self.state = .active // initializtion completed successfully, we're good to go!
125-
while self.state == .active, lastError == nil, self.max == 0 || counter < self.max {
126-
do {
127-
logger[metadataKey: "lifecycleIteration"] = "\(counter)"
128-
// blocking! per aws lambda runtime spec the polling requests are to be done one at a time
129-
switch try runner.run(logger: logger).wait() {
130-
case .success:
131-
counter = counter + 1
132-
case .failure(let error):
133-
if self.state == .active {
134-
lastError = error
135-
}
136-
}
137-
} catch {
138-
if self.state == .active {
139-
lastError = error
140-
}
141-
}
142-
// flush the log streams so entries are printed with a single invocation
143-
// TODO: should we suuport a flush API in swift-log's default logger?
144-
fflush(stdout)
145-
fflush(stderr)
146-
}
147-
promise.succeed(lastError.map { .failure($0) } ?? .success(counter))
148-
}
149-
}
113+
logger[metadataKey: "lifecycleId"] = .string(lifecycleId)
114+
logger.info("lambda lifecycle starting")
115+
116+
let runner = LambdaRunner(eventLoop: eventLoop, lambdaHandler: handler, lifecycleId: lifecycleId)
117+
return runner.initialize(logger: logger).flatMap { _ in
118+
self.state = .active
119+
return self.run(logger: logger, eventLoop: eventLoop, runner: runner, count: 0)
150120
}
151-
return promise.futureResult
152121
}
153122

154123
func stop() {
155-
if self.state == .stopping {
124+
switch self.state {
125+
case .stopping:
156126
return self.logger.info("lambda lifecycle aready stopping")
157-
}
158-
if self.state == .shutdown {
127+
case .shutdown:
159128
return self.logger.info("lambda lifecycle aready shutdown")
129+
default:
130+
self.logger.info("lambda lifecycle stopping")
131+
self.state = .stopping
132+
try! self.eventLoopGroup.syncShutdownGracefully()
133+
self.state = .shutdown
134+
}
135+
}
136+
137+
private func run(logger: Logger, eventLoop: EventLoop, runner: LambdaRunner, count: Int) -> EventLoopFuture<Int> {
138+
var logger = logger
139+
logger[metadataKey: "lifecycleIteration"] = "\(count)"
140+
return runner.run(logger: logger).flatMap { _ in
141+
switch self.state {
142+
case .idle, .initializing:
143+
preconditionFailure("invalid run state: \(self.state)")
144+
case .active:
145+
if self.max > 0, count >= self.max {
146+
return eventLoop.makeSucceededFuture(count)
147+
}
148+
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
149+
return self.run(logger: logger, eventLoop: eventLoop, runner: runner, count: count + 1)
150+
case .stopping, .shutdown:
151+
return eventLoop.makeSucceededFuture(count)
152+
}
153+
}.flatMapErrorThrowing { error in
154+
// if we run into errors while shutting down, we ignore them
155+
switch self.state {
156+
case .stopping, .shutdown:
157+
return count
158+
default:
159+
throw error
160+
}
160161
}
161-
self.logger.info("lambda lifecycle stopping")
162-
self.state = .stopping
163-
try! self.eventLoopGroup.syncShutdownGracefully()
164-
self.state = .shutdown
165162
}
166163
}
167164

@@ -225,6 +222,12 @@ public struct LambdaContext {
225222
self.cognitoIdentity = cognitoIdentity
226223
self.clientContext = clientContext
227224
self.deadline = deadline
225+
// mutate logger with context
226+
var logger = logger
227+
logger[metadataKey: "awsRequestId"] = .string(requestId)
228+
if let traceId = traceId {
229+
logger[metadataKey: "awsTraceId"] = .string(traceId)
230+
}
228231
self.logger = logger
229232
}
230233
}

Sources/SwiftAwsLambda/LambdaRunner.swift

Lines changed: 28 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,102 +20,83 @@ import NIO
2020
internal final class LambdaRunner {
2121
private let runtimeClient: LambdaRuntimeClient
2222
private let lambdaHandler: LambdaHandler
23-
private let eventLoopGroup: EventLoopGroup
23+
private let eventLoop: EventLoop
24+
private let lifecycleId: String
2425

25-
init(eventLoopGroup: EventLoopGroup, lambdaHandler: LambdaHandler) {
26-
self.eventLoopGroup = eventLoopGroup
27-
self.runtimeClient = LambdaRuntimeClient(eventLoopGroup: self.eventLoopGroup)
26+
init(eventLoop: EventLoop, lambdaHandler: LambdaHandler, lifecycleId: String) {
27+
self.eventLoop = eventLoop
28+
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop)
2829
self.lambdaHandler = lambdaHandler
30+
self.lifecycleId = lifecycleId
2931
}
3032

3133
/// Run the user provided initializer. This *must* only be called once.
3234
///
3335
/// - Returns: An `EventLoopFuture<Void>` fulfilled with the outcome of the initialization.
3436
func initialize(logger: Logger) -> EventLoopFuture<Void> {
35-
let initPromise = self.eventLoopGroup.next().makePromise(of: Void.self)
36-
self.lambdaHandler.initialize(promise: initPromise)
37-
37+
logger.info("initializing lambda")
3838
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
39-
return initPromise.futureResult.flatMapError { error in
40-
self.runtimeClient.reportInitError(logger: logger, error: error).flatMapResult { postResult -> Result<Void, Error> in
41-
switch postResult {
42-
case .failure(let postResultError):
39+
return self.lambdaHandler.initialize(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId).flatMapError { error in
40+
self.runtimeClient.reportInitializationError(logger: logger, error: error).flatMapResult { result -> Result<Void, Error> in
41+
if case .failure(let reportingError) = result {
4342
// We're going to bail out because the init failed, so there's not a lot we can do other than log
4443
// that we couldn't report this error back to the runtime.
45-
logger.error("could not report initialization error to lambda runtime engine: \(postResultError)")
46-
case .success:
47-
logger.info("successfully reported initialization error")
44+
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
4845
}
4946
// Always return the init error
5047
return .failure(error)
5148
}
5249
}
5350
}
5451

55-
func run(logger: Logger) -> EventLoopFuture<LambdaRunResult> {
56-
var logger = logger
52+
func run(logger: Logger) -> EventLoopFuture<Void> {
5753
logger.info("lambda invocation sequence starting")
5854
// 1. request work from lambda runtime engine
5955
return self.runtimeClient.requestWork(logger: logger).flatMap { workRequestResult in
6056
switch workRequestResult {
6157
case .failure(let error):
6258
logger.error("could not fetch work from lambda runtime engine: \(error)")
63-
return self.makeSucceededFuture(result: .failure(error))
59+
return self.eventLoop.makeFailedFuture(error)
6460
case .success(let context, let payload):
65-
logger[metadataKey: "awsRequestId"] = .string(context.requestId)
66-
if let traceId = context.traceId {
67-
logger[metadataKey: "awsTraceId"] = .string(traceId)
68-
}
6961
// 2. send work to handler
7062
logger.info("sending work to lambda handler \(self.lambdaHandler)")
71-
let promise = self.eventLoopGroup.next().makePromise(of: LambdaResult.self)
72-
self.lambdaHandler.handle(context: context, payload: payload, promise: promise)
73-
return promise.futureResult.flatMap { lambdaResult in
63+
return self.lambdaHandler.handle(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId, context: context, payload: payload).flatMap { lambdaResult in
7464
// 3. report results to runtime engine
7565
self.runtimeClient.reportResults(logger: logger, context: context, result: lambdaResult).flatMap { postResultsResult in
7666
switch postResultsResult {
7767
case .failure(let error):
78-
logger.error("could not report results to lambda runtime engine: \(error)")
79-
return self.makeSucceededFuture(result: .failure(error))
68+
logger.error("failed reporting results to lambda runtime engine: \(error)")
69+
return self.eventLoop.makeFailedFuture(error)
8070
case .success():
8171
// we are done!
8272
logger.info("lambda invocation sequence completed successfully")
83-
return self.makeSucceededFuture(result: .success(()))
73+
return self.eventLoop.makeSucceededFuture(())
8474
}
8575
}
8676
}
8777
}
8878
}
8979
}
90-
91-
private func makeSucceededFuture<T>(result: T) -> EventLoopFuture<T> {
92-
return self.eventLoopGroup.next().makeSucceededFuture(result)
93-
}
9480
}
9581

96-
internal typealias LambdaRunResult = Result<Void, Error>
97-
9882
private extension LambdaHandler {
99-
func handle(context: LambdaContext, payload: [UInt8], promise: EventLoopPromise<LambdaResult>) {
83+
func initialize(eventLoop: EventLoop, lifecycleId: String) -> EventLoopFuture<Void> {
10084
// offloading so user code never blocks the eventloop
101-
DispatchQueue(label: "lambda-\(context.requestId)").async {
102-
self.handle(context: context, payload: payload) { (result: LambdaResult) in
103-
promise.succeed(result)
104-
}
85+
let promise = eventLoop.makePromise(of: Void.self)
86+
DispatchQueue(label: "lambda-\(lifecycleId)").async {
87+
self.initialize { promise.completeWith($0) }
10588
}
89+
return promise.futureResult
10690
}
10791

108-
func initialize(promise: EventLoopPromise<Void>) {
92+
func handle(eventLoop: EventLoop, lifecycleId: String, context: LambdaContext, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
10993
// offloading so user code never blocks the eventloop
110-
DispatchQueue(label: "lambda-initialize").async {
111-
self.initialize { result in
112-
switch result {
113-
case .failure(let error):
114-
return promise.fail(error)
115-
case .success:
116-
return promise.succeed(())
117-
}
94+
let promise = eventLoop.makePromise(of: LambdaResult.self)
95+
DispatchQueue(label: "lambda-\(lifecycleId)").async {
96+
self.handle(context: context, payload: payload) { result in
97+
promise.succeed(result)
11898
}
11999
}
100+
return promise.futureResult
120101
}
121102
}

0 commit comments

Comments
 (0)