Skip to content

Commit c31cd9e

Browse files
committed
Add ControlPlaneRequestEncoder
1 parent b8d89ca commit c31cd9e

File tree

2 files changed

+290
-0
lines changed

2 files changed

+290
-0
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
struct ControlPlaneRequestEncoder: _EmittingChannelHandler {
18+
typealias OutboundOut = ByteBuffer
19+
20+
private var host: String
21+
private var byteBuffer: ByteBuffer!
22+
23+
init(host: String) {
24+
self.host = host
25+
}
26+
27+
mutating func writeRequest(_ request: ControlPlaneRequest, context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
28+
self.byteBuffer.clear(minimumCapacity: self.byteBuffer.storageCapacity)
29+
30+
switch request {
31+
case .next:
32+
self.byteBuffer.writeString(.nextInvocationRequestLine)
33+
self.byteBuffer.writeHostHeader(host: self.host)
34+
self.byteBuffer.writeString(.userAgentHeader)
35+
self.byteBuffer.writeString(.CRLF) // end of head
36+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
37+
context.flush()
38+
39+
case .invocationResponse(let requestID, let payload):
40+
let contentLength = payload?.readableBytes ?? 0
41+
self.byteBuffer.writeInvocationResultRequestLine(requestID)
42+
self.byteBuffer.writeHostHeader(host: self.host)
43+
self.byteBuffer.writeString(.userAgentHeader)
44+
self.byteBuffer.writeContentLengthHeader(length: contentLength)
45+
self.byteBuffer.writeString(.CRLF) // end of head
46+
if contentLength > 0 {
47+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil)
48+
context.write(self.wrapOutboundOut(payload!), promise: promise)
49+
} else {
50+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
51+
}
52+
context.flush()
53+
54+
case .invocationError(let requestID, let errorMessage):
55+
let payload = errorMessage.toJSONBytes()
56+
self.byteBuffer.writeInvocationErrorRequestLine(requestID)
57+
self.byteBuffer.writeContentLengthHeader(length: payload.count)
58+
self.byteBuffer.writeHostHeader(host: self.host)
59+
self.byteBuffer.writeString(.userAgentHeader)
60+
self.byteBuffer.writeString(.unhandledErrorHeader)
61+
self.byteBuffer.writeString(.CRLF) // end of head
62+
self.byteBuffer.writeBytes(payload)
63+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
64+
context.flush()
65+
66+
case .initializationError(let errorMessage):
67+
let payload = errorMessage.toJSONBytes()
68+
self.byteBuffer.writeString(.runtimeInitErrorRequestLine)
69+
self.byteBuffer.writeContentLengthHeader(length: payload.count)
70+
self.byteBuffer.writeHostHeader(host: self.host)
71+
self.byteBuffer.writeString(.userAgentHeader)
72+
self.byteBuffer.writeString(.unhandledErrorHeader)
73+
self.byteBuffer.writeString(.CRLF) // end of head
74+
self.byteBuffer.writeBytes(payload)
75+
context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
76+
context.flush()
77+
}
78+
}
79+
80+
mutating func writerAdded(context: ChannelHandlerContext) {
81+
self.byteBuffer = context.channel.allocator.buffer(capacity: 256)
82+
}
83+
84+
mutating func writerRemoved(context: ChannelHandlerContext) {
85+
self.byteBuffer = nil
86+
}
87+
}
88+
89+
extension String {
90+
static let CRLF: String = "\r\n"
91+
92+
static let userAgentHeader: String = "user-agent: Swift-Lambda/Unknown\r\n"
93+
static let unhandledErrorHeader: String = "lambda-runtime-function-error-type: Unhandled\r\n"
94+
95+
static let nextInvocationRequestLine: String =
96+
"GET /2018-06-01/runtime/invocation/next HTTP/1.1\r\n"
97+
98+
static let runtimeInitErrorRequestLine: String =
99+
"POST /2018-06-01/runtime/init/error HTTP/1.1\r\n"
100+
}
101+
102+
extension ByteBuffer {
103+
104+
fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) {
105+
self.writeString("POST /2018-06-01/runtime/invocation/")
106+
self.writeString(requestID)
107+
self.writeString("/response HTTP/1.1\r\n")
108+
}
109+
110+
fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) {
111+
self.writeString("POST /2018-06-01/runtime/invocation/")
112+
self.writeString(requestID)
113+
self.writeString("/error HTTP/1.1\r\n")
114+
}
115+
116+
fileprivate mutating func writeHostHeader(host: String) {
117+
self.writeString("host: ")
118+
self.writeString(host)
119+
self.writeString(.CRLF)
120+
}
121+
122+
fileprivate mutating func writeContentLengthHeader(length: Int) {
123+
self.writeString("content-length: ")
124+
self.writeString("\(length)")
125+
self.writeString(.CRLF)
126+
}
127+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AWSLambdaRuntimeCore
16+
import NIOCore
17+
import NIOHTTP1
18+
import XCTest
19+
import NIOEmbedded
20+
21+
final class ControlPlaneRequestEncoderTests: XCTestCase {
22+
let host = "192.168.0.1"
23+
24+
var client: EmbeddedChannel!
25+
var server: EmbeddedChannel!
26+
27+
override func setUp() {
28+
self.client = EmbeddedChannel(handler: ControlPlaneRequestEncoderHandler(host: self.host))
29+
self.server = EmbeddedChannel(handlers: [
30+
ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .dropBytes)),
31+
NIOHTTPServerRequestAggregator(maxContentLength: 1024 * 1024)
32+
])
33+
}
34+
35+
override func tearDown() {
36+
XCTAssertNoThrow(try self.client.finish(acceptAlreadyClosed: false))
37+
XCTAssertNoThrow(try self.server.finish(acceptAlreadyClosed: false))
38+
self.client = nil
39+
self.server = nil
40+
}
41+
42+
func testNextRequest() {
43+
var request: NIOHTTPServerRequestFull?
44+
XCTAssertNoThrow(request = try self.sendRequest(.next))
45+
46+
XCTAssertEqual(request?.head.isKeepAlive, true)
47+
XCTAssertEqual(request?.head.method, .GET)
48+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/next")
49+
XCTAssertEqual(request?.head.version, .http1_1)
50+
XCTAssertEqual(request?.head.headers["host"], [self.host])
51+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
52+
53+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
54+
}
55+
56+
func testPostInvocationSuccessWithoutBody() {
57+
let requestID = UUID().uuidString
58+
var request: NIOHTTPServerRequestFull?
59+
XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, nil)))
60+
61+
XCTAssertEqual(request?.head.isKeepAlive, true)
62+
XCTAssertEqual(request?.head.method, .POST)
63+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response")
64+
XCTAssertEqual(request?.head.version, .http1_1)
65+
XCTAssertEqual(request?.head.headers["host"], [self.host])
66+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
67+
XCTAssertEqual(request?.head.headers["content-length"], ["0"])
68+
69+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
70+
}
71+
72+
func testPostInvocationSuccessWithBody() {
73+
let requestID = UUID().uuidString
74+
let payload = ByteBuffer(string: "hello swift lambda!")
75+
76+
var request: NIOHTTPServerRequestFull?
77+
XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, payload)))
78+
79+
XCTAssertEqual(request?.head.isKeepAlive, true)
80+
XCTAssertEqual(request?.head.method, .POST)
81+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response")
82+
XCTAssertEqual(request?.head.version, .http1_1)
83+
XCTAssertEqual(request?.head.headers["host"], [self.host])
84+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
85+
XCTAssertEqual(request?.head.headers["content-length"], ["\(payload.readableBytes)"])
86+
XCTAssertEqual(request?.body, payload)
87+
88+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
89+
}
90+
91+
func testPostInvocationErrorWithBody() {
92+
let requestID = UUID().uuidString
93+
let error = ErrorResponse(errorType: "SomeError", errorMessage: "An error happened")
94+
var request: NIOHTTPServerRequestFull?
95+
XCTAssertNoThrow(request = try self.sendRequest(.invocationError(requestID, error)))
96+
97+
XCTAssertEqual(request?.head.isKeepAlive, true)
98+
XCTAssertEqual(request?.head.method, .POST)
99+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/error")
100+
XCTAssertEqual(request?.head.version, .http1_1)
101+
XCTAssertEqual(request?.head.headers["host"], [self.host])
102+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
103+
XCTAssertEqual(request?.head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
104+
let expectedBody = #"{"errorType":"SomeError","errorMessage":"An error happened"}"#
105+
106+
XCTAssertEqual(request?.head.headers["content-length"], ["\(expectedBody.utf8.count)"])
107+
XCTAssertEqual(try request?.body?.getString(at: 0, length: XCTUnwrap(request?.body?.readableBytes)),
108+
expectedBody)
109+
110+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
111+
}
112+
113+
func testPostStartupError() {
114+
let error = ErrorResponse(errorType: "StartupError", errorMessage: "Urgh! Startup failed. 😨")
115+
var request: NIOHTTPServerRequestFull?
116+
XCTAssertNoThrow(request = try self.sendRequest(.initializationError(error)))
117+
118+
XCTAssertEqual(request?.head.isKeepAlive, true)
119+
XCTAssertEqual(request?.head.method, .POST)
120+
XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/init/error")
121+
XCTAssertEqual(request?.head.version, .http1_1)
122+
XCTAssertEqual(request?.head.headers["host"], [self.host])
123+
XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"])
124+
XCTAssertEqual(request?.head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
125+
let expectedBody = #"{"errorType":"StartupError","errorMessage":"Urgh! Startup failed. 😨"}"#
126+
XCTAssertEqual(request?.head.headers["content-length"], ["\(expectedBody.utf8.count)"])
127+
XCTAssertEqual(try request?.body?.getString(at: 0, length: XCTUnwrap(request?.body?.readableBytes)),
128+
expectedBody)
129+
130+
XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self))
131+
}
132+
133+
func sendRequest(_ request: ControlPlaneRequest) throws -> NIOHTTPServerRequestFull? {
134+
try self.client.writeOutbound(request)
135+
while let part = try self.client.readOutbound(as: ByteBuffer.self) {
136+
XCTAssertNoThrow(try self.server.writeInbound(part))
137+
}
138+
return try self.server.readInbound(as: NIOHTTPServerRequestFull.self)
139+
}
140+
}
141+
142+
private final class ControlPlaneRequestEncoderHandler: ChannelOutboundHandler {
143+
typealias OutboundIn = ControlPlaneRequest
144+
typealias OutboundOut = ByteBuffer
145+
146+
private var encoder: ControlPlaneRequestEncoder
147+
148+
init(host: String) {
149+
self.encoder = ControlPlaneRequestEncoder(host: host)
150+
}
151+
152+
func handlerAdded(context: ChannelHandlerContext) {
153+
self.encoder.writerAdded(context: context)
154+
}
155+
156+
func handlerRemoved(context: ChannelHandlerContext) {
157+
self.encoder.writerRemoved(context: context)
158+
}
159+
160+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
161+
self.encoder.writeRequest(self.unwrapOutboundIn(data), context: context, promise: promise)
162+
}
163+
}

0 commit comments

Comments
 (0)