@@ -36,45 +36,32 @@ internal final class LambdaRunner {
36
36
func initialize( logger: Logger ) -> EventLoopFuture < Void > {
37
37
logger. info ( " initializing lambda " )
38
38
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
39
- return self . lambdaHandler. initialize ( eventLoop: self . eventLoop, lifecycleId: self . lifecycleId) . flatMapError { error in
40
- self . runtimeClient. reportInitializationError ( logger: logger, error: error) . flatMapResult { result -> Result < Void , Error > in
41
- if case . failure( let reportingError) = result {
42
- // We're going to bail out because the init failed, so there's not a lot we can do other than log
43
- // that we couldn't report this error back to the runtime.
44
- logger. error ( " failed reporting initialization error to lambda runtime engine: \( reportingError) " )
45
- }
46
- // Always return the init error
47
- return . failure( error)
39
+ return self . lambdaHandler. initialize ( eventLoop: self . eventLoop, lifecycleId: self . lifecycleId) . peekError { error in
40
+ self . runtimeClient. reportInitializationError ( logger: logger, error: error) . peekError { reportingError in
41
+ // We're going to bail out because the init failed, so there's not a lot we can do other than log
42
+ // that we couldn't report this error back to the runtime.
43
+ logger. error ( " failed reporting initialization error to lambda runtime engine: \( reportingError) " )
48
44
}
49
45
}
50
46
}
51
47
52
48
func run( logger: Logger ) -> EventLoopFuture < Void > {
53
49
logger. info ( " lambda invocation sequence starting " )
54
50
// 1. request work from lambda runtime engine
55
- return self . runtimeClient. requestWork ( logger: logger) . flatMap { workRequestResult in
56
- switch workRequestResult {
57
- case . failure( let error) :
58
- logger. error ( " could not fetch work from lambda runtime engine: \( error) " )
59
- return self . eventLoop. makeFailedFuture ( error)
60
- case . success( let context, let payload) :
61
- // 2. send work to handler
62
- logger. info ( " sending work to lambda handler \( self . lambdaHandler) " )
63
- return self . lambdaHandler. handle ( eventLoop: self . eventLoop, lifecycleId: self . lifecycleId, context: context, payload: payload) . flatMap { lambdaResult in
64
- // 3. report results to runtime engine
65
- self . runtimeClient. reportResults ( logger: logger, context: context, result: lambdaResult) . flatMap { postResultsResult in
66
- switch postResultsResult {
67
- case . failure( let error) :
68
- logger. error ( " failed reporting results to lambda runtime engine: \( error) " )
69
- return self . eventLoop. makeFailedFuture ( error)
70
- case . success( ) :
71
- // we are done!
72
- logger. info ( " lambda invocation sequence completed successfully " )
73
- return self . eventLoop. makeSucceededFuture ( ( ) )
74
- }
75
- }
76
- }
77
- }
51
+ return self . runtimeClient. requestWork ( logger: logger) . peekError { error in
52
+ logger. error ( " could not fetch work from lambda runtime engine: \( error) " )
53
+ } . flatMap { context, payload in
54
+ // 2. send work to handler
55
+ logger. info ( " sending work to lambda handler \( self . lambdaHandler) " )
56
+ return self . lambdaHandler. handle ( eventLoop: self . eventLoop, lifecycleId: self . lifecycleId, context: context, payload: payload) . map { ( context, $0) }
57
+ } . flatMap { context, result in
58
+ // 3. report results to runtime engine
59
+ self . runtimeClient. reportResults ( logger: logger, context: context, result: result)
60
+ } . peekError { error in
61
+ logger. error ( " failed reporting results to lambda runtime engine: \( error) " )
62
+ } . always { result in
63
+ // we are done!
64
+ logger. info ( " lambda invocation sequence completed \( result. successful ? " successfully " : " with failure " ) " )
78
65
}
79
66
}
80
67
}
@@ -100,3 +87,36 @@ private extension LambdaHandler {
100
87
return promise. futureResult
101
88
}
102
89
}
90
+
91
+ // TODO: move to nio?
92
+ private extension EventLoopFuture {
93
+ // callback does not have side effects, failing with original result
94
+ func peekError( _ callback: @escaping ( Error ) -> Void ) -> EventLoopFuture < Value > {
95
+ return self . flatMapError { error in
96
+ callback ( error)
97
+ return self
98
+ }
99
+ }
100
+
101
+ // callback does not have side effects, failing with original result
102
+ func peekError( _ callback: @escaping ( Error ) -> EventLoopFuture < Void > ) -> EventLoopFuture < Value > {
103
+ return self . flatMapError { error in
104
+ let promise = self . eventLoop. makePromise ( of: Value . self)
105
+ callback ( error) . whenComplete { _ in
106
+ promise. completeWith ( self )
107
+ }
108
+ return promise. futureResult
109
+ }
110
+ }
111
+ }
112
+
113
+ private extension Result {
114
+ var successful : Bool {
115
+ switch self {
116
+ case . success:
117
+ return true
118
+ default :
119
+ return false
120
+ }
121
+ }
122
+ }
0 commit comments