2
2
//
3
3
// This source file is part of the SwiftAWSLambdaRuntime open source project
4
4
//
5
- // Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5
+ // Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6
6
// Licensed under Apache License v2.0
7
7
//
8
8
// See LICENSE.txt for license information
@@ -103,8 +103,7 @@ internal final class HTTPClient {
103
103
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104
104
try channel. pipeline. syncOperations. addHandler (
105
105
NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 ) )
106
- try channel. pipeline. syncOperations. addHandler (
107
- UnaryHandler ( keepAlive: self . configuration. keepAlive) )
106
+ try channel. pipeline. syncOperations. addHandler ( UnaryHandler ( ) )
108
107
return channel. eventLoop. makeSucceededFuture ( ( ) )
109
108
} catch {
110
109
return channel. eventLoop. makeFailedFuture ( error)
@@ -139,10 +138,10 @@ internal final class HTTPClient {
139
138
}
140
139
141
140
internal struct Response : Equatable {
142
- public var version : HTTPVersion
143
- public var status : HTTPResponseStatus
144
- public var headers : HTTPHeaders
145
- public var body : ByteBuffer ?
141
+ var version : HTTPVersion
142
+ var status : HTTPResponseStatus
143
+ var headers : HTTPHeaders
144
+ var body : ByteBuffer ?
146
145
}
147
146
148
147
internal enum Errors : Error {
@@ -163,26 +162,29 @@ private final class UnaryHandler: ChannelDuplexHandler {
163
162
typealias OutboundIn = HTTPRequestWrapper
164
163
typealias OutboundOut = HTTPClientRequestPart
165
164
166
- private let keepAlive : Bool
165
+ enum State {
166
+ case idle
167
+ case running( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? )
168
+ case waitForConnectionClose( HTTPClient . Response , EventLoopPromise < HTTPClient . Response > )
169
+ }
167
170
168
- private var pending : ( promise : EventLoopPromise < HTTPClient . Response > , timeout : Scheduled < Void > ? ) ?
171
+ private var state : State = . idle
169
172
private var lastError : Error ?
170
173
171
- init ( keepAlive: Bool ) {
172
- self . keepAlive = keepAlive
173
- }
174
+ init ( ) { }
174
175
175
176
func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
176
- guard self . pending == nil else {
177
+ guard case . idle = self . state else {
177
178
preconditionFailure ( " invalid state, outstanding request " )
178
179
}
179
180
let wrapper = unwrapOutboundIn ( data)
180
-
181
+
181
182
var head = HTTPRequestHead (
182
183
version: . http1_1,
183
184
method: wrapper. request. method,
184
185
uri: wrapper. request. url,
185
- headers: wrapper. request. headers)
186
+ headers: wrapper. request. headers
187
+ )
186
188
head. headers. add ( name: " host " , value: wrapper. request. targetHost)
187
189
switch head. method {
188
190
case . POST, . PUT:
@@ -191,29 +193,17 @@ private final class UnaryHandler: ChannelDuplexHandler {
191
193
break
192
194
}
193
195
194
- // We don't add a "Connection" header here if we want to keep the connection open,
195
- // HTTP/1.1 specified in RFC 7230, Section 6.3 Persistence:
196
- //
197
- // HTTP/1.1 defaults to the use of "persistent connections", allowing
198
- // multiple requests and responses to be carried over a single
199
- // connection. The "close" connection option is used to signal that a
200
- // connection will not persist after the current request/response. HTTP
201
- // implementations SHOULD support persistent connections.
202
- //
203
- // See also UnaryHandler.channelRead below.
204
- if !self . keepAlive {
205
- head. headers. add ( name: " connection " , value: " close " )
206
- }
207
-
208
196
let timeoutTask = wrapper. request. timeout. map {
209
197
context. eventLoop. scheduleTask ( in: $0) {
210
- if self . pending != nil {
211
- context . pipeline . fireErrorCaught ( HTTPClient . Errors . timeout )
198
+ guard case . running = self . state else {
199
+ preconditionFailure ( " invalid state " )
212
200
}
201
+
202
+ context. pipeline. fireErrorCaught ( HTTPClient . Errors. timeout)
213
203
}
214
204
}
215
- self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
216
-
205
+ self . state = . running ( promise: wrapper. promise, timeout: timeoutTask)
206
+
217
207
context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
218
208
if let body = wrapper. request. body {
219
209
context. write ( wrapOutboundOut ( . body( IOData . byteBuffer ( body) ) ) , promise: nil )
@@ -222,20 +212,21 @@ private final class UnaryHandler: ChannelDuplexHandler {
222
212
}
223
213
224
214
func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
225
- guard let pending = self . pending else {
215
+ guard case . running ( let promise , let timeout ) = self . state else {
226
216
preconditionFailure ( " invalid state, no pending request " )
227
217
}
228
-
218
+
229
219
let response = unwrapInboundIn ( data)
230
-
220
+
231
221
let httpResponse = HTTPClient . Response (
232
222
version: response. head. version,
233
223
status: response. head. status,
234
224
headers: response. head. headers,
235
- body: response. body)
236
-
237
- self . completeWith ( . success( httpResponse) )
238
-
225
+ body: response. body
226
+ )
227
+
228
+ timeout? . cancel ( )
229
+
239
230
// As defined in RFC 7230 Section 6.3:
240
231
// HTTP/1.1 defaults to the use of "persistent connections", allowing
241
232
// multiple requests and responses to be carried over a single
@@ -248,10 +239,27 @@ private final class UnaryHandler: ChannelDuplexHandler {
248
239
let serverCloseConnection =
249
240
response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
250
241
251
- if !self . keepAlive || serverCloseConnection || response. head. version != . http1_1 {
252
- pending. promise. futureResult. whenComplete { _ in
253
- _ = context. channel. close ( )
254
- }
242
+ let closeConnection = serverCloseConnection || response. head. version != . http1_1
243
+
244
+ if closeConnection {
245
+ // If we were succeeding the request promise here directly and closing the connection
246
+ // after succeeding the promise we may run into a race condition:
247
+ //
248
+ // The lambda runtime will ask for the next work item directly after a succeeded post
249
+ // response request. The desire for the next work item might be faster than the attempt
250
+ // to close the connection. This will lead to a situation where we try to the connection
251
+ // but the next request has already been scheduled on the connection that we want to
252
+ // close. For this reason we postpone succeeding the promise until the connection has
253
+ // been closed. This codepath will only be hit in the very, very unlikely event of the
254
+ // Lambda control plane demanding to close connection. (It's more or less only
255
+ // implemented to support http1.1 correctly.) This behavior is ensured with the test
256
+ // `LambdaTest.testNoKeepAliveServer`.
257
+ self . state = . waitForConnectionClose( httpResponse, promise)
258
+ _ = context. channel. close ( )
259
+ return
260
+ } else {
261
+ self . state = . idle
262
+ promise. succeed ( httpResponse)
255
263
}
256
264
}
257
265
@@ -263,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler {
263
271
264
272
func channelInactive( context: ChannelHandlerContext ) {
265
273
// fail any pending responses with last error or assume peer disconnected
266
- if self . pending != nil {
267
- let error = self . lastError ?? HTTPClient . Errors. connectionResetByPeer
268
- self . completeWith ( . failure( error) )
269
- }
270
274
context. fireChannelInactive ( )
275
+
276
+ switch self . state {
277
+ case . idle:
278
+ break
279
+ case . running( let promise, let timeout) :
280
+ self . state = . idle
281
+ timeout? . cancel ( )
282
+ promise. fail ( self . lastError ?? HTTPClient . Errors. connectionResetByPeer)
283
+
284
+ case . waitForConnectionClose( let response, let promise) :
285
+ self . state = . idle
286
+ promise. succeed ( response)
287
+ }
271
288
}
272
289
273
290
func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
274
291
switch event {
275
292
case is RequestCancelEvent :
276
- if self . pending != nil {
277
- self . completeWith ( . failure( HTTPClient . Errors. cancelled) )
293
+ switch self . state {
294
+ case . idle:
295
+ break
296
+ case . running( let promise, let timeout) :
297
+ self . state = . idle
298
+ timeout? . cancel ( )
299
+ promise. fail ( HTTPClient . Errors. cancelled)
300
+
278
301
// after the cancel error has been send, we want to close the connection so
279
302
// that no more packets can be read on this connection.
280
303
_ = context. channel. close ( )
304
+ case . waitForConnectionClose( _, let promise) :
305
+ self . state = . idle
306
+ promise. fail ( HTTPClient . Errors. cancelled)
281
307
}
282
308
default :
283
309
context. triggerUserOutboundEvent ( event, promise: promise)
284
310
}
285
311
}
286
-
287
- private func completeWith( _ result: Result < HTTPClient . Response , Error > ) {
288
- guard let pending = self . pending else {
289
- preconditionFailure ( " invalid state, no pending request " )
290
- }
291
- self . pending = nil
292
- self . lastError = nil
293
- pending. timeout? . cancel ( )
294
- pending. promise. completeWith ( result)
295
- }
296
312
}
297
313
298
314
private struct HTTPRequestWrapper {
0 commit comments