Skip to content

Add new LambdaRuntime #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 5, 2024
Merged
6 changes: 1 addition & 5 deletions Sources/AWSLambdaRuntimeCore/NewLambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,5 @@ extension Lambda {
}

/// The default EventLoop the Lambda is scheduled on.
package static var defaultEventLoop: any EventLoop {
get {
NIOSingletons.posixEventLoopGroup.next()
}
}
package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}
32 changes: 31 additions & 1 deletion Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import NIOCore
import Synchronization

package final class NewLambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
let handlerMutex: Mutex<Handler>
let handlerMutex: Mutex<Handler?>
let logger: Logger
let eventLoop: EventLoop

Expand All @@ -34,5 +34,35 @@ package final class NewLambdaRuntime<Handler>: Sendable where Handler: Streaming
}

package func run() async throws {
guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else {
throw NewLambdaRuntimeError(code: .cannotStartLambdaRuntime)
}

let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
let port = Int(ipAndPort[1])!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should throw an error here, if the second part is not an Int. .invalidPort.


let handler = self.handlerMutex.withLock { maybeHandler in
defer {
maybeHandler = nil
}
return maybeHandler
}

guard let handler else {
throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}

try await NewLambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
}
}
54 changes: 29 additions & 25 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
)
try channel.pipeline.syncOperations.addHandler(
LambdaChannelHandler(delegate: self, logger: self.logger)
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
)
return channel.eventLoop.makeSucceededFuture(())
} catch {
Expand Down Expand Up @@ -433,10 +433,32 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
private var reusableErrorBuffer: ByteBuffer?
private let logger: Logger
private let delegate: Delegate
private let configuration: NewLambdaRuntimeClient.Configuration

init(delegate: Delegate, logger: Logger) {
let defaultHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
let errorHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
Copy link
Member

@fabianfett fabianfett Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code comment is not true.

let streamingHeaders: HTTPHeaders

init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) {
self.delegate = delegate
self.logger = logger
self.configuration = configuration
self.defaultHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
]
self.errorHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
self.streamingHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]
}

func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
Expand Down Expand Up @@ -578,7 +600,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.streamingHeaders
headers: self.streamingHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -604,11 +626,12 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
let headers: HTTPHeaders =
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
NewLambdaRuntimeClient.streamingHeaders
self.streamingHeaders
}

let httpRequest = HTTPRequestHead(
Expand All @@ -634,7 +657,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: NewLambdaRuntimeClient.defaultHeaders
headers: self.defaultHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -650,7 +673,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.errorHeaders
headers: self.errorHeaders
)

if self.reusableErrorBuffer == nil {
Expand Down Expand Up @@ -797,22 +820,3 @@ extension LambdaChannelHandler: ChannelInboundHandler {
context.fireChannelInactive()
}
}

extension NewLambdaRuntimeClient {
static let defaultHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown"
]

/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]

/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]

}
2 changes: 2 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ package struct NewLambdaRuntimeError: Error {
case nextInvocationMissingHeaderDeadline
case nextInvocationMissingHeaderInvokeFuctionARN

case cannotStartLambdaRuntime
case runtimeCanOnlyBeStartedOnce
}

package init(code: Code, underlying: (any Error)? = nil) {
Expand Down
Loading