Skip to content

Commit a93ed90

Browse files
tomerdGitHub Enterprise
authored and
GitHub Enterprise
committed
shutdown handler, lambda api adjustment, latest swift-nio (#5)
* shutdown handler, lambda api adjustment, latest swift-nio motivation: * shutdown: while not required, demonstrate better example for shutdown * latest nio and api adjustments: fixes runtime issues observed when running for realz changes: * add signal based shutdown hook that terminates and cleans up resources * pull latest swift-nio to get fix to thread renaming * adjust aws lambda api prefix and headers * reformat linux tests code
1 parent 498c8b6 commit a93ed90

22 files changed

+308
-199
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ let package = Package(
1919
.executable(name: "SwiftAwsLambdaCodableSample", targets: ["SwiftAwsLambdaCodableSample"]),
2020
],
2121
dependencies: [
22-
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "1.9.5")),
22+
.package(url: "https://github.com/apple/swift-nio.git", .branch("master")),
2323
],
2424
targets: targets
2525
)

Sources/SwiftAwsLambda/HttpClient.swift

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@ internal class HTTPClient {
2525

2626
func get(url: String) -> EventLoopFuture<HTTPResponse> {
2727
guard let request = HTTPRequest(url: url, method: .GET) else {
28-
return eventLoop.newFailedFuture(error: HTTPClientError.invalidRequest)
28+
return self.eventLoop.newFailedFuture(error: HTTPClientError.invalidRequest)
2929
}
30-
return execute(request)
30+
return self.execute(request)
3131
}
3232

3333
func post(url: String, body: ByteBuffer? = nil) -> EventLoopFuture<HTTPResponse> {
3434
guard let request = HTTPRequest(url: url, method: .POST, body: body) else {
35-
return eventLoop.newFailedFuture(error: HTTPClientError.invalidRequest)
35+
return self.eventLoop.newFailedFuture(error: HTTPClientError.invalidRequest)
3636
}
37-
return execute(request)
37+
return self.execute(request)
3838
}
3939

4040
func execute(_ request: HTTPRequest) -> EventLoopFuture<HTTPResponse> {
@@ -83,9 +83,9 @@ internal struct HTTPRequest: Equatable {
8383

8484
self.version = version
8585
self.method = method
86-
target = url.path + (url.query.map { "?" + $0 } ?? "")
86+
self.target = url.path + (url.query.map { "?" + $0 } ?? "")
8787
self.host = host
88-
port = url.port ?? 80
88+
self.port = url.port ?? 80
8989
self.headers = headers
9090
self.body = body
9191
}
@@ -118,9 +118,9 @@ private struct HTTPResponseAccumulator {
118118
var state = State.idle
119119

120120
mutating func handle(_ head: HTTPResponseHead) {
121-
switch state {
121+
switch self.state {
122122
case .idle:
123-
state = .head(head)
123+
self.state = .head(head)
124124
case .head:
125125
preconditionFailure("head already set")
126126
case .body:
@@ -131,11 +131,11 @@ private struct HTTPResponseAccumulator {
131131
}
132132

133133
mutating func handle(_ part: ByteBuffer) {
134-
switch state {
134+
switch self.state {
135135
case .idle:
136136
preconditionFailure("no head received before body")
137137
case let .head(head):
138-
state = .body(head, part)
138+
self.state = .body(head, part)
139139
case .body(let head, var body):
140140
var part = part
141141
body.write(buffer: &part)
@@ -197,19 +197,19 @@ private class HTTPPartsHandler: ChannelInboundHandler, ChannelOutboundHandler {
197197

198198
switch response {
199199
case let .head(head):
200-
accumulator.handle(head)
200+
self.accumulator.handle(head)
201201
case let .body(body):
202-
accumulator.handle(body)
202+
self.accumulator.handle(body)
203203
case .end:
204-
switch accumulator.state {
204+
switch self.accumulator.state {
205205
case .idle:
206206
preconditionFailure("no head received before end")
207207
case let .head(head):
208208
ctx.fireChannelRead(wrapInboundOut(HTTPResponse(status: head.status, headers: head.headers, body: nil)))
209-
accumulator.state = .end
209+
self.accumulator.state = .end
210210
case let .body(head, body):
211211
ctx.fireChannelRead(wrapInboundOut(HTTPResponse(status: head.status, headers: head.headers, body: body)))
212-
accumulator.state = .end
212+
self.accumulator.state = .end
213213
case .end:
214214
preconditionFailure("request already processed")
215215
}
@@ -244,17 +244,17 @@ private class UnaryHTTPHandler: ChannelInboundHandler, ChannelOutboundHandler {
244244
self.failAllPromises(error: error)
245245
ctx.close(promise: nil)
246246
}
247-
247+
248248
func channelInactive(ctx: ChannelHandlerContext) {
249249
// Fail all promises
250250
self.failAllPromises(error: HTTPClientError.connectionClosed)
251251
ctx.fireChannelInactive()
252252
}
253-
253+
254254
private func failAllPromises(error: Error) {
255255
while let promise = buffer.first {
256256
promise.fail(error: error)
257-
buffer.removeFirst()
257+
self.buffer.removeFirst()
258258
}
259259
}
260260
}

Sources/SwiftAwsLambda/Lambda+Codable.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@ import Foundation
1616

1717
extension Lambda {
1818
public static func run<In: Decodable, Out: Encodable>(_ closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
19-
return run(LambdaClosureWrapper(closure))
19+
return self.run(LambdaClosureWrapper(closure))
2020
}
2121

2222
public static func run<T>(_ handler: T) -> LambdaLifecycleResult where T: LambdaCodableHandler {
23-
return run(handler as LambdaHandler)
23+
return self.run(handler as LambdaHandler)
2424
}
2525

2626
// for testing
2727
internal static func run<In: Decodable, Out: Encodable>(closure: @escaping LambdaCodableClosure<In, Out>, maxTimes: Int) -> LambdaLifecycleResult {
28-
return run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
28+
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
2929
}
3030

3131
// for testing
3232
internal static func run<T>(handler: T, maxTimes: Int) -> LambdaLifecycleResult where T: LambdaCodableHandler {
33-
return run(handler: handler as LambdaHandler, maxTimes: maxTimes)
33+
return self.run(handler: handler as LambdaHandler, maxTimes: maxTimes)
3434
}
3535
}
3636

@@ -66,7 +66,7 @@ public extension LambdaCodableHandler {
6666
guard let payloadAsCodable = codec.decode(payload) else {
6767
return callback(.failure("failed decoding payload (in)"))
6868
}
69-
handle(context: context, payload: payloadAsCodable, callback: { result in
69+
self.handle(context: context, payload: payloadAsCodable, callback: { result in
7070
switch result {
7171
case let .success(encodable):
7272
guard let codableAsBytes = self.codec.encode(encodable) else {
@@ -80,16 +80,16 @@ public extension LambdaCodableHandler {
8080
}
8181
}
8282

83-
// This is a class as encoder amd decoder are a class, which means its cheaper to hold a reference to both in a class then a struct.
83+
// This is a class as encoder amd decoder are a class, which means its cheaper to hold a reference to both in a class then a struct.
8484
private class LambdaCodableJsonCodec<In: Decodable, Out: Encodable>: LambdaCodableCodec<In, Out> {
8585
private let encoder = JSONEncoder()
8686
private let decoder = JSONDecoder()
8787
public override func encode(_ value: Out) -> [UInt8]? {
88-
return try? [UInt8](encoder.encode(value))
88+
return try? [UInt8](self.encoder.encode(value))
8989
}
9090

9191
public override func decode(_ data: [UInt8]) -> In? {
92-
return try? decoder.decode(In.self, from: Data(data))
92+
return try? self.decoder.decode(In.self, from: Data(data))
9393
}
9494
}
9595

@@ -102,6 +102,6 @@ private struct LambdaClosureWrapper<In: Decodable, Out: Encodable>: LambdaCodabl
102102
}
103103

104104
public func handle(context: LambdaContext, payload: In, callback: @escaping LambdaCodableCallback<Out>) {
105-
closure(context, payload, callback)
105+
self.closure(context, payload, callback)
106106
}
107107
}

Sources/SwiftAwsLambda/Lambda+String.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@
1414

1515
extension Lambda {
1616
public static func run(_ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
17-
return run(LambdaClosureWrapper(closure))
17+
return self.run(LambdaClosureWrapper(closure))
1818
}
1919

2020
public static func run(_ handler: LambdaStringHandler) -> LambdaLifecycleResult {
21-
return run(handler as LambdaHandler)
21+
return self.run(handler as LambdaHandler)
2222
}
2323

2424
// for testing
2525
internal static func run(closure: @escaping LambdaStringClosure, maxTimes: Int) -> LambdaLifecycleResult {
26-
return run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
26+
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
2727
}
2828

2929
// for testing
3030
internal static func run(handler: LambdaStringHandler, maxTimes: Int) -> LambdaLifecycleResult {
31-
return run(handler: handler as LambdaHandler, maxTimes: maxTimes)
31+
return self.run(handler: handler as LambdaHandler, maxTimes: maxTimes)
3232
}
3333
}
3434

@@ -47,7 +47,7 @@ public extension LambdaStringHandler {
4747
guard let payloadAsString = String(bytes: payload, encoding: .utf8) else {
4848
return callback(.failure("failed casting payload to String"))
4949
}
50-
handle(context: context, payload: payloadAsString, callback: { result in
50+
self.handle(context: context, payload: payloadAsString, callback: { result in
5151
switch result {
5252
case let .success(string):
5353
return callback(.success([UInt8](string.utf8)))
@@ -65,6 +65,6 @@ private struct LambdaClosureWrapper: LambdaStringHandler {
6565
}
6666

6767
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback) {
68-
closure(context, payload, callback)
68+
self.closure(context, payload, callback)
6969
}
7070
}

Sources/SwiftAwsLambda/Lambda.swift

Lines changed: 95 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,57 +12,127 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Foundation
1516
import NIO
1617

1718
public enum Lambda {
1819
public static func run(_ closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
19-
return run(LambdaClosureWrapper(closure))
20+
return self._run(handler: LambdaClosureWrapper(closure))
2021
}
2122

2223
public static func run(_ handler: LambdaHandler) -> LambdaLifecycleResult {
23-
return Lifecycle(handler: handler).start()
24+
return self._run(handler: handler)
2425
}
2526

2627
// for testing
2728
internal static func run(closure: @escaping LambdaClosure, maxTimes: Int) -> LambdaLifecycleResult {
28-
return run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
29+
return self._run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
2930
}
3031

3132
// for testing
3233
internal static func run(handler: LambdaHandler, maxTimes: Int) -> LambdaLifecycleResult {
33-
return Lifecycle(handler: handler, maxTimes: maxTimes).start()
34+
return self._run(handler: handler, maxTimes: maxTimes)
35+
}
36+
37+
internal static func _run(handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .INT) -> LambdaLifecycleResult {
38+
do {
39+
return try self._run(handler: handler, maxTimes: maxTimes, stopSignal: stopSignal).wait()
40+
} catch {
41+
return .failure(error)
42+
}
43+
}
44+
45+
internal static func _run(handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .INT) -> EventLoopFuture<LambdaLifecycleResult> {
46+
let lifecycle = Lifecycle(handler: handler, maxTimes: maxTimes)
47+
let signalSource = trap(signal: stopSignal) { signal in
48+
print("intercepted signal: \(signal)")
49+
lifecycle.stop()
50+
}
51+
let future = lifecycle.start()
52+
future.whenComplete {
53+
signalSource.cancel()
54+
}
55+
return future
3456
}
3557

3658
private class Lifecycle {
59+
private let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1).next()
3760
private let handler: LambdaHandler
3861
private let max: Int
39-
private var counter: Int = 0
4062

41-
init(handler: LambdaHandler, maxTimes: Int = 0) {
63+
private var _state = LifecycleState.initialized
64+
private let stateQueue = DispatchQueue(label: "LifecycleState")
65+
66+
init(handler: LambdaHandler, maxTimes: Int) {
67+
print("lambda lifecycle init")
4268
self.handler = handler
43-
max = maxTimes
44-
assert(max >= 0)
69+
self.max = maxTimes
70+
assert(self.max >= 0)
71+
}
72+
73+
deinit {
74+
print("lambda lifecycle deinit")
75+
assert(state == .shutdown)
76+
}
77+
78+
private var state: LifecycleState {
79+
get {
80+
return self.stateQueue.sync {
81+
self._state
82+
}
83+
}
84+
set {
85+
self.stateQueue.sync {
86+
assert(newValue.rawValue > _state.rawValue, "invalid state \(newValue) after \(_state)")
87+
self._state = newValue
88+
}
89+
}
4590
}
4691

47-
func start() -> LambdaLifecycleResult {
48-
var err: Error?
49-
let runner = LambdaRunner(handler)
50-
while nil == err && (0 == max || counter < max) {
51-
do {
52-
// blocking! per aws lambda runtime spec the polling requets are to be done one at a time
53-
let result = try runner.run().wait()
54-
switch result {
55-
case .success:
56-
counter = counter + 1
57-
case let .failure(e):
58-
err = e
92+
func start() -> EventLoopFuture<LambdaLifecycleResult> {
93+
self.state = .active
94+
let runner = LambdaRunner(eventLoop: eventLoop, lambdaHandler: handler)
95+
let promise: EventLoopPromise<LambdaLifecycleResult> = eventLoop.newPromise()
96+
print("lambda lifecycle statring")
97+
DispatchQueue.global().async {
98+
var err: Error?
99+
var counter = 0
100+
while .active == self.state && nil == err && (0 == self.max || counter < self.max) {
101+
do {
102+
// blocking! per aws lambda runtime spec the polling requets are to be done one at a time
103+
let result = try runner.run().wait()
104+
switch result {
105+
case .success:
106+
counter = counter + 1
107+
case let .failure(e):
108+
err = e
109+
}
110+
} catch {
111+
err = error
59112
}
60-
} catch {
61-
err = error
62113
}
114+
promise.succeed(result: err.map { _ in .failure(err!) } ?? .success(counter))
115+
self.shutdown()
63116
}
64-
return err.map { _ in .failure(err!) } ?? .success(counter)
117+
return promise.futureResult
65118
}
119+
120+
func stop() {
121+
print("lambda lifecycle stopping")
122+
self.state = .stopping
123+
}
124+
125+
private func shutdown() {
126+
try! self.eventLoop.syncShutdownGracefully()
127+
self.state = .shutdown
128+
}
129+
}
130+
131+
private enum LifecycleState: Int {
132+
case initialized = 0
133+
case active = 1
134+
case stopping = 2
135+
case shutdown = 3
66136
}
67137
}
68138

@@ -84,7 +154,7 @@ public struct LambdaContext {
84154
public let invokedFunctionArn: String?
85155
public let cognitoIdentity: String?
86156
public let clientContext: String?
87-
public let deadlineNs: String?
157+
public let deadline: String?
88158
}
89159

90160
private struct LambdaClosureWrapper: LambdaHandler {
@@ -94,6 +164,6 @@ private struct LambdaClosureWrapper: LambdaHandler {
94164
}
95165

96166
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
97-
closure(context, payload, callback)
167+
self.closure(context, payload, callback)
98168
}
99169
}

0 commit comments

Comments
 (0)