Skip to content

Commit f4d5739

Browse files
tomerdGitHub Enterprise
authored and
GitHub Enterprise
committed
more refactoring (#13)
motivation: make lambda execution sequence easier to reason about changes: * add `peekFailure` extension to `EventLoopFuture` to handle error without side effects * update process chain to make use of `peekFailure`
1 parent a954310 commit f4d5739

File tree

3 files changed

+75
-51
lines changed

3 files changed

+75
-51
lines changed

Sources/SwiftAwsLambda/LambdaRunner.swift

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,45 +36,32 @@ internal final class LambdaRunner {
3636
func initialize(logger: Logger) -> EventLoopFuture<Void> {
3737
logger.info("initializing lambda")
3838
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
39-
return self.lambdaHandler.initialize(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId).flatMapError { error in
40-
self.runtimeClient.reportInitializationError(logger: logger, error: error).flatMapResult { result -> Result<Void, Error> in
41-
if case .failure(let reportingError) = result {
42-
// We're going to bail out because the init failed, so there's not a lot we can do other than log
43-
// that we couldn't report this error back to the runtime.
44-
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
45-
}
46-
// Always return the init error
47-
return .failure(error)
39+
return self.lambdaHandler.initialize(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId).peekError { error in
40+
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
41+
// We're going to bail out because the init failed, so there's not a lot we can do other than log
42+
// that we couldn't report this error back to the runtime.
43+
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
4844
}
4945
}
5046
}
5147

5248
func run(logger: Logger) -> EventLoopFuture<Void> {
5349
logger.info("lambda invocation sequence starting")
5450
// 1. request work from lambda runtime engine
55-
return self.runtimeClient.requestWork(logger: logger).flatMap { workRequestResult in
56-
switch workRequestResult {
57-
case .failure(let error):
58-
logger.error("could not fetch work from lambda runtime engine: \(error)")
59-
return self.eventLoop.makeFailedFuture(error)
60-
case .success(let context, let payload):
61-
// 2. send work to handler
62-
logger.info("sending work to lambda handler \(self.lambdaHandler)")
63-
return self.lambdaHandler.handle(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId, context: context, payload: payload).flatMap { lambdaResult in
64-
// 3. report results to runtime engine
65-
self.runtimeClient.reportResults(logger: logger, context: context, result: lambdaResult).flatMap { postResultsResult in
66-
switch postResultsResult {
67-
case .failure(let error):
68-
logger.error("failed reporting results to lambda runtime engine: \(error)")
69-
return self.eventLoop.makeFailedFuture(error)
70-
case .success():
71-
// we are done!
72-
logger.info("lambda invocation sequence completed successfully")
73-
return self.eventLoop.makeSucceededFuture(())
74-
}
75-
}
76-
}
77-
}
51+
return self.runtimeClient.requestWork(logger: logger).peekError { error in
52+
logger.error("could not fetch work from lambda runtime engine: \(error)")
53+
}.flatMap { context, payload in
54+
// 2. send work to handler
55+
logger.info("sending work to lambda handler \(self.lambdaHandler)")
56+
return self.lambdaHandler.handle(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId, context: context, payload: payload).map { (context, $0) }
57+
}.flatMap { context, result in
58+
// 3. report results to runtime engine
59+
self.runtimeClient.reportResults(logger: logger, context: context, result: result)
60+
}.peekError { error in
61+
logger.error("failed reporting results to lambda runtime engine: \(error)")
62+
}.always { result in
63+
// we are done!
64+
logger.info("lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")")
7865
}
7966
}
8067
}
@@ -100,3 +87,36 @@ private extension LambdaHandler {
10087
return promise.futureResult
10188
}
10289
}
90+
91+
// TODO: move to nio?
92+
private extension EventLoopFuture {
93+
// callback does not have side effects, failing with original result
94+
func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture<Value> {
95+
return self.flatMapError { error in
96+
callback(error)
97+
return self
98+
}
99+
}
100+
101+
// callback does not have side effects, failing with original result
102+
func peekError(_ callback: @escaping (Error) -> EventLoopFuture<Void>) -> EventLoopFuture<Value> {
103+
return self.flatMapError { error in
104+
let promise = self.eventLoop.makePromise(of: Value.self)
105+
callback(error).whenComplete { _ in
106+
promise.completeWith(self)
107+
}
108+
return promise.futureResult
109+
}
110+
}
111+
}
112+
113+
private extension Result {
114+
var successful: Bool {
115+
switch self {
116+
case .success:
117+
return true
118+
default:
119+
return false
120+
}
121+
}
122+
}

Sources/SwiftAwsLambda/LambdaRuntimeClient.swift

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,26 @@ internal class LambdaRuntimeClient {
3535
self.allocator = ByteBufferAllocator()
3636
}
3737

38-
func requestWork(logger: Logger) -> EventLoopFuture<RequestWorkResult> {
38+
/// Requests work from the Runtime Engine.
39+
func requestWork(logger: Logger) -> EventLoopFuture<(LambdaContext, [UInt8])> {
3940
let url = self.baseUrl + Consts.invokationURLPrefix + Consts.requestWorkURLSuffix
4041
logger.info("requesting work from lambda runtime engine using \(url)")
41-
return self.httpClient.get(url: url).map { response in
42+
return self.httpClient.get(url: url).flatMapThrowing { response in
4243
guard response.status == .ok else {
43-
return .failure(.badStatusCode(response.status))
44+
throw LambdaRuntimeClientError.badStatusCode(response.status)
4445
}
4546
guard let payload = response.readWholeBody() else {
46-
return .failure(.noBody)
47+
throw LambdaRuntimeClientError.noBody
4748
}
4849
guard let context = LambdaContext(logger: logger, response: response) else {
49-
return .failure(.noContext)
50+
throw LambdaRuntimeClientError.noContext
5051
}
51-
return .success((context, payload))
52+
return (context, payload)
5253
}
5354
}
5455

55-
func reportResults(logger: Logger, context: LambdaContext, result: LambdaResult) -> EventLoopFuture<PostResultsResult> {
56+
/// Reports a result to the Runtime Engine.
57+
func reportResults(logger: Logger, context: LambdaContext, result: LambdaResult) -> EventLoopFuture<Void> {
5658
var url = self.baseUrl + Consts.invokationURLPrefix + "/" + context.requestId
5759
var body: ByteBuffer
5860
switch result {
@@ -66,41 +68,43 @@ internal class LambdaRuntimeClient {
6668
let error = ErrorResponse(errorType: "FunctionError", errorMessage: "\(error)")
6769
switch error.toJson() {
6870
case .failure(let jsonError):
69-
return self.eventLoop.makeSucceededFuture(.failure(.json(jsonError)))
71+
return self.eventLoop.makeFailedFuture(LambdaRuntimeClientError.json(jsonError))
7072
case .success(let json):
7173
body = self.allocator.buffer(capacity: json.utf8.count)
7274
body.writeString(json)
7375
}
7476
}
7577
logger.info("reporting results to lambda runtime engine using \(url)")
76-
return self.httpClient.post(url: url, body: body).map { response in
77-
response.status != .accepted ? .failure(.badStatusCode(response.status)) : .success(())
78+
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
79+
guard response.status == .accepted else {
80+
throw LambdaRuntimeClientError.badStatusCode(response.status)
81+
}
82+
return ()
7883
}
7984
}
8085

8186
/// Reports an initialization error to the Runtime Engine.
82-
func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture<PostInitializationErrorResult> {
87+
func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture<Void> {
8388
let url = self.baseUrl + Consts.postInitErrorURL
8489
let errorResponse = ErrorResponse(errorType: "InitializationError", errorMessage: "\(error)")
8590
var body: ByteBuffer
8691
switch errorResponse.toJson() {
8792
case .failure(let jsonError):
88-
return self.eventLoop.makeSucceededFuture(.failure(.json(jsonError)))
93+
return self.eventLoop.makeFailedFuture(LambdaRuntimeClientError.json(jsonError))
8994
case .success(let json):
9095
body = self.allocator.buffer(capacity: json.utf8.count)
9196
body.writeString(json)
9297
logger.info("reporting initialization error to lambda runtime engine using \(url)")
93-
return self.httpClient.post(url: url, body: body).map { response in
94-
response.status != .accepted ? .failure(.badStatusCode(response.status)) : .success(())
98+
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
99+
guard response.status == .accepted else {
100+
throw LambdaRuntimeClientError.badStatusCode(response.status)
101+
}
102+
return ()
95103
}
96104
}
97105
}
98106
}
99107

100-
internal typealias RequestWorkResult = Result<(LambdaContext, [UInt8]), LambdaRuntimeClientError>
101-
internal typealias PostResultsResult = Result<Void, LambdaRuntimeClientError>
102-
internal typealias PostInitializationErrorResult = Result<Void, LambdaRuntimeClientError>
103-
104108
internal enum LambdaRuntimeClientError: Error, Equatable {
105109
case badStatusCode(HTTPResponseStatus)
106110
case noBody

Tests/SwiftAwsLambdaTests/LambdaTest.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class LambdaTest: XCTestCase {
8484
kill(getpid(), signal.rawValue)
8585
}
8686
let result = try future.wait()
87-
XCTAssertGreaterThan(result, 0, "should have stopped before any reuqetsst made")
87+
XCTAssertGreaterThan(result, 0, "should have stopped before any request made")
8888
XCTAssertLessThan(result, max, "should have stopped before \(max)")
8989
try server.stop().wait()
9090
}

0 commit comments

Comments
 (0)