Skip to content

Commit 5d235c0

Browse files
authored
Add ControlPlaneRequestEncoder (#239)
Add a new `ControlPlaneRequestEncoder` that encodes all control plane requests into an existing, reused buffer.
1 parent bc78f60 commit 5d235c0

File tree

2 files changed

+305
-0
lines changed

2 files changed

+305
-0
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
struct ControlPlaneRequestEncoder: _EmittingChannelHandler {
18+
typealias OutboundOut = ByteBuffer
19+
20+
private var host: String
21+
private var byteBuffer: ByteBuffer!
22+
23+
init(host: String) {
24+
self.host = host
25+
}
26+
27+
mutating func writeRequest(_ request: ControlPlaneRequest, context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
28+
self.byteBuffer.clear(minimumCapacity: self.byteBuffer.storageCapacity)
29+
30+
switch request {
31+
case .next:
32+
self.byteBuffer.writeString(.nextInvocationRequestLine)
33+
self.byteBuffer.writeHostHeader(host: self.host)
34+
self.byteBuffer.writeString(.userAgentHeader)
35+
self.byteBuffer.writeString(.CRLF) // end of head
36+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
37+
context.flush()
38+
39+
case .invocationResponse(let requestID, let payload):
40+
let contentLength = payload?.readableBytes ?? 0
41+
self.byteBuffer.writeInvocationResultRequestLine(requestID)
42+
self.byteBuffer.writeHostHeader(host: self.host)
43+
self.byteBuffer.writeString(.userAgentHeader)
44+
self.byteBuffer.writeContentLengthHeader(length: contentLength)
45+
self.byteBuffer.writeString(.CRLF) // end of head
46+
if let payload = payload, contentLength > 0 {
47+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil)
48+
context.write(self.wrapOutboundOut(payload), promise: promise)
49+
} else {
50+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
51+
}
52+
context.flush()
53+
54+
case .invocationError(let requestID, let errorMessage):
55+
let payload = errorMessage.toJSONBytes()
56+
self.byteBuffer.writeInvocationErrorRequestLine(requestID)
57+
self.byteBuffer.writeContentLengthHeader(length: payload.count)
58+
self.byteBuffer.writeHostHeader(host: self.host)
59+
self.byteBuffer.writeString(.userAgentHeader)
60+
self.byteBuffer.writeString(.unhandledErrorHeader)
61+
self.byteBuffer.writeString(.CRLF) // end of head
62+
self.byteBuffer.writeBytes(payload)
63+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
64+
context.flush()
65+
66+
case .initializationError(let errorMessage):
67+
let payload = errorMessage.toJSONBytes()
68+
self.byteBuffer.writeString(.runtimeInitErrorRequestLine)
69+
self.byteBuffer.writeContentLengthHeader(length: payload.count)
70+
self.byteBuffer.writeHostHeader(host: self.host)
71+
self.byteBuffer.writeString(.userAgentHeader)
72+
self.byteBuffer.writeString(.unhandledErrorHeader)
73+
self.byteBuffer.writeString(.CRLF) // end of head
74+
self.byteBuffer.writeBytes(payload)
75+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
76+
context.flush()
77+
}
78+
}
79+
80+
mutating func writerAdded(context: ChannelHandlerContext) {
81+
self.byteBuffer = context.channel.allocator.buffer(capacity: 256)
82+
}
83+
84+
mutating func writerRemoved(context: ChannelHandlerContext) {
85+
self.byteBuffer = nil
86+
}
87+
}
88+
89+
extension String {
90+
static let CRLF: String = "\r\n"
91+
92+
static let userAgentHeader: String = "user-agent: Swift-Lambda/Unknown\r\n"
93+
static let unhandledErrorHeader: String = "lambda-runtime-function-error-type: Unhandled\r\n"
94+
95+
static let nextInvocationRequestLine: String =
96+
"GET /2018-06-01/runtime/invocation/next HTTP/1.1\r\n"
97+
98+
static let runtimeInitErrorRequestLine: String =
99+
"POST /2018-06-01/runtime/init/error HTTP/1.1\r\n"
100+
}
101+
102+
extension ByteBuffer {
103+
fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) {
104+
self.writeString("POST /2018-06-01/runtime/invocation/")
105+
self.writeString(requestID)
106+
self.writeString("/response HTTP/1.1\r\n")
107+
}
108+
109+
fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) {
110+
self.writeString("POST /2018-06-01/runtime/invocation/")
111+
self.writeString(requestID)
112+
self.writeString("/error HTTP/1.1\r\n")
113+
}
114+
115+
fileprivate mutating func writeHostHeader(host: String) {
116+
self.writeString("host: ")
117+
self.writeString(host)
118+
self.writeString(.CRLF)
119+
}
120+
121+
fileprivate mutating func writeContentLengthHeader(length: Int) {
122+
self.writeString("content-length: ")
123+
self.writeString("\(length)")
124+
self.writeString(.CRLF)
125+
}
126+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AWSLambdaRuntimeCore
16+
import NIOCore
17+
import NIOEmbedded
18+
import NIOHTTP1
19+
import XCTest
20+
21+
final class ControlPlaneRequestEncoderTests: XCTestCase {
22+
let host = "192.168.0.1"
23+
24+
var client: EmbeddedChannel!
25+
var server: EmbeddedChannel!
26+
27+
override func setUp() {
28+
self.client = EmbeddedChannel(handler: ControlPlaneRequestEncoderHandler(host: self.host))
29+
self.server = EmbeddedChannel(handlers: [
30+
ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .dropBytes)),
31+
NIOHTTPServerRequestAggregator(maxContentLength: 1024 * 1024),
32+
])
33+
}
34+
35+
override func tearDown() {
36+
XCTAssertNoThrow(try self.client.finish(acceptAlreadyClosed: false))
37+
XCTAssertNoThrow(try self.server.finish(acceptAlreadyClosed: false))
38+
self.client = nil
39+
self.server = nil
40+
}
41+
42+
func testNextRequest() {
43+
var request: NIOHTTPServerRequestFull?
44+
XCTAssertNoThrow(request = try self.sendRequest(.next))
45+
46+
XCTAssertEqual(request?.head.isKeepAlive, true)
47+
XCTAssertEqual(request?.head.method, .GET)
48+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/next")
49+
XCTAssertEqual(request?.head.version, .http1_1)
50+
XCTAssertEqual(request?.head.headers["host"], [self.host])
51+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
52+
53+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
54+
}
55+
56+
func testPostInvocationSuccessWithoutBody() {
57+
let requestID = UUID().uuidString
58+
var request: NIOHTTPServerRequestFull?
59+
XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, nil)))
60+
61+
XCTAssertEqual(request?.head.isKeepAlive, true)
62+
XCTAssertEqual(request?.head.method, .POST)
63+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response")
64+
XCTAssertEqual(request?.head.version, .http1_1)
65+
XCTAssertEqual(request?.head.headers["host"], [self.host])
66+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
67+
XCTAssertEqual(request?.head.headers["content-length"], ["0"])
68+
69+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
70+
}
71+
72+
func testPostInvocationSuccessWithBody() {
73+
let requestID = UUID().uuidString
74+
let payload = ByteBuffer(string: "hello swift lambda!")
75+
76+
var request: NIOHTTPServerRequestFull?
77+
XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, payload)))
78+
79+
XCTAssertEqual(request?.head.isKeepAlive, true)
80+
XCTAssertEqual(request?.head.method, .POST)
81+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response")
82+
XCTAssertEqual(request?.head.version, .http1_1)
83+
XCTAssertEqual(request?.head.headers["host"], [self.host])
84+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
85+
XCTAssertEqual(request?.head.headers["content-length"], ["\(payload.readableBytes)"])
86+
XCTAssertEqual(request?.body, payload)
87+
88+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
89+
}
90+
91+
func testPostInvocationErrorWithBody() {
92+
let requestID = UUID().uuidString
93+
let error = ErrorResponse(errorType: "SomeError", errorMessage: "An error happened")
94+
var request: NIOHTTPServerRequestFull?
95+
XCTAssertNoThrow(request = try self.sendRequest(.invocationError(requestID, error)))
96+
97+
XCTAssertEqual(request?.head.isKeepAlive, true)
98+
XCTAssertEqual(request?.head.method, .POST)
99+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/error")
100+
XCTAssertEqual(request?.head.version, .http1_1)
101+
XCTAssertEqual(request?.head.headers["host"], [self.host])
102+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
103+
XCTAssertEqual(request?.head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
104+
let expectedBody = #"{"errorType":"SomeError","errorMessage":"An error happened"}"#
105+
106+
XCTAssertEqual(request?.head.headers["content-length"], ["\(expectedBody.utf8.count)"])
107+
XCTAssertEqual(try request?.body?.getString(at: 0, length: XCTUnwrap(request?.body?.readableBytes)),
108+
expectedBody)
109+
110+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
111+
}
112+
113+
func testPostStartupError() {
114+
let error = ErrorResponse(errorType: "StartupError", errorMessage: "Urgh! Startup failed. 😨")
115+
var request: NIOHTTPServerRequestFull?
116+
XCTAssertNoThrow(request = try self.sendRequest(.initializationError(error)))
117+
118+
XCTAssertEqual(request?.head.isKeepAlive, true)
119+
XCTAssertEqual(request?.head.method, .POST)
120+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/init/error")
121+
XCTAssertEqual(request?.head.version, .http1_1)
122+
XCTAssertEqual(request?.head.headers["host"], [self.host])
123+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
124+
XCTAssertEqual(request?.head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
125+
let expectedBody = #"{"errorType":"StartupError","errorMessage":"Urgh! Startup failed. 😨"}"#
126+
XCTAssertEqual(request?.head.headers["content-length"], ["\(expectedBody.utf8.count)"])
127+
XCTAssertEqual(try request?.body?.getString(at: 0, length: XCTUnwrap(request?.body?.readableBytes)),
128+
expectedBody)
129+
130+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
131+
}
132+
133+
func testMultipleNextAndResponseSuccessRequests() {
134+
for _ in 0 ..< 1000 {
135+
var nextRequest: NIOHTTPServerRequestFull?
136+
XCTAssertNoThrow(nextRequest = try self.sendRequest(.next))
137+
XCTAssertEqual(nextRequest?.head.method, .GET)
138+
XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next")
139+
140+
let requestID = UUID().uuidString
141+
let payload = ByteBuffer(string: "hello swift lambda!")
142+
var successRequest: NIOHTTPServerRequestFull?
143+
XCTAssertNoThrow(successRequest = try self.sendRequest(.invocationResponse(requestID, payload)))
144+
XCTAssertEqual(successRequest?.head.method, .POST)
145+
XCTAssertEqual(successRequest?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response")
146+
}
147+
}
148+
149+
func sendRequest(_ request: ControlPlaneRequest) throws -> NIOHTTPServerRequestFull? {
150+
try self.client.writeOutbound(request)
151+
while let part = try self.client.readOutbound(as: ByteBuffer.self) {
152+
XCTAssertNoThrow(try self.server.writeInbound(part))
153+
}
154+
return try self.server.readInbound(as: NIOHTTPServerRequestFull.self)
155+
}
156+
}
157+
158+
private final class ControlPlaneRequestEncoderHandler: ChannelOutboundHandler {
159+
typealias OutboundIn = ControlPlaneRequest
160+
typealias OutboundOut = ByteBuffer
161+
162+
private var encoder: ControlPlaneRequestEncoder
163+
164+
init(host: String) {
165+
self.encoder = ControlPlaneRequestEncoder(host: host)
166+
}
167+
168+
func handlerAdded(context: ChannelHandlerContext) {
169+
self.encoder.writerAdded(context: context)
170+
}
171+
172+
func handlerRemoved(context: ChannelHandlerContext) {
173+
self.encoder.writerRemoved(context: context)
174+
}
175+
176+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
177+
self.encoder.writeRequest(self.unwrapOutboundIn(data), context: context, promise: promise)
178+
}
179+
}

0 commit comments

Comments
 (0)