@@ -97,9 +97,17 @@ internal final class HTTPClient {
97
97
private func connect( ) -> EventLoopFuture < Channel > {
98
98
let bootstrap = ClientBootstrap ( group: self . eventLoop)
99
99
. channelInitializer { channel in
100
- channel. pipeline. addHTTPClientHandlers ( ) . flatMap {
101
- channel. pipeline. addHandlers ( [ HTTPHandler ( keepAlive: self . configuration. keepAlive) ,
102
- UnaryHandler ( keepAlive: self . configuration. keepAlive) ] )
100
+ do {
101
+ try channel. pipeline. syncOperations. addHTTPClientHandlers ( )
102
+ // Lambda quotas... An invocation payload is maximal 6MB in size:
103
+ // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104
+ try channel. pipeline. syncOperations. addHandler (
105
+ NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 ) )
106
+ try channel. pipeline. syncOperations. addHandler (
107
+ UnaryHandler ( keepAlive: self . configuration. keepAlive) )
108
+ return channel. eventLoop. makeSucceededFuture ( ( ) )
109
+ } catch {
110
+ return channel. eventLoop. makeFailedFuture ( error)
103
111
}
104
112
}
105
113
@@ -149,116 +157,54 @@ internal final class HTTPClient {
149
157
}
150
158
}
151
159
152
- private final class HTTPHandler : ChannelDuplexHandler {
153
- typealias OutboundIn = HTTPClient . Request
154
- typealias InboundOut = HTTPClient . Response
155
- typealias InboundIn = HTTPClientResponsePart
160
+ // no need in locks since we validate only one request can run at a time
161
+ private final class UnaryHandler : ChannelDuplexHandler {
162
+ typealias InboundIn = NIOHTTPClientResponseFull
163
+ typealias OutboundIn = HTTPRequestWrapper
156
164
typealias OutboundOut = HTTPClientRequestPart
157
165
158
166
private let keepAlive : Bool
159
- private var readState : ReadState = . idle
167
+
168
+ private var pending : ( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? ) ?
169
+ private var lastError : Error ?
160
170
161
171
init ( keepAlive: Bool ) {
162
172
self . keepAlive = keepAlive
163
173
}
164
174
165
175
func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
166
- let request = unwrapOutboundIn ( data)
167
-
168
- var head = HTTPRequestHead ( version: . init( major: 1 , minor: 1 ) , method: request. method, uri: request. url, headers: request. headers)
169
- head. headers. add ( name: " host " , value: request. targetHost)
170
- switch request. method {
176
+ guard self . pending == nil else {
177
+ preconditionFailure ( " invalid state, outstanding request " )
178
+ }
179
+ let wrapper = unwrapOutboundIn ( data)
180
+
181
+ var head = HTTPRequestHead (
182
+ version: . http1_1,
183
+ method: wrapper. request. method,
184
+ uri: wrapper. request. url,
185
+ headers: wrapper. request. headers)
186
+ head. headers. add ( name: " host " , value: wrapper. request. targetHost)
187
+ switch head. method {
171
188
case . POST, . PUT:
172
- head. headers. add ( name: " content-length " , value: String ( request. body? . readableBytes ?? 0 ) )
189
+ head. headers. add ( name: " content-length " , value: String ( wrapper . request. body? . readableBytes ?? 0 ) )
173
190
default :
174
191
break
175
192
}
176
193
177
194
// We don't add a "Connection" header here if we want to keep the connection open,
178
- // HTTP/1.1 defines specifies the following in RFC 2616 , Section 8.1.2.1 :
195
+ // HTTP/1.1 specified in RFC 7230 , Section 6.3 Persistence :
179
196
//
180
- // An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to
181
- // maintain a persistent connection unless a Connection header including
182
- // the connection-token "close" was sent in the request. If the server
183
- // chooses to close the connection immediately after sending the
184
- // response, it SHOULD send a Connection header including the
185
- // connection-token close.
197
+ // HTTP/1.1 defaults to the use of "persistent connections", allowing
198
+ // multiple requests and responses to be carried over a single
199
+ // connection. The "close" connection option is used to signal that a
200
+ // connection will not persist after the current request/response. HTTP
201
+ // implementations SHOULD support persistent connections.
186
202
//
187
203
// See also UnaryHandler.channelRead below.
188
204
if !self . keepAlive {
189
205
head. headers. add ( name: " connection " , value: " close " )
190
206
}
191
207
192
- context. write ( self . wrapOutboundOut ( HTTPClientRequestPart . head ( head) ) ) . flatMap { _ -> EventLoopFuture < Void > in
193
- if let body = request. body {
194
- return context. writeAndFlush ( self . wrapOutboundOut ( HTTPClientRequestPart . body ( . byteBuffer( body) ) ) )
195
- } else {
196
- context. flush ( )
197
- return context. eventLoop. makeSucceededFuture ( ( ) )
198
- }
199
- } . cascade ( to: promise)
200
- }
201
-
202
- func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
203
- let response = unwrapInboundIn ( data)
204
-
205
- switch response {
206
- case . head( let head) :
207
- guard case . idle = self . readState else {
208
- preconditionFailure ( " invalid read state \( self . readState) " )
209
- }
210
- self . readState = . head( head)
211
- case . body( var bodyPart) :
212
- switch self . readState {
213
- case . head( let head) :
214
- self . readState = . body( head, bodyPart)
215
- case . body( let head, var body) :
216
- body. writeBuffer ( & bodyPart)
217
- self . readState = . body( head, body)
218
- default :
219
- preconditionFailure ( " invalid read state \( self . readState) " )
220
- }
221
- case . end:
222
- switch self . readState {
223
- case . head( let head) :
224
- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: nil ) ) )
225
- self . readState = . idle
226
- case . body( let head, let body) :
227
- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: body) ) )
228
- self . readState = . idle
229
- default :
230
- preconditionFailure ( " invalid read state \( self . readState) " )
231
- }
232
- }
233
- }
234
-
235
- private enum ReadState {
236
- case idle
237
- case head( HTTPResponseHead )
238
- case body( HTTPResponseHead , ByteBuffer )
239
- }
240
- }
241
-
242
- // no need in locks since we validate only one request can run at a time
243
- private final class UnaryHandler : ChannelDuplexHandler {
244
- typealias OutboundIn = HTTPRequestWrapper
245
- typealias InboundIn = HTTPClient . Response
246
- typealias OutboundOut = HTTPClient . Request
247
-
248
- private let keepAlive : Bool
249
-
250
- private var pending : ( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? ) ?
251
- private var lastError : Error ?
252
-
253
- init ( keepAlive: Bool ) {
254
- self . keepAlive = keepAlive
255
- }
256
-
257
- func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
258
- guard self . pending == nil else {
259
- preconditionFailure ( " invalid state, outstanding request " )
260
- }
261
- let wrapper = unwrapOutboundIn ( data)
262
208
let timeoutTask = wrapper. request. timeout. map {
263
209
context. eventLoop. scheduleTask ( in: $0) {
264
210
if self . pending != nil {
@@ -267,15 +213,29 @@ private final class UnaryHandler: ChannelDuplexHandler {
267
213
}
268
214
}
269
215
self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
270
- context. writeAndFlush ( wrapOutboundOut ( wrapper. request) , promise: promise)
216
+
217
+ context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
218
+ if let body = wrapper. request. body {
219
+ context. write ( wrapOutboundOut ( . body( IOData . byteBuffer ( body) ) ) , promise: nil )
220
+ }
221
+ context. writeAndFlush ( wrapOutboundOut ( . end( nil ) ) , promise: promise)
271
222
}
272
223
273
224
func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
274
- let response = unwrapInboundIn ( data)
275
225
guard let pending = self . pending else {
276
226
preconditionFailure ( " invalid state, no pending request " )
277
227
}
278
-
228
+
229
+ let response = unwrapInboundIn ( data)
230
+
231
+ let httpResponse = HTTPClient . Response (
232
+ version: response. head. version,
233
+ status: response. head. status,
234
+ headers: response. head. headers,
235
+ body: response. body)
236
+
237
+ self . completeWith ( . success( httpResponse) )
238
+
279
239
// As defined in RFC 7230 Section 6.3:
280
240
// HTTP/1.1 defaults to the use of "persistent connections", allowing
281
241
// multiple requests and responses to be carried over a single
@@ -285,14 +245,14 @@ private final class UnaryHandler: ChannelDuplexHandler {
285
245
//
286
246
// That's why we only assume the connection shall be closed if we receive
287
247
// a "connection = close" header.
288
- let serverCloseConnection = response. headers. first ( name: " connection " ) ? . lowercased ( ) == " close "
248
+ let serverCloseConnection =
249
+ response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
289
250
290
- if !self . keepAlive || serverCloseConnection || response. version != . init ( major : 1 , minor : 1 ) {
251
+ if !self . keepAlive || serverCloseConnection || response. head . version != . http1_1 {
291
252
pending. promise. futureResult. whenComplete { _ in
292
253
_ = context. channel. close ( )
293
254
}
294
255
}
295
- self . completeWith ( . success( response) )
296
256
}
297
257
298
258
func errorCaught( context: ChannelHandlerContext , error: Error ) {
0 commit comments