Skip to content

LambdaRuntimeClient works with Invocation #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Sources/MockServer/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ internal final class HTTPHandler: ChannelInboundHandler {
case .json:
responseBody = "{ \"body\": \"\(requestId)\" }"
}
responseHeaders = [(AmazonHeaders.requestID, requestId)]
responseHeaders = [
(AmazonHeaders.requestID, requestId),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).timeIntervalSince1970 * 1000)),
]
} else if request.head.uri.hasSuffix("/response") {
responseStatus = .accepted
} else {
Expand Down
8 changes: 4 additions & 4 deletions Sources/SwiftAwsLambda/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ public typealias LambdaCodableCallback<Out> = (LambdaCodableResult<Out>) -> Void

/// A processing closure for a Lambda that takes an `In` and returns an `Out` via `LambdaCodableCallback<Out>` asynchronously,
/// having `In` and `Out` extending `Decodable` and `Encodable` respectively.
public typealias LambdaCodableClosure<In, Out> = (LambdaContext, In, LambdaCodableCallback<Out>) -> Void
public typealias LambdaCodableClosure<In, Out> = (Lambda.Context, In, LambdaCodableCallback<Out>) -> Void

/// A processing protocol for a Lambda that takes an `In` and returns an `Out` via `LambdaCodableCallback<Out>` asynchronously,
/// having `In` and `Out` extending `Decodable` and `Encodable` respectively.
public protocol LambdaCodableHandler: LambdaHandler {
associatedtype In: Decodable
associatedtype Out: Encodable

func handle(context: LambdaContext, payload: In, callback: @escaping LambdaCodableCallback<Out>)
func handle(context: Lambda.Context, payload: In, callback: @escaping LambdaCodableCallback<Out>)
var codec: LambdaCodableCodec<In, Out> { get }
}

Expand All @@ -79,7 +79,7 @@ public class LambdaCodableCodec<In: Decodable, Out: Encodable> {

/// Default implementation of `Encodable` -> `[UInt8]` encoding and `[UInt8]` -> `Decodable' decoding
public extension LambdaCodableHandler {
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping (LambdaResult) -> Void) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping (LambdaResult) -> Void) {
switch self.codec.decode(payload) {
case .failure(let error):
return callback(.failure(Errors.requestDecoding(error)))
Expand Down Expand Up @@ -133,7 +133,7 @@ private struct LambdaClosureWrapper<In: Decodable, Out: Encodable>: LambdaCodabl
self.closure = closure
}

public func handle(context: LambdaContext, payload: In, callback: @escaping LambdaCodableCallback<Out>) {
public func handle(context: Lambda.Context, payload: In, callback: @escaping LambdaCodableCallback<Out>) {
self.closure(context, payload, callback)
}
}
Expand Down
8 changes: 4 additions & 4 deletions Sources/SwiftAwsLambda/Lambda+String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ public typealias LambdaStringResult = Result<String, Error>
public typealias LambdaStringCallback = (LambdaStringResult) -> Void

/// A processing closure for a Lambda that takes a `String` and returns a `LambdaStringResult` via `LambdaStringCallback` asynchronously.
public typealias LambdaStringClosure = (LambdaContext, String, LambdaStringCallback) -> Void
public typealias LambdaStringClosure = (Lambda.Context, String, LambdaStringCallback) -> Void

/// A processing protocol for a Lambda that takes a `String` and returns a `LambdaStringResult` via `LambdaStringCallback` asynchronously.
public protocol LambdaStringHandler: LambdaHandler {
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback)
func handle(context: Lambda.Context, payload: String, callback: @escaping LambdaStringCallback)
}

/// Default implementation of `String` -> `[UInt8]` encoding and `[UInt8]` -> `String' decoding
public extension LambdaStringHandler {
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
self.handle(context: context, payload: String(decoding: payload, as: UTF8.self)) { result in
switch result {
case .success(let string):
Expand All @@ -73,7 +73,7 @@ private struct LambdaClosureWrapper: LambdaStringHandler {
self.closure = closure
}

func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback) {
func handle(context: Lambda.Context, payload: String, callback: @escaping LambdaStringCallback) {
self.closure(context, payload, callback)
}
}
72 changes: 35 additions & 37 deletions Sources/SwiftAwsLambda/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,38 @@ public enum Lambda {
}
}

public class Context {
// from aws
public let requestId: String
public let traceId: String
public let invokedFunctionArn: String
public let deadline: String
public let cognitoIdentity: String?
public let clientContext: String?
// utility
public let logger: Logger

internal init(requestId: String,
traceId: String,
invokedFunctionArn: String,
deadline: String,
cognitoIdentity: String? = nil,
clientContext: String? = nil,
logger: Logger) {
self.requestId = requestId
self.traceId = traceId
self.invokedFunctionArn = invokedFunctionArn
self.cognitoIdentity = cognitoIdentity
self.clientContext = clientContext
self.deadline = deadline
// mutate logger with context
var logger = logger
logger[metadataKey: "awsRequestId"] = .string(requestId)
logger[metadataKey: "awsTraceId"] = .string(traceId)
self.logger = logger
}
}

private final class Lifecycle {
private let eventLoop: EventLoop
private let logger: Logger
Expand Down Expand Up @@ -258,7 +290,7 @@ public typealias LambdaResult = Result<[UInt8], Error>
public typealias LambdaCallback = (LambdaResult) -> Void

/// A processing closure for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
public typealias LambdaClosure = (LambdaContext, [UInt8], LambdaCallback) -> Void
public typealias LambdaClosure = (Lambda.Context, [UInt8], LambdaCallback) -> Void

/// A result type for a Lambda initialization.
public typealias LambdaInitResult = Result<Void, Error>
Expand All @@ -270,7 +302,7 @@ public typealias LambdaInitCallBack = (LambdaInitResult) -> Void
public protocol LambdaHandler {
/// Initializes the `LambdaHandler`.
func initialize(callback: @escaping LambdaInitCallBack)
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback)
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback)
}

extension LambdaHandler {
Expand All @@ -280,40 +312,6 @@ extension LambdaHandler {
}
}

public struct LambdaContext {
// from aws
public let requestId: String
public let traceId: String?
public let invokedFunctionArn: String?
public let cognitoIdentity: String?
public let clientContext: String?
public let deadline: String?
// utliity
public let logger: Logger

public init(requestId: String,
traceId: String? = nil,
invokedFunctionArn: String? = nil,
cognitoIdentity: String? = nil,
clientContext: String? = nil,
deadline: String? = nil,
logger: Logger) {
self.requestId = requestId
self.traceId = traceId
self.invokedFunctionArn = invokedFunctionArn
self.cognitoIdentity = cognitoIdentity
self.clientContext = clientContext
self.deadline = deadline
// mutate logger with context
var logger = logger
logger[metadataKey: "awsRequestId"] = .string(requestId)
if let traceId = traceId {
logger[metadataKey: "awsTraceId"] = .string(traceId)
}
self.logger = logger
}
}

@usableFromInline
internal typealias LambdaLifecycleResult = Result<Int, Error>

Expand All @@ -323,7 +321,7 @@ private struct LambdaClosureWrapper: LambdaHandler {
self.closure = closure
}

func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
self.closure(context, payload, callback)
}
}
41 changes: 36 additions & 5 deletions Sources/SwiftAwsLambda/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,36 @@ internal struct LambdaRunner {
// 1. request work from lambda runtime engine
return self.runtimeClient.requestWork(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { context, payload in
}.flatMap { invocation, payload in
// 2. send work to handler
let context = Lambda.Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
logger.debug("sending work to lambda handler \(self.lambdaHandler)")

// TODO: This is just for now, so that we can work with ByteBuffers only
// in the LambdaRuntimeClient
let bytes = [UInt8](payload.readableBytesView)
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: payload).map { (context, $0) }
}.flatMap { context, result in
payload: bytes)
.map {
// TODO: This mapping shall be removed as soon as the LambdaHandler protocol
// works with ByteBuffer? instead of [UInt8]
let mappedResult: Result<ByteBuffer, Error>
switch $0 {
case .success(let bytes):
var buffer = ByteBufferAllocator().buffer(capacity: bytes.count)
buffer.writeBytes(bytes)
mappedResult = .success(buffer)
case .failure(let error):
mappedResult = .failure(error)
}
return (invocation, mappedResult)
}
}.flatMap { invocation, result in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, context: context, result: result).peekError { error in
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("failed reporting results to lambda runtime engine: \(error)")
}
}.always { result in
Expand All @@ -88,7 +107,7 @@ private extension LambdaHandler {
return promise.futureResult
}

func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: LambdaContext, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: Lambda.Context, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
// offloading so user code never blocks the eventloop
let promise = eventLoop.makePromise(of: LambdaResult.self)
if offload {
Expand All @@ -106,6 +125,18 @@ private extension LambdaHandler {
}
}

private extension Lambda.Context {
convenience init(logger: Logger, eventLoop: EventLoop, invocation: Invocation) {
self.init(requestId: invocation.requestId,
traceId: invocation.traceId,
invokedFunctionArn: invocation.invokedFunctionArn,
deadline: invocation.deadlineDate,
cognitoIdentity: invocation.cognitoIdentity,
clientContext: invocation.clientContext,
logger: logger)
}
}

// TODO: move to nio?
private extension EventLoopFuture {
// callback does not have side effects, failing with original result
Expand Down
68 changes: 38 additions & 30 deletions Sources/SwiftAwsLambda/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@ internal struct LambdaRuntimeClient {
}

/// Requests work from the Runtime Engine.
func requestWork(logger: Logger) -> EventLoopFuture<(LambdaContext, [UInt8])> {
func requestWork(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> {
let url = Consts.invocationURLPrefix + Consts.requestWorkURLSuffix
logger.debug("requesting work from lambda runtime engine using \(url)")
return self.httpClient.get(url: url).flatMapThrowing { response in
guard response.status == .ok else {
throw LambdaRuntimeClientError.badStatusCode(response.status)
}
guard let payload = response.readWholeBody() else {
let invocation = try Invocation(headers: response.headers)
guard let payload = response.body else {
throw LambdaRuntimeClientError.noBody
}
guard let context = LambdaContext(logger: logger, response: response) else {
throw LambdaRuntimeClientError.noContext
}
return (context, payload)
return (invocation, payload)
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
Expand All @@ -60,14 +58,13 @@ internal struct LambdaRuntimeClient {
}

/// Reports a result to the Runtime Engine.
func reportResults(logger: Logger, context: LambdaContext, result: LambdaResult) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + context.requestId
func reportResults(logger: Logger, invocation: Invocation, result: Result<ByteBuffer, Error>) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + invocation.requestId
var body: ByteBuffer
switch result {
case .success(let data):
case .success(let buffer):
url += Consts.postResponseURLSuffix
body = self.allocator.buffer(capacity: data.count)
body.writeBytes(data)
body = buffer
case .failure(let error):
url += Consts.postErrorURLSuffix
// TODO: make FunctionError a const
Expand Down Expand Up @@ -132,8 +129,8 @@ internal struct LambdaRuntimeClient {
internal enum LambdaRuntimeClientError: Error, Equatable {
case badStatusCode(HTTPResponseStatus)
case upstreamError(String)
case invocationMissingHeader(String)
case noBody
case noContext
case json(JsonCodecError)
}

Expand Down Expand Up @@ -182,25 +179,36 @@ private extension HTTPClient.Response {
}
}

private extension LambdaContext {
init?(logger: Logger, response: HTTPClient.Response) {
guard let requestId = response.headerValue(AmazonHeaders.requestID) else {
return nil
internal struct Invocation {
let requestId: String
let deadlineDate: String
let invokedFunctionArn: String
let traceId: String
let clientContext: String?
let cognitoIdentity: String?

init(headers: HTTPHeaders) throws {
guard let requestId = headers.first(name: AmazonHeaders.requestID), !requestId.isEmpty else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.requestID)
}
if requestId.isEmpty {
return nil

guard let unixTimeMilliseconds = headers.first(name: AmazonHeaders.deadline) else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.deadline)
}
let traceId = response.headerValue(AmazonHeaders.traceID)
let invokedFunctionArn = response.headerValue(AmazonHeaders.invokedFunctionARN)
let cognitoIdentity = response.headerValue(AmazonHeaders.cognitoIdentity)
let clientContext = response.headerValue(AmazonHeaders.clientContext)
let deadline = response.headerValue(AmazonHeaders.deadline)
self = LambdaContext(requestId: requestId,
traceId: traceId,
invokedFunctionArn: invokedFunctionArn,
cognitoIdentity: cognitoIdentity,
clientContext: clientContext,
deadline: deadline,
logger: logger)

guard let invokedFunctionArn = headers.first(name: AmazonHeaders.invokedFunctionARN) else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN)
}

guard let traceId = headers.first(name: AmazonHeaders.traceID) else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.traceID)
}

self.requestId = requestId
self.deadlineDate = unixTimeMilliseconds
self.invokedFunctionArn = invokedFunctionArn
self.traceId = traceId
self.clientContext = headers["Lambda-Runtime-Client-Context"].first
self.cognitoIdentity = headers["Lambda-Runtime-Cognito-Identity"].first
}
}
2 changes: 1 addition & 1 deletion Tests/SwiftAwsLambdaTests/Lambda+CodeableTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private struct Response: Codable {
}

private struct CodableEchoHandler: LambdaCodableHandler {
func handle(context: LambdaContext, payload: Request, callback: @escaping LambdaCodableCallback<Response>) {
func handle(context: Lambda.Context, payload: Request, callback: @escaping LambdaCodableCallback<Response>) {
callback(.success(Response(requestId: payload.requestId)))
}
}
2 changes: 1 addition & 1 deletion Tests/SwiftAwsLambdaTests/Lambda+StringTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private struct BadBehavior: LambdaServerBehavior {
}

private struct StringEchoHandler: LambdaStringHandler {
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback) {
func handle(context: Lambda.Context, payload: String, callback: @escaping LambdaStringCallback) {
callback(.success(payload))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extension LambdaRuntimeClientTest {
return [
("testGetWorkServerInternalError", testGetWorkServerInternalError),
("testGetWorkServerNoBodyError", testGetWorkServerNoBodyError),
("testGetWorkServerNoContextError", testGetWorkServerNoContextError),
("testGetWorkServerMissingHeaderRequestIDError", testGetWorkServerMissingHeaderRequestIDError),
("testProcessResponseInternalServerError", testProcessResponseInternalServerError),
("testProcessErrorInternalServerError", testProcessErrorInternalServerError),
("testProcessInitErrorInternalServerError", testProcessInitErrorInternalServerError),
Expand Down
Loading