Skip to content

Commit 03dfd03

Browse files
committed
Fixes
1 parent d021910 commit 03dfd03

File tree

4 files changed

+68
-66
lines changed

4 files changed

+68
-66
lines changed

Sources/AWSLambdaRuntimeCore/HTTPClient.swift

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the SwiftAWSLambdaRuntime open source project
44
//
5-
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5+
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -103,8 +103,7 @@ internal final class HTTPClient {
103103
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104104
try channel.pipeline.syncOperations.addHandler(
105105
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024))
106-
try channel.pipeline.syncOperations.addHandler(
107-
UnaryHandler(keepAlive: self.configuration.keepAlive))
106+
try channel.pipeline.syncOperations.addHandler(UnaryHandler())
108107
return channel.eventLoop.makeSucceededFuture(())
109108
} catch {
110109
return channel.eventLoop.makeFailedFuture(error)
@@ -139,10 +138,10 @@ internal final class HTTPClient {
139138
}
140139

141140
internal struct Response: Equatable {
142-
public var version: HTTPVersion
143-
public var status: HTTPResponseStatus
144-
public var headers: HTTPHeaders
145-
public var body: ByteBuffer?
141+
var version: HTTPVersion
142+
var status: HTTPResponseStatus
143+
var headers: HTTPHeaders
144+
var body: ByteBuffer?
146145
}
147146

148147
internal enum Errors: Error {
@@ -163,26 +162,29 @@ private final class UnaryHandler: ChannelDuplexHandler {
163162
typealias OutboundIn = HTTPRequestWrapper
164163
typealias OutboundOut = HTTPClientRequestPart
165164

166-
private let keepAlive: Bool
165+
enum State {
166+
case idle
167+
case running(promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)
168+
case waitForConnectionClose(HTTPClient.Response, EventLoopPromise<HTTPClient.Response>)
169+
}
167170

168-
private var pending: (promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)?
171+
private var state: State = .idle
169172
private var lastError: Error?
170173

171-
init(keepAlive: Bool) {
172-
self.keepAlive = keepAlive
173-
}
174+
init() {}
174175

175176
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
176-
guard self.pending == nil else {
177+
guard case .idle = self.state else {
177178
preconditionFailure("invalid state, outstanding request")
178179
}
179180
let wrapper = unwrapOutboundIn(data)
180-
181+
181182
var head = HTTPRequestHead(
182183
version: .http1_1,
183184
method: wrapper.request.method,
184185
uri: wrapper.request.url,
185-
headers: wrapper.request.headers)
186+
headers: wrapper.request.headers
187+
)
186188
head.headers.add(name: "host", value: wrapper.request.targetHost)
187189
switch head.method {
188190
case .POST, .PUT:
@@ -191,29 +193,17 @@ private final class UnaryHandler: ChannelDuplexHandler {
191193
break
192194
}
193195

194-
// We don't add a "Connection" header here if we want to keep the connection open,
195-
// HTTP/1.1 specified in RFC 7230, Section 6.3 Persistence:
196-
//
197-
// HTTP/1.1 defaults to the use of "persistent connections", allowing
198-
// multiple requests and responses to be carried over a single
199-
// connection. The "close" connection option is used to signal that a
200-
// connection will not persist after the current request/response. HTTP
201-
// implementations SHOULD support persistent connections.
202-
//
203-
// See also UnaryHandler.channelRead below.
204-
if !self.keepAlive {
205-
head.headers.add(name: "connection", value: "close")
206-
}
207-
208196
let timeoutTask = wrapper.request.timeout.map {
209197
context.eventLoop.scheduleTask(in: $0) {
210-
if self.pending != nil {
211-
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
198+
guard case .running = self.state else {
199+
preconditionFailure("invalid state")
212200
}
201+
202+
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
213203
}
214204
}
215-
self.pending = (promise: wrapper.promise, timeout: timeoutTask)
216-
205+
self.state = .running(promise: wrapper.promise, timeout: timeoutTask)
206+
217207
context.write(wrapOutboundOut(.head(head)), promise: nil)
218208
if let body = wrapper.request.body {
219209
context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil)
@@ -222,20 +212,21 @@ private final class UnaryHandler: ChannelDuplexHandler {
222212
}
223213

224214
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
225-
guard let pending = self.pending else {
215+
guard case .running(let promise, let timeout) = self.state else {
226216
preconditionFailure("invalid state, no pending request")
227217
}
228-
218+
229219
let response = unwrapInboundIn(data)
230-
220+
231221
let httpResponse = HTTPClient.Response(
232222
version: response.head.version,
233223
status: response.head.status,
234224
headers: response.head.headers,
235-
body: response.body)
236-
237-
self.completeWith(.success(httpResponse))
238-
225+
body: response.body
226+
)
227+
228+
timeout?.cancel()
229+
239230
// As defined in RFC 7230 Section 6.3:
240231
// HTTP/1.1 defaults to the use of "persistent connections", allowing
241232
// multiple requests and responses to be carried over a single
@@ -248,10 +239,15 @@ private final class UnaryHandler: ChannelDuplexHandler {
248239
let serverCloseConnection =
249240
response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
250241

251-
if !self.keepAlive || serverCloseConnection || response.head.version != .http1_1 {
252-
pending.promise.futureResult.whenComplete { _ in
253-
_ = context.channel.close()
254-
}
242+
let closeConnection = serverCloseConnection || response.head.version != .http1_1
243+
244+
if closeConnection {
245+
self.state = .waitForConnectionClose(httpResponse, promise)
246+
_ = context.channel.close()
247+
return
248+
} else {
249+
self.state = .idle
250+
promise.succeed(httpResponse)
255251
}
256252
}
257253

@@ -263,36 +259,44 @@ private final class UnaryHandler: ChannelDuplexHandler {
263259

264260
func channelInactive(context: ChannelHandlerContext) {
265261
// fail any pending responses with last error or assume peer disconnected
266-
if self.pending != nil {
267-
let error = self.lastError ?? HTTPClient.Errors.connectionResetByPeer
268-
self.completeWith(.failure(error))
269-
}
270262
context.fireChannelInactive()
263+
264+
switch self.state {
265+
case .idle:
266+
break
267+
case .running(let promise, let timeout):
268+
self.state = .idle
269+
timeout?.cancel()
270+
promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
271+
272+
case .waitForConnectionClose(let response, let promise):
273+
self.state = .idle
274+
promise.succeed(response)
275+
}
271276
}
272277

273278
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
274279
switch event {
275280
case is RequestCancelEvent:
276-
if self.pending != nil {
277-
self.completeWith(.failure(HTTPClient.Errors.cancelled))
281+
switch self.state {
282+
case .idle:
283+
break
284+
case .running(let promise, let timeout):
285+
self.state = .idle
286+
timeout?.cancel()
287+
promise.fail(HTTPClient.Errors.cancelled)
288+
278289
// after the cancel error has been send, we want to close the connection so
279290
// that no more packets can be read on this connection.
280291
_ = context.channel.close()
292+
case .waitForConnectionClose(_, let promise):
293+
self.state = .idle
294+
promise.fail(HTTPClient.Errors.cancelled)
281295
}
282296
default:
283297
context.triggerUserOutboundEvent(event, promise: promise)
284298
}
285299
}
286-
287-
private func completeWith(_ result: Result<HTTPClient.Response, Error>) {
288-
guard let pending = self.pending else {
289-
preconditionFailure("invalid state, no pending request")
290-
}
291-
self.pending = nil
292-
self.lastError = nil
293-
pending.timeout?.cancel()
294-
pending.promise.completeWith(result)
295-
}
296300
}
297301

298302
private struct HTTPRequestWrapper {

Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the SwiftAWSLambdaRuntime open source project
44
//
5-
// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5+
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -64,7 +64,6 @@ extension Lambda {
6464
struct RuntimeEngine: CustomStringConvertible {
6565
let ip: String
6666
let port: Int
67-
let keepAlive: Bool
6867
let requestTimeout: TimeAmount?
6968

7069
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
@@ -74,12 +73,11 @@ extension Lambda {
7473
}
7574
self.ip = String(ipPort[0])
7675
self.port = port
77-
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true
7876
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
7977
}
8078

8179
var description: String {
82-
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout))"
80+
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))"
8381
}
8482
}
8583

Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the SwiftAWSLambdaRuntime open source project
44
//
5-
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5+
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information

Tests/AWSLambdaRuntimeCoreTests/Utils.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the SwiftAWSLambdaRuntime open source project
44
//
5-
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5+
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information

0 commit comments

Comments
 (0)