@@ -49,46 +49,48 @@ public enum Lambda {
49
49
@discardableResult
50
50
internal static func run( handler: LambdaHandler , maxTimes: Int = 0 , stopSignal: Signal = . TERM) -> LambdaLifecycleResult {
51
51
do {
52
- return try self . runAsync ( handler: handler, maxTimes: maxTimes, stopSignal: stopSignal) . map { . success( $0) } . wait ( )
52
+ let eventLoopGroup = MultiThreadedEventLoopGroup ( numberOfThreads: System . coreCount)
53
+ defer { try ! eventLoopGroup. syncShutdownGracefully ( ) }
54
+ let result = try self . runAsync ( eventLoopGroup: eventLoopGroup, handler: handler, maxTimes: maxTimes, stopSignal: stopSignal) . wait ( )
55
+ return . success( result)
53
56
} catch {
54
57
return . failure( error)
55
58
}
56
59
}
57
60
58
- internal static func runAsync( handler: LambdaHandler , maxTimes: Int = 0 , stopSignal: Signal = . TERM) -> EventLoopFuture < Int > {
61
+ internal static func runAsync( eventLoopGroup : EventLoopGroup , handler: LambdaHandler , maxTimes: Int = 0 , stopSignal: Signal = . TERM) -> EventLoopFuture < Int > {
59
62
Backtrace . install ( )
60
63
let logger = Logger ( label: " Lambda " )
61
- let lifecycle = Lifecycle ( logger: logger, handler: handler, maxTimes: maxTimes)
64
+ let config = Config ( lifecycle: . init( maxTimes: maxTimes) )
65
+ let lifecycle = Lifecycle ( eventLoop: eventLoopGroup. next ( ) , logger: logger, config: config, handler: handler)
62
66
let signalSource = trap ( signal: stopSignal) { signal in
63
67
logger. info ( " intercepted signal: \( signal) " )
64
68
lifecycle. stop ( )
65
69
}
66
70
return lifecycle. start ( ) . always { _ in
67
- lifecycle. stop ( )
71
+ lifecycle. shutdown ( )
68
72
signalSource. cancel ( )
69
73
}
70
74
}
71
75
72
76
private class Lifecycle {
77
+ private let eventLoop : EventLoop
73
78
private let logger : Logger
74
- private let eventLoopGroup = MultiThreadedEventLoopGroup ( numberOfThreads : System . coreCount )
79
+ private let config : Lambda . Config
75
80
private let handler : LambdaHandler
76
- private let max : Int
77
81
78
82
private var _state = LifecycleState . idle
79
83
private let stateLock = Lock ( )
80
84
81
- init ( logger: Logger , handler : LambdaHandler , maxTimes : Int ) {
82
- assert ( maxTimes >= 0 , " maxTimes must be larger than 0 " )
85
+ init ( eventLoop : EventLoop , logger: Logger , config : Lambda . Config , handler : LambdaHandler ) {
86
+ self . eventLoop = eventLoop
83
87
self . logger = logger
88
+ self . config = config
84
89
self . handler = handler
85
- self . max = maxTimes
86
- self . logger. info ( " lambda lifecycle init " )
87
90
}
88
91
89
92
deinit {
90
- self . logger. info ( " lambda lifecycle deinit " )
91
- assert ( self . state == . shutdown, " invalid state, expected shutdown " )
93
+ precondition ( self . state == . shutdown, " invalid state \( self . state) " )
92
94
}
93
95
94
96
private var state : LifecycleState {
@@ -99,66 +101,139 @@ public enum Lambda {
99
101
}
100
102
set {
101
103
self . stateLock. withLockVoid {
102
- assert ( newValue. rawValue > _state. rawValue, " invalid state \( newValue) after \( _state) " )
104
+ precondition ( newValue. rawValue > _state. rawValue, " invalid state \( newValue) after \( _state) " )
103
105
self . _state = newValue
104
106
}
105
107
}
106
108
}
107
109
108
110
func start( ) -> EventLoopFuture < Int > {
111
+ logger. info ( " lambda lifecycle starting with \( self . config) " )
109
112
self . state = . initializing
110
- let lifecycleId = NSUUID ( ) . uuidString
111
- let eventLoop = self . eventLoopGroup. next ( )
112
113
var logger = self . logger
113
- logger [ metadataKey: " lifecycleId " ] = . string( lifecycleId)
114
- logger. info ( " lambda lifecycle starting " )
115
-
116
- let runner = LambdaRunner ( eventLoop: eventLoop, lambdaHandler: handler, lifecycleId: lifecycleId)
114
+ logger [ metadataKey: " lifecycleId " ] = . string( self . config. lifecycle. id)
115
+ let runner = LambdaRunner ( eventLoop: self . eventLoop, config: self . config, lambdaHandler: self . handler)
117
116
return runner. initialize ( logger: logger) . flatMap { _ in
118
117
self . state = . active
119
- return self . run ( logger: logger, eventLoop : eventLoop , runner: runner, count: 0 )
118
+ return self . run ( logger: logger, runner: runner, count: 0 )
120
119
}
121
120
}
122
121
123
122
func stop( ) {
123
+ self . logger. info ( " lambda lifecycle stopping " )
124
+ self . state = . stopping
125
+ }
126
+
127
+ func shutdown( ) {
128
+ self . logger. info ( " lambda lifecycle shutdown " )
129
+ self . state = . shutdown
130
+ }
131
+
132
+ private func run( logger: Logger , runner: LambdaRunner , count: Int ) -> EventLoopFuture < Int > {
124
133
switch self . state {
125
- case . stopping:
126
- return self . logger. info ( " lambda lifecycle aready stopping " )
127
- case . shutdown:
128
- return self . logger. info ( " lambda lifecycle aready shutdown " )
134
+ case . active:
135
+ if self . config. lifecycle. maxTimes > 0 , count >= self . config. lifecycle. maxTimes {
136
+ return self . eventLoop. makeSucceededFuture ( count)
137
+ }
138
+ var logger = logger
139
+ logger [ metadataKey: " lifecycleIteration " ] = " \( count) "
140
+ return runner. run ( logger: logger) . flatMap { _ in
141
+ // recursive! per aws lambda runtime spec the polling requests are to be done one at a time
142
+ self . run ( logger: logger, runner: runner, count: count + 1 )
143
+ }
144
+ case . stopping, . shutdown:
145
+ return self . eventLoop. makeSucceededFuture ( count)
129
146
default :
130
- self . logger. info ( " lambda lifecycle stopping " )
131
- self . state = . stopping
132
- try ! self . eventLoopGroup. syncShutdownGracefully ( )
133
- self . state = . shutdown
147
+ preconditionFailure ( " invalid run state: \( self . state) " )
134
148
}
135
149
}
150
+ }
136
151
137
- private func run( logger: Logger , eventLoop: EventLoop , runner: LambdaRunner , count: Int ) -> EventLoopFuture < Int > {
138
- var logger = logger
139
- logger [ metadataKey: " lifecycleIteration " ] = " \( count) "
140
- return runner. run ( logger: logger) . flatMap { _ in
141
- switch self . state {
142
- case . idle, . initializing:
143
- preconditionFailure ( " invalid run state: \( self . state) " )
144
- case . active:
145
- if self . max > 0 , count >= self . max {
146
- return eventLoop. makeSucceededFuture ( count)
147
- }
148
- // recursive! per aws lambda runtime spec the polling requests are to be done one at a time
149
- return self . run ( logger: logger, eventLoop: eventLoop, runner: runner, count: count + 1 )
150
- case . stopping, . shutdown:
151
- return eventLoop. makeSucceededFuture ( count)
152
- }
153
- } . flatMapErrorThrowing { error in
154
- // if we run into errors while shutting down, we ignore them
155
- switch self . state {
156
- case . stopping, . shutdown:
157
- return count
158
- default :
159
- throw error
160
- }
152
+ internal struct Config : CustomStringConvertible {
153
+ let lifecycle : Lifecycle
154
+ let runtimeEngine : RuntimeEngine
155
+
156
+ var description : String {
157
+ return " \( Config . self) : \n \( self . lifecycle) \n \( self . runtimeEngine) "
158
+ }
159
+
160
+ init ( lifecycle: Lifecycle = . init( ) , runtimeEngine: RuntimeEngine = . init( ) ) {
161
+ self . lifecycle = lifecycle
162
+ self . runtimeEngine = runtimeEngine
163
+ }
164
+
165
+ struct Lifecycle : CustomStringConvertible {
166
+ let id : String
167
+ let maxTimes : Int
168
+
169
+ init ( id: String ? = nil , maxTimes: Int ? = nil ) {
170
+ self . id = id ?? NSUUID ( ) . uuidString
171
+ self . maxTimes = maxTimes ?? 0
172
+ precondition ( self . maxTimes >= 0 , " maxTimes must be equal or larger than 0 " )
173
+ }
174
+
175
+ var description : String {
176
+ return " \( Lifecycle . self) (id: \( self . id) , maxTimes: \( self . maxTimes) ) "
177
+ }
178
+ }
179
+
180
+ struct RuntimeEngine : CustomStringConvertible {
181
+ let baseURL : HTTPURL
182
+ let keepAlive : Bool
183
+ let requestTimeout : TimeAmount ?
184
+
185
+ init ( baseURL: String ? = nil , keepAlive: Bool ? = nil , requestTimeout: TimeAmount ? = nil ) {
186
+ self . baseURL = HTTPURL ( baseURL ?? Environment . string ( Consts . hostPortEnvVariableName) . flatMap { " http:// \( $0) " } ?? " http:// \( Defaults . host) : \( Defaults . port) " )
187
+ self . keepAlive = keepAlive ?? true
188
+ self . requestTimeout = requestTimeout ?? Environment . int ( Consts . requestTimeoutEnvVariableName) . flatMap { . milliseconds( Int64 ( $0) ) }
189
+ }
190
+
191
+ var description : String {
192
+ return " \( RuntimeEngine . self) (baseURL: \( self . baseURL) , keepAlive: \( self . keepAlive) , requestTimeout: \( String ( describing: self . requestTimeout) ) ) "
193
+ }
194
+ }
195
+ }
196
+
197
+ internal struct HTTPURL : Equatable , CustomStringConvertible {
198
+ private let url : URL
199
+ let host : String
200
+ let port : Int
201
+
202
+ init ( _ url: String ) {
203
+ guard let url = Foundation . URL ( string: url) else {
204
+ preconditionFailure ( " invalid url " )
205
+ }
206
+ guard let host = url. host else {
207
+ preconditionFailure ( " invalid url host " )
161
208
}
209
+ guard let port = url. port else {
210
+ preconditionFailure ( " invalid url port " )
211
+ }
212
+ self . url = url
213
+ self . host = host
214
+ self . port = port
215
+ }
216
+
217
+ init ( url: URL , host: String , port: Int ) {
218
+ self . url = url
219
+ self . host = host
220
+ self . port = port
221
+ }
222
+
223
+ func appendingPathComponent( _ pathComponent: String ) -> HTTPURL {
224
+ return . init( url: self . url. appendingPathComponent ( pathComponent) , host: self . host, port: self . port)
225
+ }
226
+
227
+ var path : String {
228
+ return self . url. path
229
+ }
230
+
231
+ var query : String ? {
232
+ return self . url. query
233
+ }
234
+
235
+ var description : String {
236
+ return self . url. description
162
237
}
163
238
}
164
239
@@ -193,6 +268,7 @@ public protocol LambdaHandler {
193
268
}
194
269
195
270
extension LambdaHandler {
271
+ @inlinable
196
272
public func initialize( callback: @escaping LambdaInitCallBack ) {
197
273
callback ( . success( ( ) ) )
198
274
}
0 commit comments