Skip to content

Commit dee3570

Browse files
committed
terminator handler
motivation: make it simpler to register shutdown hooks changes: * introduce Terminaotr helper that allow registering and de-registaring shutdown handlers * expose the new terminator hanler on the InitializationContext and deprecate ShutdownContext * deprecate the Handler ::shutdown protocol requirment * update the runtime code to use the new terminator instead of calling shutdown on the handler * add and adjust tests
1 parent 51c27f9 commit dee3570

File tree

8 files changed

+153
-21
lines changed

8 files changed

+153
-21
lines changed

Sources/AWSLambdaRuntimeCore/LambdaContext.swift

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,14 @@ extension Lambda {
3838
/// `ByteBufferAllocator` to allocate `ByteBuffer`
3939
public let allocator: ByteBufferAllocator
4040

41-
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) {
41+
/// `Terminator` to register shutdown operations
42+
public let terminator: Terminator
43+
44+
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: Terminator) {
4245
self.eventLoop = eventLoop
4346
self.logger = logger
4447
self.allocator = allocator
48+
self.terminator = terminator
4549
}
4650

4751
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
@@ -52,7 +56,8 @@ extension Lambda {
5256
InitializationContext(
5357
logger: logger,
5458
eventLoop: eventLoop,
55-
allocator: ByteBufferAllocator()
59+
allocator: ByteBufferAllocator(),
60+
terminator: Lambda.Terminator()
5661
)
5762
}
5863
}
@@ -208,6 +213,7 @@ public struct LambdaContext: CustomDebugStringConvertible {
208213

209214
// MARK: - ShutdownContext
210215

216+
/*
211217
extension Lambda {
212218
/// Lambda runtime shutdown context.
213219
/// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument.
@@ -229,3 +235,4 @@ extension Lambda {
229235
}
230236
}
231237
}
238+
*/

Sources/AWSLambdaRuntimeCore/LambdaHandler.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,15 @@ public protocol ByteBufferLambdaHandler {
182182
///
183183
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
184184
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
185-
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
185+
//func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
186186
}
187187

188+
/*
188189
extension ByteBufferLambdaHandler {
189190
public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
190191
context.eventLoop.makeSucceededFuture(())
191192
}
192-
}
193+
}*/
193194

194195
extension ByteBufferLambdaHandler {
195196
/// Initializes and runs the lambda function.

Sources/AWSLambdaRuntimeCore/LambdaRunner.swift

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,16 @@ extension Lambda {
3434
/// Run the user provided initializer. This *must* only be called once.
3535
///
3636
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
37-
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
37+
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: Terminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
3838
logger.debug("initializing lambda")
3939
// 1. create the handler from the factory
40-
// 2. report initialization error if one occured
41-
let context = InitializationContext(logger: logger,
42-
eventLoop: self.eventLoop,
43-
allocator: self.allocator)
40+
// 2. report initialization error if one occurred
41+
let context = InitializationContext(
42+
logger: logger,
43+
eventLoop: self.eventLoop,
44+
allocator: self.allocator,
45+
terminator: terminator
46+
)
4447
return Handler.makeHandler(context: context)
4548
// Hopping back to "our" EventLoop is important in case the factory returns a future
4649
// that originated from a foreign EventLoop/EventLoopGroup.

Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,10 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
7474

7575
var logger = self.logger
7676
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
77+
let terminator = Lambda.Terminator()
7778
let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration)
7879

79-
let startupFuture = runner.initialize(logger: logger, handlerType: Handler.self)
80+
let startupFuture = runner.initialize(logger: logger, terminator: terminator, handlerType: Handler.self)
8081
startupFuture.flatMap { handler -> EventLoopFuture<(Handler, Result<Int, Error>)> in
8182
// after the startup future has succeeded, we have a handler that we can use
8283
// to `run` the lambda.
@@ -88,16 +89,29 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
8889
.flatMap { handler, runnerResult -> EventLoopFuture<Int> in
8990
// after the lambda finishPromise has succeeded or failed we need to
9091
// shutdown the handler
91-
let shutdownContext = Lambda.ShutdownContext(logger: logger, eventLoop: self.eventLoop)
92-
return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
92+
//let shutdownContext = Lambda.ShutdownContext(logger: logger, eventLoop: self.eventLoop)
93+
/*return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
9394
// if, we had an error shuting down the lambda, we want to concatenate it with
9495
// the runner result
9596
logger.error("Error shutting down handler: \(error)")
9697
throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
9798
}.flatMapResult { _ -> Result<Int, Error> in
9899
// we had no error shutting down the lambda. let's return the runner's result
99100
runnerResult
101+
}*/
102+
103+
// after the lambda finishPromise has succeeded or failed we need to
104+
// shutdown the handler
105+
return terminator.terminate(eventLoop: self.eventLoop).flatMapErrorThrowing { error in
106+
// if, we had an error shutting down the handler, we want to concatenate it with
107+
// the runner result
108+
logger.error("Error shutting down handler: \(error)")
109+
throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
110+
}.flatMapResult { _ -> Result<Int, Error> in
111+
// we had no error shutting down the lambda. let's return the runner's result
112+
runnerResult
100113
}
114+
101115
}.always { _ in
102116
// triggered when the Lambda has finished its last run or has a startup failure.
103117
self.markShutdown()
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
16+
import NIOConcurrencyHelpers
17+
import NIOCore
18+
19+
extension Lambda {
20+
/// Lambda terminator.
21+
/// Helper utility to manage the lambda shutdown sequence
22+
#warning("API docs, impl, async")
23+
public final class Terminator {
24+
public typealias Handler = (EventLoop) -> EventLoopFuture<Void>
25+
public typealias RegistrationKey = String
26+
27+
private let lock: Lock
28+
private var handlers: [String: (name: String, handler: Handler)]
29+
30+
public init() {
31+
self.lock = .init()
32+
self.handlers = [:]
33+
}
34+
35+
#warning("API docs")
36+
@discardableResult
37+
public func register(name: String, handler: @escaping Handler) -> RegistrationKey {
38+
let key = LambdaRequestID().uuidString // UUID basically
39+
self.lock.withLock {
40+
self.handlers[key] = (name: name, handler: handler)
41+
}
42+
return key
43+
}
44+
45+
#warning("API docs")
46+
public func deregister(_ key: RegistrationKey) {
47+
self.lock.withLock {
48+
self.handlers[key] = nil
49+
}
50+
}
51+
52+
#warning("API docs")
53+
internal func terminate(eventLoop: EventLoop) -> EventLoopFuture<Void> {
54+
let handlers = self.lock.withLock {
55+
self.handlers
56+
}
57+
58+
#warning("FIXME: cascade in reverse order?")
59+
return EventLoopFuture.whenAllComplete(handlers.values.map{ $0.handler(eventLoop) }, on: eventLoop).flatMap{ results in
60+
let errors = results.compactMap { $0.error }
61+
if errors.isEmpty {
62+
return eventLoop.makeSucceededVoidFuture()
63+
} else {
64+
return eventLoop.makeFailedFuture(TerminationError(underlying: errors))
65+
}
66+
}
67+
}
68+
}
69+
70+
struct TerminationError: Error {
71+
let underlying: [Error]
72+
}
73+
}
74+
75+
private extension Result {
76+
var error: Error? {
77+
switch self {
78+
case .failure(let error):
79+
return error
80+
case .success:
81+
return .none
82+
}
83+
}
84+
}

Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,34 @@ class LambdaRuntimeTest: XCTestCase {
6666
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
6767
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
6868

69-
struct ShutdownError: Error {}
69+
struct ShutdownError: Error {
70+
let description: String
71+
}
7072

7173
struct ShutdownErrorHandler: EventLoopLambdaHandler {
7274
typealias Event = String
7375
typealias Output = Void
7476

7577
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<ShutdownErrorHandler> {
76-
context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
78+
// register shutdown operation
79+
context.terminator.register(name: "test 1", handler: { eventLoop in
80+
return eventLoop.makeFailedFuture(ShutdownError(description: "1"))
81+
})
82+
context.terminator.register(name: "test 2", handler: { eventLoop in
83+
return eventLoop.makeSucceededVoidFuture()
84+
})
85+
context.terminator.register(name: "test 3", handler: { eventLoop in
86+
return eventLoop.makeFailedFuture(ShutdownError(description: "2"))
87+
})
88+
context.terminator.register(name: "test 3", handler: { eventLoop in
89+
return eventLoop.makeSucceededVoidFuture()
90+
})
91+
return context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
7792
}
7893

7994
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<Void> {
8095
context.eventLoop.makeSucceededVoidFuture()
8196
}
82-
83-
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
84-
context.eventLoop.makeFailedFuture(ShutdownError())
85-
}
8697
}
8798

8899
let eventLoop = eventLoopGroup.next()
@@ -95,7 +106,7 @@ class LambdaRuntimeTest: XCTestCase {
95106
XCTFail("Unexpected error: \(error)"); return
96107
}
97108

98-
XCTAssert(shutdownError is ShutdownError)
109+
XCTAssertEqual(shutdownError as? Lambda.TerminationError, Lambda.TerminationError(underlying: [ShutdownError(description: "1"), ShutdownError(description: "2")]))
99110
XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
100111
}
101112
}

Tests/AWSLambdaRuntimeCoreTests/Utils.swift

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ func runLambda<Handler: ByteBufferLambdaHandler>(behavior: LambdaServerBehavior,
2323
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
2424
let logger = Logger(label: "TestLogger")
2525
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
26+
let terminator = Lambda.Terminator()
2627
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
2728
let server = try MockLambdaServer(behavior: behavior).start().wait()
2829
defer { XCTAssertNoThrow(try server.stop().wait()) }
29-
try runner.initialize(logger: logger, handlerType: handlerType).flatMap { handler in
30+
try runner.initialize(logger: logger, terminator: terminator, handlerType: handlerType).flatMap { handler in
3031
runner.run(logger: logger, handler: handler)
3132
}.wait()
3233
}
@@ -66,3 +67,13 @@ extension Lambda.RuntimeError: Equatable {
6667
String(describing: lhs) == String(describing: rhs)
6768
}
6869
}
70+
71+
extension Lambda.TerminationError: Equatable {
72+
public static func == (lhs: Lambda.TerminationError, rhs: Lambda.TerminationError) -> Bool {
73+
guard lhs.underlying.count == rhs.underlying.count else {
74+
return false
75+
}
76+
// technically incorrect, but good enough for our tests
77+
return String(describing: lhs) == String(describing: rhs)
78+
}
79+
}

Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ class CodableLambdaTest: XCTestCase {
172172
Lambda.InitializationContext(
173173
logger: Logger(label: "test"),
174174
eventLoop: self.eventLoopGroup.next(),
175-
allocator: ByteBufferAllocator()
175+
allocator: ByteBufferAllocator(),
176+
terminator: Lambda.Terminator()
176177
)
177178
}
178179
}

0 commit comments

Comments
 (0)