Skip to content

Commit 53930fe

Browse files
fabianfetttomerd
andauthored
add debug functionality to test with mock server (#73)
motivation: allow end to end testing locally changes: * add a Lambda+LocalServer which exposes Lambda.withLocalServer available only in DEBUG mode * local server can receive POST requests with payloads on a configurable endpoint and and send them to the Lambda * move Lifecycle completely into EventLoop * remove all locks since running in one EventLoop * Use UUID for requestId Co-authored-by: tom doron <tomer@apple.com>
1 parent 64ee5a2 commit 53930fe

12 files changed

+394
-46
lines changed

Package.swift

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,12 @@ let package = Package(
4747
.byName(name: "AWSLambdaRuntime"),
4848
.product(name: "NIO", package: "swift-nio"),
4949
]),
50-
.testTarget(name: "AWSLambdaTestingTests", dependencies: [
51-
.byName(name: "AWSLambdaTesting"),
52-
.byName(name: "AWSLambdaRuntime"),
53-
]),
54-
// samples
55-
.target(name: "StringSample", dependencies: [
56-
.byName(name: "AWSLambdaRuntime"),
57-
]),
58-
.target(name: "CodableSample", dependencies: [
59-
.byName(name: "AWSLambdaRuntime"),
60-
]),
61-
// perf tests
50+
.testTarget(name: "AWSLambdaTestingTests", dependencies: ["AWSLambdaTesting"]),
51+
// for perf testing
6252
.target(name: "MockServer", dependencies: [
6353
.product(name: "NIOHTTP1", package: "swift-nio"),
6454
]),
55+
.target(name: "StringSample", dependencies: ["AWSLambdaRuntime"]),
56+
.target(name: "CodableSample", dependencies: ["AWSLambdaRuntime"]),
6557
]
6658
)

Sources/AWSLambdaRuntimeCore/HTTPClient.swift

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ internal final class HTTPClient {
2525
private let targetHost: String
2626

2727
private var state = State.disconnected
28-
private let executing = NIOAtomic.makeAtomic(value: false)
28+
private var executing = false
2929

3030
init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
3131
self.eventLoop = eventLoop
@@ -48,9 +48,26 @@ internal final class HTTPClient {
4848
timeout: timeout ?? self.configuration.requestTimeout))
4949
}
5050

51+
/// cancels the current request if there is one
52+
func cancel() {
53+
guard self.executing else {
54+
// there is no request running. nothing to cancel
55+
return
56+
}
57+
58+
guard case .connected(let channel) = self.state else {
59+
preconditionFailure("if we are executing, we expect to have an open channel")
60+
}
61+
62+
channel.triggerUserOutboundEvent(RequestCancelEvent(), promise: nil)
63+
}
64+
5165
// TODO: cap reconnect attempt
5266
private func execute(_ request: Request, validate: Bool = true) -> EventLoopFuture<Response> {
53-
precondition(!validate || self.executing.compareAndExchange(expected: false, desired: true), "expecting single request at a time")
67+
if validate {
68+
precondition(self.executing == false, "expecting single request at a time")
69+
self.executing = true
70+
}
5471

5572
switch self.state {
5673
case .disconnected:
@@ -66,7 +83,8 @@ internal final class HTTPClient {
6683

6784
let promise = channel.eventLoop.makePromise(of: Response.self)
6885
promise.futureResult.whenComplete { _ in
69-
precondition(self.executing.compareAndExchange(expected: true, desired: false), "invalid execution state")
86+
precondition(self.executing == true, "invalid execution state")
87+
self.executing = false
7088
}
7189
let wrapper = HTTPRequestWrapper(request: request, promise: promise)
7290
channel.writeAndFlush(wrapper).cascadeFailure(to: promise)
@@ -120,6 +138,7 @@ internal final class HTTPClient {
120138
internal enum Errors: Error {
121139
case connectionResetByPeer
122140
case timeout
141+
case cancelled
123142
}
124143

125144
private enum State {
@@ -284,6 +303,17 @@ private final class UnaryHandler: ChannelDuplexHandler {
284303
context.fireChannelInactive()
285304
}
286305

306+
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
307+
switch event {
308+
case is RequestCancelEvent:
309+
if self.pending != nil {
310+
self.completeWith(.failure(HTTPClient.Errors.cancelled))
311+
}
312+
default:
313+
context.triggerUserOutboundEvent(event, promise: promise)
314+
}
315+
}
316+
287317
private func completeWith(_ result: Result<HTTPClient.Response, Error>) {
288318
guard let pending = self.pending else {
289319
preconditionFailure("invalid state, no pending request")
@@ -299,3 +329,5 @@ private struct HTTPRequestWrapper {
299329
let request: HTTPClient.Request
300330
let promise: EventLoopPromise<HTTPClient.Response>
301331
}
332+
333+
private struct RequestCancelEvent {}
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if DEBUG
16+
import Dispatch
17+
import Logging
18+
import NIO
19+
import NIOConcurrencyHelpers
20+
import NIOHTTP1
21+
22+
// This functionality is designed for local testing hence beind a #if DEBUG flag.
23+
// For example:
24+
//
25+
// try Lambda.withLocalServer {
26+
// Lambda.run { (context: Lambda.Context, payload: String, callback: @escaping (Result<String, Error>) -> Void) in
27+
// callback(.success("Hello, \(payload)!"))
28+
// }
29+
// }
30+
extension Lambda {
31+
/// Execute code in the context of a mock Lambda server.
32+
///
33+
/// - parameters:
34+
/// - invocationEndpoint: The endpoint to post payloads to.
35+
/// - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call.
36+
///
37+
/// - note: This API is designed stricly for local testing and is behind a DEBUG flag
38+
public static func withLocalServer(invocationEndpoint: String? = nil, _ body: @escaping () -> Void) throws {
39+
let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint)
40+
try server.start().wait()
41+
defer { try! server.stop() } // FIXME:
42+
body()
43+
}
44+
}
45+
46+
// MARK: - Local Mock Server
47+
48+
private enum LocalLambda {
49+
struct Server {
50+
private let logger: Logger
51+
private let group: EventLoopGroup
52+
private let host: String
53+
private let port: Int
54+
private let invocationEndpoint: String
55+
56+
public init(invocationEndpoint: String?) {
57+
let configuration = Lambda.Configuration()
58+
var logger = Logger(label: "LocalLambdaServer")
59+
logger.logLevel = configuration.general.logLevel
60+
self.logger = logger
61+
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
62+
self.host = configuration.runtimeEngine.ip
63+
self.port = configuration.runtimeEngine.port
64+
self.invocationEndpoint = invocationEndpoint ?? "/invoke"
65+
}
66+
67+
func start() -> EventLoopFuture<Void> {
68+
let bootstrap = ServerBootstrap(group: group)
69+
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
70+
.childChannelInitializer { channel in
71+
channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in
72+
channel.pipeline.addHandler(HTTPHandler(logger: self.logger, invocationEndpoint: self.invocationEndpoint))
73+
}
74+
}
75+
return bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture<Void> in
76+
guard channel.localAddress != nil else {
77+
return channel.eventLoop.makeFailedFuture(ServerError.cantBind)
78+
}
79+
self.logger.info("LocalLambdaServer started and listening on \(self.host):\(self.port), receiving payloads on \(self.invocationEndpoint)")
80+
return channel.eventLoop.makeSucceededFuture(())
81+
}
82+
}
83+
84+
func stop() throws {
85+
try self.group.syncShutdownGracefully()
86+
}
87+
}
88+
89+
final class HTTPHandler: ChannelInboundHandler {
90+
public typealias InboundIn = HTTPServerRequestPart
91+
public typealias OutboundOut = HTTPServerResponsePart
92+
93+
private var pending = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
94+
95+
private static var invocations = CircularBuffer<Invocation>()
96+
private static var invocationState = InvocationState.waitingForLambdaRequest
97+
98+
private let logger: Logger
99+
private let invocationEndpoint: String
100+
101+
init(logger: Logger, invocationEndpoint: String) {
102+
self.logger = logger
103+
self.invocationEndpoint = invocationEndpoint
104+
}
105+
106+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
107+
let requestPart = unwrapInboundIn(data)
108+
109+
switch requestPart {
110+
case .head(let head):
111+
self.pending.append((head: head, body: nil))
112+
case .body(var buffer):
113+
var request = self.pending.removeFirst()
114+
if request.body == nil {
115+
request.body = buffer
116+
} else {
117+
request.body!.writeBuffer(&buffer)
118+
}
119+
self.pending.prepend(request)
120+
case .end:
121+
let request = self.pending.removeFirst()
122+
self.processRequest(context: context, request: request)
123+
}
124+
}
125+
126+
func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) {
127+
switch (request.head.method, request.head.uri) {
128+
// this endpoint is called by the client invoking the lambda
129+
case (.POST, let url) where url.hasSuffix(self.invocationEndpoint):
130+
guard let work = request.body else {
131+
return self.writeResponse(context: context, response: .init(status: .badRequest))
132+
}
133+
let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME:
134+
let promise = context.eventLoop.makePromise(of: Response.self)
135+
promise.futureResult.whenComplete { result in
136+
switch result {
137+
case .failure(let error):
138+
self.logger.error("invocation error: \(error)")
139+
self.writeResponse(context: context, response: .init(status: .internalServerError))
140+
case .success(let response):
141+
self.writeResponse(context: context, response: response)
142+
}
143+
}
144+
let invocation = Invocation(requestID: requestID, request: work, responsePromise: promise)
145+
switch Self.invocationState {
146+
case .waitingForInvocation(let promise):
147+
promise.succeed(invocation)
148+
case .waitingForLambdaRequest, .waitingForLambdaResponse:
149+
Self.invocations.append(invocation)
150+
}
151+
// /next endpoint is called by the lambda polling for work
152+
case (.GET, let url) where url.hasSuffix(Consts.getNextInvocationURLSuffix):
153+
// check if our server is in the correct state
154+
guard case .waitingForLambdaRequest = Self.invocationState else {
155+
self.logger.error("invalid invocation state \(Self.invocationState)")
156+
self.writeResponse(context: context, response: .init(status: .unprocessableEntity))
157+
return
158+
}
159+
160+
// pop the first task from the queue
161+
switch Self.invocations.popFirst() {
162+
case .none:
163+
// if there is nothing in the queue,
164+
// create a promise that we can fullfill when we get a new task
165+
let promise = context.eventLoop.makePromise(of: Invocation.self)
166+
promise.futureResult.whenComplete { result in
167+
switch result {
168+
case .failure(let error):
169+
self.logger.error("invocation error: \(error)")
170+
self.writeResponse(context: context, response: .init(status: .internalServerError))
171+
case .success(let invocation):
172+
Self.invocationState = .waitingForLambdaResponse(invocation)
173+
self.writeResponse(context: context, response: invocation.makeResponse())
174+
}
175+
}
176+
Self.invocationState = .waitingForInvocation(promise)
177+
case .some(let invocation):
178+
// if there is a task pending, we can immediatly respond with it.
179+
Self.invocationState = .waitingForLambdaResponse(invocation)
180+
self.writeResponse(context: context, response: invocation.makeResponse())
181+
}
182+
// :requestID/response endpoint is called by the lambda posting the response
183+
case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix):
184+
let parts = request.head.uri.split(separator: "/")
185+
guard let requestID = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
186+
// the request is malformed, since we were expecting a requestId in the path
187+
return self.writeResponse(context: context, response: .init(status: .badRequest))
188+
}
189+
guard case .waitingForLambdaResponse(let invocation) = Self.invocationState else {
190+
// a response was send, but we did not expect to receive one
191+
self.logger.error("invalid invocation state \(Self.invocationState)")
192+
return self.writeResponse(context: context, response: .init(status: .unprocessableEntity))
193+
}
194+
guard requestID == invocation.requestID else {
195+
// the request's requestId is not matching the one we are expecting
196+
self.logger.error("invalid invocation state request ID \(requestID) does not match expected \(invocation.requestID)")
197+
return self.writeResponse(context: context, response: .init(status: .badRequest))
198+
}
199+
200+
invocation.responsePromise.succeed(.init(status: .ok, body: request.body))
201+
self.writeResponse(context: context, response: .init(status: .accepted))
202+
Self.invocationState = .waitingForLambdaRequest
203+
// unknown call
204+
default:
205+
self.writeResponse(context: context, response: .init(status: .notFound))
206+
}
207+
}
208+
209+
func writeResponse(context: ChannelHandlerContext, response: Response) {
210+
var headers = HTTPHeaders(response.headers ?? [])
211+
headers.add(name: "content-length", value: "\(response.body?.readableBytes ?? 0)")
212+
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: response.status, headers: headers)
213+
214+
context.write(wrapOutboundOut(.head(head))).whenFailure { error in
215+
self.logger.error("\(self) write error \(error)")
216+
}
217+
218+
if let buffer = response.body {
219+
context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in
220+
self.logger.error("\(self) write error \(error)")
221+
}
222+
}
223+
224+
context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in
225+
if case .failure(let error) = result {
226+
self.logger.error("\(self) write error \(error)")
227+
}
228+
}
229+
}
230+
231+
struct Response {
232+
var status: HTTPResponseStatus = .ok
233+
var headers: [(String, String)]?
234+
var body: ByteBuffer?
235+
}
236+
237+
struct Invocation {
238+
let requestID: String
239+
let request: ByteBuffer
240+
let responsePromise: EventLoopPromise<Response>
241+
242+
func makeResponse() -> Response {
243+
var response = Response()
244+
response.body = self.request
245+
// required headers
246+
response.headers = [
247+
(AmazonHeaders.requestID, self.requestID),
248+
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
249+
(AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
250+
(AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
251+
]
252+
return response
253+
}
254+
}
255+
256+
enum InvocationState {
257+
case waitingForInvocation(EventLoopPromise<Invocation>)
258+
case waitingForLambdaRequest
259+
case waitingForLambdaResponse(Invocation)
260+
}
261+
}
262+
263+
enum ServerError: Error {
264+
case notReady
265+
case cantBind
266+
}
267+
}
268+
#endif

Sources/AWSLambdaRuntimeCore/Lambda.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,19 @@ public enum Lambda {
104104
var result: Result<Int, Error>!
105105
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in
106106
let lifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory)
107+
#if DEBUG
107108
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
108109
logger.info("intercepted signal: \(signal)")
109110
lifecycle.shutdown()
110111
}
112+
#endif
111113

112114
lifecycle.start().flatMap {
113115
lifecycle.shutdownFuture
114116
}.whenComplete { lifecycleResult in
117+
#if DEBUG
115118
signalSource.cancel()
119+
#endif
116120
eventLoop.shutdownGracefully { error in
117121
if let error = error {
118122
preconditionFailure("Failed to shutdown eventloop: \(error)")

0 commit comments

Comments
 (0)