Skip to content

Commit 490c546

Browse files
committed
Final design
1 parent 83f284f commit 490c546

File tree

6 files changed

+690
-78
lines changed

6 files changed

+690
-78
lines changed

Sources/AWSLambdaRuntimeCore/Lambda.swift

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,12 @@ public enum Lambda {
3232
return String(cString: value)
3333
}
3434

35-
/// Run a Lambda defined by implementing the ``ByteBufferLambdaHandler`` protocol.
36-
/// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the
37-
/// ``ByteBufferLambdaHandler/makeHandler(context:)`` to create a new Handler.
38-
///
39-
/// - parameters:
40-
/// - configuration: A Lambda runtime configuration object
41-
/// - handlerType: The Handler to create and invoke.
42-
///
43-
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
35+
// for testing and internal use
4436
internal static func run<Handler: ByteBufferLambdaHandler>(
4537
configuration: Configuration = .init(),
4638
handlerType: Handler.Type
4739
) -> Result<Int, Error> {
48-
let _run = { (configuration: Configuration) -> Result<Int, Error> in
40+
let _run = { (configuration: Configuration, handlerType: Handler.Type) -> Result<Int, Error> in
4941
Backtrace.install()
5042
var logger = Logger(label: "Lambda")
5143
logger.logLevel = configuration.general.logLevel
@@ -84,16 +76,17 @@ public enum Lambda {
8476
if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false {
8577
do {
8678
return try Lambda.withLocalServer {
87-
_run(configuration)
79+
_run(configuration, handlerType)
8880
}
8981
} catch {
9082
return .failure(error)
9183
}
9284
} else {
93-
return _run(configuration)
85+
return _run(configuration, handlerType)
9486
}
9587
#else
96-
return _run(configuration)
88+
return _run(configuration, factory)
9789
#endif
9890
}
91+
9992
}
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
extension NewLambdaRuntime {
18+
struct Connection {
19+
var channel: Channel
20+
var handler: NewLambdaChannelHandler<NewLambdaRuntime>
21+
}
22+
23+
struct StateMachine {
24+
enum Action {
25+
case none
26+
case createHandler(andConnection: Bool)
27+
28+
case requestNextInvocation(NewLambdaChannelHandler<NewLambdaRuntime>, succeedStartPromise: EventLoopPromise<Void>?)
29+
30+
case reportInvocationResult(LambdaRequestID, Result<ByteBuffer?, Error>, pipelineNextInvocationRequest: Bool, NewLambdaChannelHandler<NewLambdaRuntime>)
31+
case reportStartupError(Error, NewLambdaChannelHandler<NewLambdaRuntime>)
32+
33+
case invokeHandler(Handler, Invocation, ByteBuffer)
34+
35+
case failRuntime(Error)
36+
}
37+
38+
private enum State {
39+
case initialized
40+
case starting(EventLoopPromise<Void>?)
41+
case connected(Connection, EventLoopPromise<Void>?)
42+
case handlerCreated(Handler, EventLoopPromise<Void>?)
43+
case handlerCreationFailed(Error, EventLoopPromise<Void>?)
44+
case reportingStartupError(Connection, Error, EventLoopPromise<Void>?)
45+
46+
case waitingForInvocation(Connection, Handler)
47+
case executingInvocation(Connection, Handler, LambdaRequestID)
48+
case reportingInvocationResult(Connection, Handler, nextInvocationRequestPipelined: Bool)
49+
50+
case failed(Error)
51+
}
52+
53+
private var markShutdown: Bool
54+
private var state: State
55+
56+
init() {
57+
self.markShutdown = false
58+
self.state = .initialized
59+
}
60+
61+
mutating func start(connection: Connection?, promise: EventLoopPromise<Void>?) -> Action {
62+
switch self.state {
63+
case .initialized:
64+
if let connection = connection {
65+
self.state = .connected(connection, promise)
66+
return .createHandler(andConnection: false)
67+
}
68+
69+
self.state = .starting(promise)
70+
return .createHandler(andConnection: true)
71+
72+
case .starting,
73+
.connected,
74+
.handlerCreated,
75+
.handlerCreationFailed,
76+
.reportingStartupError,
77+
.waitingForInvocation,
78+
.executingInvocation,
79+
.reportingInvocationResult,
80+
.failed:
81+
preconditionFailure("Invalid state: \(self.state)")
82+
}
83+
}
84+
85+
mutating func handlerCreated(_ handler: Handler) -> Action {
86+
switch self.state {
87+
case .initialized,
88+
.handlerCreated,
89+
.handlerCreationFailed,
90+
.waitingForInvocation,
91+
.executingInvocation,
92+
.reportingInvocationResult,
93+
.reportingStartupError:
94+
preconditionFailure("Invalid state: \(self.state)")
95+
96+
case .starting(let promise):
97+
self.state = .handlerCreated(handler, promise)
98+
return .none
99+
100+
case .connected(let connection, let promise):
101+
self.state = .waitingForInvocation(connection, handler)
102+
return .requestNextInvocation(connection.handler, succeedStartPromise: promise)
103+
104+
case .failed:
105+
return .none
106+
}
107+
}
108+
109+
mutating func handlerCreationFailed(_ error: Error) -> Action {
110+
switch self.state {
111+
case .initialized,
112+
.handlerCreated,
113+
.handlerCreationFailed,
114+
.waitingForInvocation,
115+
.executingInvocation,
116+
.reportingInvocationResult,
117+
.reportingStartupError:
118+
preconditionFailure("Invalid state: \(self.state)")
119+
120+
case .starting(let promise):
121+
self.state = .handlerCreationFailed(error, promise)
122+
return .none
123+
124+
case .connected(let connection, let promise):
125+
self.state = .reportingStartupError(connection, error, promise)
126+
return .reportStartupError(error, connection.handler)
127+
128+
case .failed:
129+
return .none
130+
}
131+
}
132+
133+
mutating func httpConnectionCreated(
134+
_ connection: Connection
135+
) -> Action {
136+
switch self.state {
137+
case .initialized,
138+
.connected,
139+
.waitingForInvocation,
140+
.executingInvocation,
141+
.reportingInvocationResult,
142+
.reportingStartupError:
143+
preconditionFailure("Invalid state: \(self.state)")
144+
145+
case .starting(let promise):
146+
self.state = .connected(connection, promise)
147+
return .none
148+
149+
case .handlerCreated(let handler, let promise):
150+
self.state = .waitingForInvocation(connection, handler)
151+
return .requestNextInvocation(connection.handler, succeedStartPromise: promise)
152+
153+
case .handlerCreationFailed(let error, let promise):
154+
self.state = .reportingStartupError(connection, error, promise)
155+
return .reportStartupError(error, connection.handler)
156+
157+
case .failed:
158+
return .none
159+
}
160+
}
161+
162+
mutating func httpChannelConnectFailed(_ error: Error) -> Action {
163+
switch self.state {
164+
case .initialized,
165+
.connected,
166+
.waitingForInvocation,
167+
.executingInvocation,
168+
.reportingInvocationResult,
169+
.reportingStartupError:
170+
preconditionFailure("Invalid state: \(self.state)")
171+
172+
case .starting:
173+
self.state = .failed(error)
174+
return .failRuntime(error)
175+
176+
case .handlerCreated(let handler, let promise):
177+
self.state = .failed(error)
178+
return .failRuntime(error)
179+
180+
case .handlerCreationFailed(let error, let promise):
181+
self.state = .failed(error)
182+
return .failRuntime(error)
183+
184+
case .failed:
185+
return .none
186+
}
187+
}
188+
189+
mutating func newInvocationReceived(_ invocation: Invocation, _ body: ByteBuffer) -> Action {
190+
switch self.state {
191+
case .initialized,
192+
.starting,
193+
.connected,
194+
.handlerCreated,
195+
.handlerCreationFailed,
196+
.executingInvocation,
197+
.reportingInvocationResult,
198+
.reportingStartupError:
199+
preconditionFailure("Invalid state: \(self.state)")
200+
201+
case .waitingForInvocation(let connection, let handler):
202+
self.state = .executingInvocation(connection, handler, .init(uuidString: invocation.requestID)!)
203+
return .invokeHandler(handler, invocation, body)
204+
205+
case .failed:
206+
return .none
207+
}
208+
}
209+
210+
mutating func acceptedReceived() -> Action {
211+
switch self.state {
212+
case .initialized,
213+
.starting,
214+
.connected,
215+
.handlerCreated,
216+
.handlerCreationFailed,
217+
.executingInvocation:
218+
preconditionFailure("Invalid state: \(self.state)")
219+
220+
case .waitingForInvocation:
221+
preconditionFailure("TODO: fixme")
222+
223+
case .reportingStartupError(_, let error, let promise):
224+
self.state = .failed(error)
225+
return .failRuntime(error)
226+
227+
case .reportingInvocationResult(let connection, let handler, true):
228+
self.state = .waitingForInvocation(connection, handler)
229+
return .none
230+
231+
case .reportingInvocationResult(let connection, let handler, false):
232+
self.state = .waitingForInvocation(connection, handler)
233+
return .requestNextInvocation(connection.handler, succeedStartPromise: nil)
234+
235+
case .failed:
236+
return .none
237+
}
238+
}
239+
240+
mutating func errorResponseReceived(_ errorResponse: ErrorResponse) -> Action {
241+
switch self.state {
242+
case .initialized,
243+
.starting,
244+
.connected,
245+
.handlerCreated,
246+
.handlerCreationFailed,
247+
.executingInvocation:
248+
preconditionFailure("Invalid state: \(self.state)")
249+
250+
case .waitingForInvocation:
251+
let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse)
252+
self.state = .failed(error)
253+
return .failRuntime(error)
254+
255+
case .reportingStartupError(_, let error, let promise):
256+
let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse)
257+
self.state = .failed(error)
258+
return .failRuntime(error)
259+
260+
case .reportingInvocationResult(let connection, let handler, _):
261+
let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse)
262+
self.state = .failed(error)
263+
return .failRuntime(error)
264+
265+
case .failed:
266+
return .none
267+
}
268+
}
269+
270+
mutating func handlerError(_ error: Error) {
271+
272+
}
273+
274+
mutating func channelInactive() {
275+
276+
}
277+
278+
mutating func invocationFinished(_ result: Result<ByteBuffer?, Error>) -> Action {
279+
switch self.state {
280+
case .initialized,
281+
.starting,
282+
.handlerCreated,
283+
.handlerCreationFailed,
284+
.connected,
285+
.waitingForInvocation,
286+
.reportingStartupError,
287+
.reportingInvocationResult:
288+
preconditionFailure("Invalid state: \(self.state)")
289+
290+
case .failed:
291+
return .none
292+
293+
case .executingInvocation(let connection, let handler, let requestID):
294+
let pipelining = true
295+
self.state = .reportingInvocationResult(connection, handler, nextInvocationRequestPipelined: pipelining)
296+
return .reportInvocationResult(requestID, result, pipelineNextInvocationRequest: pipelining, connection.handler)
297+
}
298+
}
299+
}
300+
}

Sources/AWSLambdaRuntimeCore/NewLambdaChannelHandler.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ final class NewLambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>: Cha
2929
typealias OutboundOut = ByteBuffer
3030

3131
private let delegate: Delegate
32-
private let requestsInFlight: CircularBuffer<ControlPlaneRequest>
32+
private var requestsInFlight: CircularBuffer<ControlPlaneRequest>
3333

3434
private var context: ChannelHandlerContext!
3535

@@ -45,6 +45,7 @@ final class NewLambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>: Cha
4545
}
4646

4747
func sendRequest(_ request: ControlPlaneRequest) {
48+
self.requestsInFlight.append(request)
4849
self.encoder.writeRequest(request, context: self.context, promise: nil)
4950
}
5051

@@ -61,12 +62,14 @@ final class NewLambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>: Cha
6162
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
6263
do {
6364
try self.decoder.process(buffer: self.unwrapInboundIn(data)) { response in
64-
// TODO: The response matches the request
65+
guard self.requestsInFlight.popFirst() != nil else {
66+
throw LambdaRuntimeError.unsolicitedResponse
67+
}
6568

6669
self.delegate.responseReceived(response)
6770
}
6871
} catch {
69-
72+
self.delegate.errorCaught(error)
7073
}
7174
}
7275

0 commit comments

Comments
 (0)