Skip to content

Commit 83f284f

Browse files
committed
New runtime
1 parent 99cfae8 commit 83f284f

File tree

6 files changed

+280
-11
lines changed

6 files changed

+280
-11
lines changed

Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import NIOHTTP1
1717

1818
enum ControlPlaneRequest: Hashable {
1919
case next
20-
case invocationResponse(String, ByteBuffer?)
21-
case invocationError(String, ErrorResponse)
20+
case invocationResponse(LambdaRequestID, ByteBuffer?)
21+
case invocationError(LambdaRequestID, ErrorResponse)
2222
case initializationError(ErrorResponse)
2323
}
2424

Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,15 @@ extension String {
100100
}
101101

102102
extension ByteBuffer {
103-
fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) {
103+
fileprivate mutating func writeInvocationResultRequestLine(_ requestID: LambdaRequestID) {
104104
self.writeString("POST /2018-06-01/runtime/invocation/")
105-
self.writeString(requestID)
105+
self.writeRequestID(requestID)
106106
self.writeString("/response HTTP/1.1\r\n")
107107
}
108108

109-
fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) {
109+
fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: LambdaRequestID) {
110110
self.writeString("POST /2018-06-01/runtime/invocation/")
111-
self.writeString(requestID)
111+
self.writeRequestID(requestID)
112112
self.writeString("/error HTTP/1.1\r\n")
113113
}
114114

Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,13 @@ extension ByteBuffer {
340340

341341
@discardableResult
342342
mutating func setRequestID(_ requestID: LambdaRequestID, at index: Int) -> Int {
343-
var localBytes = requestID.toAsciiBytesOnStack(characters: LambdaRequestID.lowercaseLookup)
343+
var localBytes = requestID.toAsciiBytesOnStack(characters: LambdaRequestID.uppercaseLookup)
344344
return withUnsafeBytes(of: &localBytes) {
345345
self.setBytes($0, at: index)
346346
}
347347
}
348348

349+
@discardableResult
349350
mutating func writeRequestID(_ requestID: LambdaRequestID) -> Int {
350351
let length = self.setRequestID(requestID, at: self.writerIndex)
351352
self.moveWriterIndex(forwardBy: length)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
protocol LambdaChannelHandlerDelegate {
18+
19+
func responseReceived(_: ControlPlaneResponse)
20+
21+
func errorCaught(_: Error)
22+
23+
func channelInactive()
24+
25+
}
26+
27+
final class NewLambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>: ChannelInboundHandler {
28+
typealias InboundIn = ByteBuffer
29+
typealias OutboundOut = ByteBuffer
30+
31+
private let delegate: Delegate
32+
private let requestsInFlight: CircularBuffer<ControlPlaneRequest>
33+
34+
private var context: ChannelHandlerContext!
35+
36+
private var encoder: ControlPlaneRequestEncoder
37+
private var decoder: NIOSingleStepByteToMessageProcessor<ControlPlaneResponseDecoder>
38+
39+
init(delegate: Delegate, host: String) {
40+
self.delegate = delegate
41+
self.requestsInFlight = CircularBuffer<ControlPlaneRequest>(initialCapacity: 4)
42+
43+
self.encoder = ControlPlaneRequestEncoder(host: host)
44+
self.decoder = NIOSingleStepByteToMessageProcessor(ControlPlaneResponseDecoder(), maximumBufferSize: 7 * 1024 * 1024)
45+
}
46+
47+
func sendRequest(_ request: ControlPlaneRequest) {
48+
self.encoder.writeRequest(request, context: self.context, promise: nil)
49+
}
50+
51+
func handlerAdded(context: ChannelHandlerContext) {
52+
self.context = context
53+
self.encoder.writerAdded(context: context)
54+
}
55+
56+
func handlerRemoved(context: ChannelHandlerContext) {
57+
self.context = context
58+
self.encoder.writerRemoved(context: context)
59+
}
60+
61+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
62+
do {
63+
try self.decoder.process(buffer: self.unwrapInboundIn(data)) { response in
64+
// TODO: The response matches the request
65+
66+
self.delegate.responseReceived(response)
67+
}
68+
} catch {
69+
70+
}
71+
}
72+
73+
func channelInactive(context: ChannelHandlerContext) {
74+
self.delegate.channelInactive()
75+
}
76+
77+
func errorCaught(context: ChannelHandlerContext, error: Error) {
78+
self.delegate.errorCaught(error)
79+
}
80+
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIOConcurrencyHelpers
17+
import NIOCore
18+
import NIOPosix
19+
20+
/// `LambdaRuntime` manages the Lambda process lifecycle.
21+
///
22+
/// - note: It is intended to be used within a single `EventLoop`. For this reason this class is not thread safe.
23+
public final class NewLambdaRuntime<Handler: ByteBufferLambdaHandler> {
24+
private let eventLoop: EventLoop
25+
private let shutdownPromise: EventLoopPromise<Void>
26+
private let logger: Logger
27+
private let configuration: Lambda.Configuration
28+
private let factory: (Lambda.InitializationContext) -> EventLoopFuture<Handler>
29+
30+
private var state: StateMachine
31+
32+
init(eventLoop: EventLoop,
33+
logger: Logger,
34+
configuration: Lambda.Configuration,
35+
factory: @escaping (Lambda.InitializationContext) -> EventLoopFuture<Handler>
36+
) {
37+
self.state = StateMachine()
38+
self.eventLoop = eventLoop
39+
self.shutdownPromise = eventLoop.makePromise(of: Void.self)
40+
self.logger = logger
41+
self.configuration = configuration
42+
self.factory = factory
43+
}
44+
45+
deinit {
46+
// TODO: Verify is shutdown
47+
}
48+
49+
/// The `Lifecycle` shutdown future.
50+
///
51+
/// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda lifecycle has fully shutdown.
52+
public var shutdownFuture: EventLoopFuture<Void> {
53+
self.shutdownPromise.futureResult
54+
}
55+
56+
/// Start the `LambdaRuntime`.
57+
///
58+
/// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created and initiliazed, and a first run has been scheduled.
59+
public func start(promise: EventLoopPromise<Void>?) {
60+
if self.eventLoop.inEventLoop {
61+
self.start0(promise: promise)
62+
} else {
63+
self.eventLoop.execute {
64+
self.start0(promise: promise)
65+
}
66+
}
67+
}
68+
69+
/// Begin the `LambdaRuntime` shutdown. Only needed for debugging purposes, hence behind a `DEBUG` flag.
70+
public func shutdown(promise: EventLoopPromise<Void>?) {
71+
if self.eventLoop.inEventLoop {
72+
self.shutdown0(promise: promise)
73+
} else {
74+
self.eventLoop.execute {
75+
self.shutdown0(promise: promise)
76+
}
77+
}
78+
}
79+
80+
// MARK: - Private
81+
82+
private func start0(promise: EventLoopPromise<Void>?) {
83+
self.eventLoop.assertInEventLoop()
84+
85+
// when starting we want to do thing in parallel:
86+
// 1. start the connection to the control plane
87+
// 2. create the lambda handler
88+
89+
self.logger.debug("initializing lambda")
90+
// 1. create the handler from the factory
91+
// 2. report initialization error if one occured
92+
let context = Lambda.InitializationContext(
93+
logger: self.logger,
94+
eventLoop: self.eventLoop,
95+
allocator: ByteBufferAllocator()
96+
)
97+
98+
self.factory(context).hop(to: self.eventLoop).whenComplete { result in
99+
let action: StateMachine.Action
100+
switch result {
101+
case .success(let handler):
102+
action = self.state.handlerCreated(handler)
103+
case .failure(let error):
104+
action = self.state.handlerCreationFailed(error)
105+
}
106+
self.run(action)
107+
}
108+
109+
let connectFuture = ClientBootstrap(group: self.eventLoop).connect(
110+
host: self.configuration.runtimeEngine.ip,
111+
port: self.configuration.runtimeEngine.port
112+
)
113+
114+
connectFuture.whenComplete { result in
115+
let action: StateMachine.Action
116+
switch result {
117+
case .success(let channel):
118+
action = self.state.httpChannelConnected(channel)
119+
case .failure(let error):
120+
action = self.state.httpChannelConnectFailed(error)
121+
}
122+
self.run(action)
123+
}
124+
}
125+
126+
private func shutdown0(promise: EventLoopPromise<Void>?) {
127+
128+
}
129+
130+
private func run(_ action: StateMachine.Action) {
131+
132+
}
133+
}
134+
135+
extension LambdaRuntime: LambdaChannelHandlerDelegate {
136+
func responseReceived(_ response: ControlPlaneResponse) {
137+
138+
}
139+
140+
func errorCaught(_: Error) {
141+
142+
}
143+
144+
func channelInactive() {
145+
146+
}
147+
}
148+
149+
extension NewLambdaRuntime {
150+
151+
struct StateMachine {
152+
enum Action {
153+
case none
154+
}
155+
156+
private enum State {
157+
case initialized
158+
case starting
159+
case channelConnected(Channel, NewLambdaChannelHandler<LambdaRuntime>)
160+
case handlerCreated(Handler)
161+
case running(Channel, NewLambdaChannelHandler<LambdaRuntime>, Handler)
162+
}
163+
164+
private var markShutdown: Bool
165+
private var state: State
166+
167+
init() {
168+
self.markShutdown = false
169+
self.state = .initialized
170+
}
171+
172+
func handlerCreated(_ handler: Handler) -> Action {
173+
return .none
174+
}
175+
176+
func handlerCreationFailed(_ error: Error) -> Action {
177+
return .none
178+
}
179+
180+
func httpChannelConnected(_ channel: Channel) -> Action {
181+
return .none
182+
}
183+
184+
func httpChannelConnectFailed(_ error: Error) -> Action {
185+
return .none
186+
}
187+
}
188+
}

Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase {
5454
}
5555

5656
func testPostInvocationSuccessWithoutBody() {
57-
let requestID = UUID().uuidString
57+
let requestID = LambdaRequestID()
5858
var request: NIOHTTPServerRequestFull?
5959
XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, nil)))
6060

@@ -70,7 +70,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase {
7070
}
7171

7272
func testPostInvocationSuccessWithBody() {
73-
let requestID = UUID().uuidString
73+
let requestID = LambdaRequestID()
7474
let payload = ByteBuffer(string: "hello swift lambda!")
7575

7676
var request: NIOHTTPServerRequestFull?
@@ -89,7 +89,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase {
8989
}
9090

9191
func testPostInvocationErrorWithBody() {
92-
let requestID = UUID().uuidString
92+
let requestID = LambdaRequestID()
9393
let error = ErrorResponse(errorType: "SomeError", errorMessage: "An error happened")
9494
var request: NIOHTTPServerRequestFull?
9595
XCTAssertNoThrow(request = try self.sendRequest(.invocationError(requestID, error)))
@@ -137,7 +137,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase {
137137
XCTAssertEqual(nextRequest?.head.method, .GET)
138138
XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next")
139139

140-
let requestID = UUID().uuidString
140+
let requestID = LambdaRequestID()
141141
let payload = ByteBuffer(string: "hello swift lambda!")
142142
var successRequest: NIOHTTPServerRequestFull?
143143
XCTAssertNoThrow(successRequest = try self.sendRequest(.invocationResponse(requestID, payload)))

0 commit comments

Comments
 (0)