Skip to content

Commit fc5e632

Browse files
glbrntttomerd
authored andcommitted
Add lambda initialization (#11)
Motivation: When initialization is expensive it's useful for it to be done upfront instead of for each invocation of the lambda. Similarly, it's helpful for errors thrown during initialization to be reported to the runtime. Modifications: - Add support for initialization in 'LambdaHandler' - Report init errors to '/init/error' Result: Implementers of 'LambdaHandler' can run their own initialization and have errors reported.
1 parent 2334ad5 commit fc5e632

13 files changed

+318
-35
lines changed

Sources/SwiftAwsLambda/Lambda.swift

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public enum Lambda {
7575
private let handler: LambdaHandler
7676
private let max: Int
7777

78-
private var _state = LifecycleState.initialized
78+
private var _state = LifecycleState.idle
7979
private let stateLock = Lock()
8080

8181
init(logger: Logger, handler: LambdaHandler, maxTimes: Int) {
@@ -106,38 +106,47 @@ public enum Lambda {
106106
}
107107

108108
func start() -> EventLoopFuture<LambdaLifecycleResult> {
109-
self.state = .active
110-
let runner = LambdaRunner(eventLoopGroup: eventLoopGroup, lambdaHandler: handler)
111-
let promise = self.eventLoopGroup.next().makePromise(of: LambdaLifecycleResult.self)
109+
self.state = .initializing
112110
var logger = self.logger
113111
logger[metadataKey: "lifecycleId"] = .string(NSUUID().uuidString)
114-
self.logger.info("lambda lifecycle statring")
115-
DispatchQueue.global().async {
116-
var lastError: Error?
117-
var counter = 0
118-
while self.state == .active, lastError == nil, self.max == 0 || counter < self.max {
119-
do {
120-
logger[metadataKey: "lifecycleIteration"] = "\(counter)"
121-
// blocking! per aws lambda runtime spec the polling requets are to be done one at a time
122-
switch try runner.run(logger: logger).wait() {
123-
case .success:
124-
counter = counter + 1
125-
case .failure(let error):
126-
if self.state == .active {
127-
lastError = error
112+
let runner = LambdaRunner(eventLoopGroup: eventLoopGroup, lambdaHandler: handler)
113+
let promise = self.eventLoopGroup.next().makePromise(of: LambdaLifecycleResult.self)
114+
self.logger.info("lambda lifecycle starting")
115+
116+
runner.initialize(logger: logger).whenComplete { result in
117+
switch result {
118+
case .failure(let error):
119+
promise.succeed(.failure(error))
120+
case .success:
121+
DispatchQueue.global().async {
122+
var lastError: Error?
123+
var counter = 0
124+
self.state = .active // initializtion completed successfully, we're good to go!
125+
while self.state == .active, lastError == nil, self.max == 0 || counter < self.max {
126+
do {
127+
logger[metadataKey: "lifecycleIteration"] = "\(counter)"
128+
// blocking! per aws lambda runtime spec the polling requests are to be done one at a time
129+
switch try runner.run(logger: logger).wait() {
130+
case .success:
131+
counter = counter + 1
132+
case .failure(let error):
133+
if self.state == .active {
134+
lastError = error
135+
}
136+
}
137+
} catch {
138+
if self.state == .active {
139+
lastError = error
140+
}
128141
}
142+
// flush the log streams so entries are printed with a single invocation
143+
// TODO: should we suuport a flush API in swift-log's default logger?
144+
fflush(stdout)
145+
fflush(stderr)
129146
}
130-
} catch {
131-
if self.state == .active {
132-
lastError = error
133-
}
147+
promise.succeed(lastError.map { .failure($0) } ?? .success(counter))
134148
}
135-
// flush the log streams so entries are printed with a single invocation
136-
// TODO: should we suuport a flush API in swift-log's default logger?
137-
fflush(stdout)
138-
fflush(stderr)
139149
}
140-
promise.succeed(lastError.map { .failure($0) } ?? .success(counter))
141150
}
142151
return promise.futureResult
143152
}
@@ -157,10 +166,11 @@ public enum Lambda {
157166
}
158167

159168
private enum LifecycleState: Int {
160-
case initialized = 0
161-
case active = 1
162-
case stopping = 2
163-
case shutdown = 3
169+
case idle
170+
case initializing
171+
case active
172+
case stopping
173+
case shutdown
164174
}
165175
}
166176

@@ -172,11 +182,25 @@ public typealias LambdaCallback = (LambdaResult) -> Void
172182
/// A processing closure for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
173183
public typealias LambdaClosure = (LambdaContext, [UInt8], LambdaCallback) -> Void
174184

185+
/// A result type for a Lambda initialization.
186+
public typealias LambdaInitResult = Result<Void, Error>
187+
188+
/// A callback to provide the result of Lambda initialization.
189+
public typealias LambdaInitCallBack = (LambdaInitResult) -> Void
190+
175191
/// A processing protocol for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
176192
public protocol LambdaHandler {
193+
/// Initializes the `LambdaHandler`.
194+
func initialize(callback: @escaping LambdaInitCallBack)
177195
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback)
178196
}
179197

198+
extension LambdaHandler {
199+
public func initialize(callback: @escaping LambdaInitCallBack) {
200+
callback(.success(()))
201+
}
202+
}
203+
180204
public struct LambdaContext {
181205
// from aws
182206
public let requestId: String

Sources/SwiftAwsLambda/LambdaRunner.swift

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,30 @@ internal final class LambdaRunner {
2828
self.lambdaHandler = lambdaHandler
2929
}
3030

31+
/// Run the user provided initializer. This *must* only be called once.
32+
///
33+
/// - Returns: An `EventLoopFuture<Void>` fulfilled with the outcome of the initialization.
34+
func initialize(logger: Logger) -> EventLoopFuture<Void> {
35+
let initPromise = self.eventLoopGroup.next().makePromise(of: Void.self)
36+
self.lambdaHandler.initialize(promise: initPromise)
37+
38+
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
39+
return initPromise.futureResult.flatMapError { error in
40+
self.runtimeClient.reportInitError(logger: logger, error: error).flatMapResult { postResult -> Result<Void, Error> in
41+
switch postResult {
42+
case .failure(let postResultError):
43+
// We're going to bail out because the init failed, so there's not a lot we can do other than log
44+
// that we couldn't report this error back to the runtime.
45+
logger.error("could not report initialization error to lambda runtime engine: \(postResultError)")
46+
case .success:
47+
logger.info("successfully reported initialization error")
48+
}
49+
// Always return the init error
50+
return .failure(error)
51+
}
52+
}
53+
}
54+
3155
func run(logger: Logger) -> EventLoopFuture<LambdaRunResult> {
3256
var logger = logger
3357
logger.info("lambda invocation sequence starting")
@@ -80,4 +104,18 @@ private extension LambdaHandler {
80104
}
81105
}
82106
}
107+
108+
func initialize(promise: EventLoopPromise<Void>) {
109+
// offloading so user code never blocks the eventloop
110+
DispatchQueue(label: "lambda-initialize").async {
111+
self.initialize { result in
112+
switch result {
113+
case .failure(let error):
114+
return promise.fail(error)
115+
case .success:
116+
return promise.succeed(())
117+
}
118+
}
119+
}
120+
}
83121
}

Sources/SwiftAwsLambda/LambdaRuntimeClient.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import NIOHTTP1
2121
/// * /runtime/invocation/next
2222
/// * /runtime/invocation/response
2323
/// * /runtime/invocation/error
24+
/// * /runtime/init/error
2425
internal class LambdaRuntimeClient {
2526
private let baseUrl: String
2627
private let httpClient: HTTPClient
@@ -76,10 +77,29 @@ internal class LambdaRuntimeClient {
7677
response.status != .accepted ? .failure(.badStatusCode(response.status)) : .success(())
7778
}
7879
}
80+
81+
/// Reports an initialization error to the Runtime Engine.
82+
func reportInitError(logger: Logger, error: Error) -> EventLoopFuture<PostInitErrorResult> {
83+
let url = self.baseUrl + Consts.postInitErrorURL
84+
let errorResponse = ErrorResponse(errorType: "InitializationError", errorMessage: "\(error)")
85+
var body: ByteBuffer
86+
switch errorResponse.toJson() {
87+
case .failure(let jsonError):
88+
return self.eventLoopGroup.next().makeSucceededFuture(.failure(.json(jsonError)))
89+
case .success(let json):
90+
body = self.allocator.buffer(capacity: json.utf8.count)
91+
body.writeString(json)
92+
logger.info("reporting initialization error to lambda runtime engine using \(url)")
93+
return self.httpClient.post(url: url, body: body).map { response in
94+
response.status != .accepted ? .failure(.badStatusCode(response.status)) : .success(())
95+
}
96+
}
97+
}
7998
}
8099

81100
internal typealias RequestWorkResult = Result<(LambdaContext, [UInt8]), LambdaRuntimeClientError>
82101
internal typealias PostResultsResult = Result<Void, LambdaRuntimeClientError>
102+
internal typealias PostInitErrorResult = Result<Void, LambdaRuntimeClientError>
83103

84104
internal enum LambdaRuntimeClientError: Error {
85105
case badStatusCode(HTTPResponseStatus)

Sources/SwiftAwsLambda/Utils.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ internal enum Consts {
2828
static let requestWorkURLSuffix = "/next"
2929
static let postResponseURLSuffix = "/response"
3030
static let postErrorURLSuffix = "/error"
31+
static let postInitErrorURL = "\(apiPrefix)/runtime/init/error"
3132
}
3233

3334
/// AWS Lambda HTTP Headers, used to populate the `LambdaContext` object.

Tests/SwiftAwsLambdaTests/Lambda+CodeableTest.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ private class GoodBehavior: LambdaServerBehavior {
100100
XCTFail("should not report error")
101101
return .failure(.internalServerError)
102102
}
103+
104+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
105+
XCTFail("should not report init error")
106+
return .failure(.internalServerError)
107+
}
103108
}
104109

105110
private class BadBehavior: LambdaServerBehavior {
@@ -114,6 +119,10 @@ private class BadBehavior: LambdaServerBehavior {
114119
func processError(requestId: String, error: ErrorResponse) -> ProcessErrorResult {
115120
return .failure(.internalServerError)
116121
}
122+
123+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
124+
return .failure(.internalServerError)
125+
}
117126
}
118127

119128
private class Request: Codable {

Tests/SwiftAwsLambdaTests/Lambda+StringTest.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ private class GoodBehavior: LambdaServerBehavior {
8484
XCTFail("should not report error")
8585
return .failure(.internalServerError)
8686
}
87+
88+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
89+
XCTFail("should not report init error")
90+
return .failure(.internalServerError)
91+
}
8792
}
8893

8994
private class BadBehavior: LambdaServerBehavior {
@@ -98,6 +103,10 @@ private class BadBehavior: LambdaServerBehavior {
98103
func processError(requestId: String, error: ErrorResponse) -> ProcessErrorResult {
99104
return .failure(.internalServerError)
100105
}
106+
107+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
108+
return .failure(.internalServerError)
109+
}
101110
}
102111

103112
private class StringEchoHandler: LambdaStringHandler {

Tests/SwiftAwsLambdaTests/LambdaRunnerTest.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ class LambdaRunnerTest: XCTestCase {
3434
XCTFail("should not report error")
3535
return .failure(.internalServerError)
3636
}
37+
38+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
39+
XCTFail("should not report init error")
40+
return .failure(.internalServerError)
41+
}
3742
}
3843
let result = try runLambda(behavior: Behavior(), handler: EchoHandler()) // .wait()
3944
assertRunLambdaResult(result: result)
@@ -57,6 +62,11 @@ class LambdaRunnerTest: XCTestCase {
5762
XCTAssertEqual(Behavior.error, error.errorMessage, "expecting error to match")
5863
return .success(())
5964
}
65+
66+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
67+
XCTFail("should not report init error")
68+
return .failure(.internalServerError)
69+
}
6070
}
6171
let result = try runLambda(behavior: Behavior(), handler: FailedHandler(Behavior.error)) // .wait()
6272
assertRunLambdaResult(result: result)

Tests/SwiftAwsLambdaTests/LambdaRuntimeClientTest+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extension LambdaRuntimeClientTest {
3030
("testGetWorkServerNoContextError", testGetWorkServerNoContextError),
3131
("testProcessResponseInternalServerError", testProcessResponseInternalServerError),
3232
("testProcessErrorInternalServerError", testProcessErrorInternalServerError),
33+
("testProcessInitErrorInternalServerError", testProcessInitErrorInternalServerError),
3334
]
3435
}
3536
}

Tests/SwiftAwsLambdaTests/LambdaRuntimeClientTest.swift

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ class LambdaRuntimeClientTest: XCTestCase {
3131
XCTFail("should not report error")
3232
return .failure(.internalServerError)
3333
}
34+
35+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
36+
XCTFail("should not report init error")
37+
return .failure(.internalServerError)
38+
}
3439
}
3540
let result = try runLambda(behavior: Behavior(), handler: EchoHandler())
3641
assertRunLambdaResult(result: result, shouldFailWithError: LambdaRuntimeClientError.badStatusCode(.internalServerError))
@@ -51,6 +56,11 @@ class LambdaRuntimeClientTest: XCTestCase {
5156
XCTFail("should not report error")
5257
return .failure(.internalServerError)
5358
}
59+
60+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
61+
XCTFail("should not report init error")
62+
return .failure(.internalServerError)
63+
}
5464
}
5565
let result = try runLambda(behavior: Behavior(), handler: EchoHandler())
5666
assertRunLambdaResult(result: result, shouldFailWithError: LambdaRuntimeClientError.noBody)
@@ -72,6 +82,11 @@ class LambdaRuntimeClientTest: XCTestCase {
7282
XCTFail("should not report error")
7383
return .failure(.internalServerError)
7484
}
85+
86+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
87+
XCTFail("should not report init error")
88+
return .failure(.internalServerError)
89+
}
7590
}
7691
let result = try runLambda(behavior: Behavior(), handler: EchoHandler())
7792
assertRunLambdaResult(result: result, shouldFailWithError: LambdaRuntimeClientError.noContext)
@@ -91,6 +106,11 @@ class LambdaRuntimeClientTest: XCTestCase {
91106
XCTFail("should not report error")
92107
return .failure(.internalServerError)
93108
}
109+
110+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
111+
XCTFail("should not report init error")
112+
return .failure(.internalServerError)
113+
}
94114
}
95115
let result = try runLambda(behavior: Behavior(), handler: EchoHandler())
96116
assertRunLambdaResult(result: result, shouldFailWithError: LambdaRuntimeClientError.badStatusCode(.internalServerError))
@@ -110,8 +130,39 @@ class LambdaRuntimeClientTest: XCTestCase {
110130
func processError(requestId: String, error: ErrorResponse) -> ProcessErrorResult {
111131
return .failure(.internalServerError)
112132
}
133+
134+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
135+
XCTFail("should not report init error")
136+
return .failure(.internalServerError)
137+
}
113138
}
114139
let result = try runLambda(behavior: Behavior(), handler: FailedHandler("boom"))
115140
assertRunLambdaResult(result: result, shouldFailWithError: LambdaRuntimeClientError.badStatusCode(.internalServerError))
116141
}
142+
143+
func testProcessInitErrorInternalServerError() throws {
144+
class Behavior: LambdaServerBehavior {
145+
func getWork() -> GetWorkResult {
146+
XCTFail("should not get work")
147+
return .failure(.internalServerError)
148+
}
149+
150+
func processResponse(requestId: String, response: String) -> ProcessResponseResult {
151+
XCTFail("should not report results")
152+
return .failure(.internalServerError)
153+
}
154+
155+
func processError(requestId: String, error: ErrorResponse) -> ProcessErrorResult {
156+
XCTFail("should not report error")
157+
return .failure(.internalServerError)
158+
}
159+
160+
func processInitError(error: ErrorResponse) -> ProcessInitErrorResult {
161+
return .failure(.internalServerError)
162+
}
163+
}
164+
let handler = FailedInitializerHandler("boom")
165+
let result = try runLambda(behavior: Behavior(), handler: handler)
166+
assertRunLambdaResult(result: result, shouldFailWithError: handler.initError)
167+
}
117168
}

Tests/SwiftAwsLambdaTests/LambdaTest+XCTest.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ extension LambdaTest {
2727
return [
2828
("testSuceess", testSuceess),
2929
("testFailure", testFailure),
30+
("testInitFailure", testInitFailure),
31+
("testInitFailureAndReportErrorFailure", testInitFailureAndReportErrorFailure),
3032
("testClosureSuccess", testClosureSuccess),
3133
("testClosureFailure", testClosureFailure),
3234
("testStartStop", testStartStop),

0 commit comments

Comments
 (0)