From d021910d141f41ccc8b604937a5e841d4ae9ffd6 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 9 Apr 2021 15:47:56 +0200 Subject: [PATCH 1/2] Performance improvements --- Package.swift | 2 +- Sources/AWSLambdaRuntimeCore/HTTPClient.swift | 154 +++++++----------- .../MockLambdaServer.swift | 2 +- Tests/AWSLambdaRuntimeCoreTests/Utils.swift | 2 +- 4 files changed, 60 insertions(+), 100 deletions(-) diff --git a/Package.swift b/Package.swift index 0e5823f6..4f6ebc22 100644 --- a/Package.swift +++ b/Package.swift @@ -15,7 +15,7 @@ let package = Package( .library(name: "AWSLambdaTesting", targets: ["AWSLambdaTesting"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.17.0")), + .package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.26.0")), .package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.0.0")), .package(url: "https://github.com/swift-server/swift-backtrace.git", .upToNextMajor(from: "1.1.0")), ], diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift index fcd2a450..259dd58e 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift @@ -97,9 +97,17 @@ internal final class HTTPClient { private func connect() -> EventLoopFuture { let bootstrap = ClientBootstrap(group: self.eventLoop) .channelInitializer { channel in - channel.pipeline.addHTTPClientHandlers().flatMap { - channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.configuration.keepAlive), - UnaryHandler(keepAlive: self.configuration.keepAlive)]) + do { + try channel.pipeline.syncOperations.addHTTPClientHandlers() + // Lambda quotas... An invocation payload is maximal 6MB in size: + // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html + try channel.pipeline.syncOperations.addHandler( + NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)) + try channel.pipeline.syncOperations.addHandler( + UnaryHandler(keepAlive: self.configuration.keepAlive)) + return channel.eventLoop.makeSucceededFuture(()) + } catch { + return channel.eventLoop.makeFailedFuture(error) } } @@ -149,116 +157,54 @@ internal final class HTTPClient { } } -private final class HTTPHandler: ChannelDuplexHandler { - typealias OutboundIn = HTTPClient.Request - typealias InboundOut = HTTPClient.Response - typealias InboundIn = HTTPClientResponsePart +// no need in locks since we validate only one request can run at a time +private final class UnaryHandler: ChannelDuplexHandler { + typealias InboundIn = NIOHTTPClientResponseFull + typealias OutboundIn = HTTPRequestWrapper typealias OutboundOut = HTTPClientRequestPart private let keepAlive: Bool - private var readState: ReadState = .idle + + private var pending: (promise: EventLoopPromise, timeout: Scheduled?)? + private var lastError: Error? init(keepAlive: Bool) { self.keepAlive = keepAlive } func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let request = unwrapOutboundIn(data) - - var head = HTTPRequestHead(version: .init(major: 1, minor: 1), method: request.method, uri: request.url, headers: request.headers) - head.headers.add(name: "host", value: request.targetHost) - switch request.method { + guard self.pending == nil else { + preconditionFailure("invalid state, outstanding request") + } + let wrapper = unwrapOutboundIn(data) + + var head = HTTPRequestHead( + version: .http1_1, + method: wrapper.request.method, + uri: wrapper.request.url, + headers: wrapper.request.headers) + head.headers.add(name: "host", value: wrapper.request.targetHost) + switch head.method { case .POST, .PUT: - head.headers.add(name: "content-length", value: String(request.body?.readableBytes ?? 0)) + head.headers.add(name: "content-length", value: String(wrapper.request.body?.readableBytes ?? 0)) default: break } // We don't add a "Connection" header here if we want to keep the connection open, - // HTTP/1.1 defines specifies the following in RFC 2616, Section 8.1.2.1: + // HTTP/1.1 specified in RFC 7230, Section 6.3 Persistence: // - // An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to - // maintain a persistent connection unless a Connection header including - // the connection-token "close" was sent in the request. If the server - // chooses to close the connection immediately after sending the - // response, it SHOULD send a Connection header including the - // connection-token close. + // HTTP/1.1 defaults to the use of "persistent connections", allowing + // multiple requests and responses to be carried over a single + // connection. The "close" connection option is used to signal that a + // connection will not persist after the current request/response. HTTP + // implementations SHOULD support persistent connections. // // See also UnaryHandler.channelRead below. if !self.keepAlive { head.headers.add(name: "connection", value: "close") } - context.write(self.wrapOutboundOut(HTTPClientRequestPart.head(head))).flatMap { _ -> EventLoopFuture in - if let body = request.body { - return context.writeAndFlush(self.wrapOutboundOut(HTTPClientRequestPart.body(.byteBuffer(body)))) - } else { - context.flush() - return context.eventLoop.makeSucceededFuture(()) - } - }.cascade(to: promise) - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let response = unwrapInboundIn(data) - - switch response { - case .head(let head): - guard case .idle = self.readState else { - preconditionFailure("invalid read state \(self.readState)") - } - self.readState = .head(head) - case .body(var bodyPart): - switch self.readState { - case .head(let head): - self.readState = .body(head, bodyPart) - case .body(let head, var body): - body.writeBuffer(&bodyPart) - self.readState = .body(head, body) - default: - preconditionFailure("invalid read state \(self.readState)") - } - case .end: - switch self.readState { - case .head(let head): - context.fireChannelRead(wrapInboundOut(HTTPClient.Response(version: head.version, status: head.status, headers: head.headers, body: nil))) - self.readState = .idle - case .body(let head, let body): - context.fireChannelRead(wrapInboundOut(HTTPClient.Response(version: head.version, status: head.status, headers: head.headers, body: body))) - self.readState = .idle - default: - preconditionFailure("invalid read state \(self.readState)") - } - } - } - - private enum ReadState { - case idle - case head(HTTPResponseHead) - case body(HTTPResponseHead, ByteBuffer) - } -} - -// no need in locks since we validate only one request can run at a time -private final class UnaryHandler: ChannelDuplexHandler { - typealias OutboundIn = HTTPRequestWrapper - typealias InboundIn = HTTPClient.Response - typealias OutboundOut = HTTPClient.Request - - private let keepAlive: Bool - - private var pending: (promise: EventLoopPromise, timeout: Scheduled?)? - private var lastError: Error? - - init(keepAlive: Bool) { - self.keepAlive = keepAlive - } - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - guard self.pending == nil else { - preconditionFailure("invalid state, outstanding request") - } - let wrapper = unwrapOutboundIn(data) let timeoutTask = wrapper.request.timeout.map { context.eventLoop.scheduleTask(in: $0) { if self.pending != nil { @@ -267,15 +213,29 @@ private final class UnaryHandler: ChannelDuplexHandler { } } self.pending = (promise: wrapper.promise, timeout: timeoutTask) - context.writeAndFlush(wrapOutboundOut(wrapper.request), promise: promise) + + context.write(wrapOutboundOut(.head(head)), promise: nil) + if let body = wrapper.request.body { + context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil) + } + context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: promise) } func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let response = unwrapInboundIn(data) guard let pending = self.pending else { preconditionFailure("invalid state, no pending request") } - + + let response = unwrapInboundIn(data) + + let httpResponse = HTTPClient.Response( + version: response.head.version, + status: response.head.status, + headers: response.head.headers, + body: response.body) + + self.completeWith(.success(httpResponse)) + // As defined in RFC 7230 Section 6.3: // HTTP/1.1 defaults to the use of "persistent connections", allowing // multiple requests and responses to be carried over a single @@ -285,14 +245,14 @@ private final class UnaryHandler: ChannelDuplexHandler { // // That's why we only assume the connection shall be closed if we receive // a "connection = close" header. - let serverCloseConnection = response.headers.first(name: "connection")?.lowercased() == "close" + let serverCloseConnection = + response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) - if !self.keepAlive || serverCloseConnection || response.version != .init(major: 1, minor: 1) { + if !self.keepAlive || serverCloseConnection || response.head.version != .http1_1 { pending.promise.futureResult.whenComplete { _ in _ = context.channel.close() } } - self.completeWith(.success(response)) } func errorCaught(context: ChannelHandlerContext, error: Error) { diff --git a/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift b/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift index 32bbb52e..56aff6bb 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift @@ -30,7 +30,7 @@ internal final class MockLambdaServer { private var shutdown = false public init(behavior: LambdaServerBehavior, host: String = "127.0.0.1", port: Int = 7000, keepAlive: Bool = true) { - self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.behavior = behavior self.host = host self.port = port diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index 7fc9f9ef..4f2e6a01 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -22,7 +22,7 @@ func runLambda(behavior: LambdaServerBehavior, handler: Lambda.Handler) throws { } func runLambda(behavior: LambdaServerBehavior, factory: @escaping Lambda.HandlerFactory) throws { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } let logger = Logger(label: "TestLogger") let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100))) From 9a64ed9c96a237f077fc9558749d6bb5a44dd2d1 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 9 Apr 2021 16:22:36 +0200 Subject: [PATCH 2/2] Fixes --- Sources/AWSLambdaRuntimeCore/HTTPClient.swift | 138 ++++++++++-------- .../LambdaConfiguration.swift | 6 +- .../MockLambdaServer.swift | 2 +- Tests/AWSLambdaRuntimeCoreTests/Utils.swift | 2 +- scripts/soundness.sh | 2 +- 5 files changed, 82 insertions(+), 68 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift index 259dd58e..1fc62b65 100644 --- a/Sources/AWSLambdaRuntimeCore/HTTPClient.swift +++ b/Sources/AWSLambdaRuntimeCore/HTTPClient.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -103,8 +103,7 @@ internal final class HTTPClient { // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)) - try channel.pipeline.syncOperations.addHandler( - UnaryHandler(keepAlive: self.configuration.keepAlive)) + try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler()) return channel.eventLoop.makeSucceededFuture(()) } catch { return channel.eventLoop.makeFailedFuture(error) @@ -139,10 +138,10 @@ internal final class HTTPClient { } internal struct Response: Equatable { - public var version: HTTPVersion - public var status: HTTPResponseStatus - public var headers: HTTPHeaders - public var body: ByteBuffer? + var version: HTTPVersion + var status: HTTPResponseStatus + var headers: HTTPHeaders + var body: ByteBuffer? } internal enum Errors: Error { @@ -158,31 +157,34 @@ internal final class HTTPClient { } // no need in locks since we validate only one request can run at a time -private final class UnaryHandler: ChannelDuplexHandler { +private final class LambdaChannelHandler: ChannelDuplexHandler { typealias InboundIn = NIOHTTPClientResponseFull typealias OutboundIn = HTTPRequestWrapper typealias OutboundOut = HTTPClientRequestPart - private let keepAlive: Bool + enum State { + case idle + case running(promise: EventLoopPromise, timeout: Scheduled?) + case waitForConnectionClose(HTTPClient.Response, EventLoopPromise) + } - private var pending: (promise: EventLoopPromise, timeout: Scheduled?)? + private var state: State = .idle private var lastError: Error? - init(keepAlive: Bool) { - self.keepAlive = keepAlive - } + init() {} func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - guard self.pending == nil else { + guard case .idle = self.state else { preconditionFailure("invalid state, outstanding request") } let wrapper = unwrapOutboundIn(data) - + var head = HTTPRequestHead( version: .http1_1, method: wrapper.request.method, uri: wrapper.request.url, - headers: wrapper.request.headers) + headers: wrapper.request.headers + ) head.headers.add(name: "host", value: wrapper.request.targetHost) switch head.method { case .POST, .PUT: @@ -191,29 +193,17 @@ private final class UnaryHandler: ChannelDuplexHandler { break } - // We don't add a "Connection" header here if we want to keep the connection open, - // HTTP/1.1 specified in RFC 7230, Section 6.3 Persistence: - // - // HTTP/1.1 defaults to the use of "persistent connections", allowing - // multiple requests and responses to be carried over a single - // connection. The "close" connection option is used to signal that a - // connection will not persist after the current request/response. HTTP - // implementations SHOULD support persistent connections. - // - // See also UnaryHandler.channelRead below. - if !self.keepAlive { - head.headers.add(name: "connection", value: "close") - } - let timeoutTask = wrapper.request.timeout.map { context.eventLoop.scheduleTask(in: $0) { - if self.pending != nil { - context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout) + guard case .running = self.state else { + preconditionFailure("invalid state") } + + context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout) } } - self.pending = (promise: wrapper.promise, timeout: timeoutTask) - + self.state = .running(promise: wrapper.promise, timeout: timeoutTask) + context.write(wrapOutboundOut(.head(head)), promise: nil) if let body = wrapper.request.body { context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil) @@ -222,20 +212,21 @@ private final class UnaryHandler: ChannelDuplexHandler { } func channelRead(context: ChannelHandlerContext, data: NIOAny) { - guard let pending = self.pending else { + guard case .running(let promise, let timeout) = self.state else { preconditionFailure("invalid state, no pending request") } - + let response = unwrapInboundIn(data) - + let httpResponse = HTTPClient.Response( version: response.head.version, status: response.head.status, headers: response.head.headers, - body: response.body) - - self.completeWith(.success(httpResponse)) - + body: response.body + ) + + timeout?.cancel() + // As defined in RFC 7230 Section 6.3: // HTTP/1.1 defaults to the use of "persistent connections", allowing // multiple requests and responses to be carried over a single @@ -248,10 +239,27 @@ private final class UnaryHandler: ChannelDuplexHandler { let serverCloseConnection = response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) - if !self.keepAlive || serverCloseConnection || response.head.version != .http1_1 { - pending.promise.futureResult.whenComplete { _ in - _ = context.channel.close() - } + let closeConnection = serverCloseConnection || response.head.version != .http1_1 + + if closeConnection { + // If we were succeeding the request promise here directly and closing the connection + // after succeeding the promise we may run into a race condition: + // + // The lambda runtime will ask for the next work item directly after a succeeded post + // response request. The desire for the next work item might be faster than the attempt + // to close the connection. This will lead to a situation where we try to the connection + // but the next request has already been scheduled on the connection that we want to + // close. For this reason we postpone succeeding the promise until the connection has + // been closed. This codepath will only be hit in the very, very unlikely event of the + // Lambda control plane demanding to close connection. (It's more or less only + // implemented to support http1.1 correctly.) This behavior is ensured with the test + // `LambdaTest.testNoKeepAliveServer`. + self.state = .waitForConnectionClose(httpResponse, promise) + _ = context.channel.close() + return + } else { + self.state = .idle + promise.succeed(httpResponse) } } @@ -263,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler { func channelInactive(context: ChannelHandlerContext) { // fail any pending responses with last error or assume peer disconnected - if self.pending != nil { - let error = self.lastError ?? HTTPClient.Errors.connectionResetByPeer - self.completeWith(.failure(error)) - } context.fireChannelInactive() + + switch self.state { + case .idle: + break + case .running(let promise, let timeout): + self.state = .idle + timeout?.cancel() + promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer) + + case .waitForConnectionClose(let response, let promise): + self.state = .idle + promise.succeed(response) + } } func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { switch event { case is RequestCancelEvent: - if self.pending != nil { - self.completeWith(.failure(HTTPClient.Errors.cancelled)) + switch self.state { + case .idle: + break + case .running(let promise, let timeout): + self.state = .idle + timeout?.cancel() + promise.fail(HTTPClient.Errors.cancelled) + // after the cancel error has been send, we want to close the connection so // that no more packets can be read on this connection. _ = context.channel.close() + case .waitForConnectionClose(_, let promise): + self.state = .idle + promise.fail(HTTPClient.Errors.cancelled) } default: context.triggerUserOutboundEvent(event, promise: promise) } } - - private func completeWith(_ result: Result) { - guard let pending = self.pending else { - preconditionFailure("invalid state, no pending request") - } - self.pending = nil - self.lastError = nil - pending.timeout?.cancel() - pending.promise.completeWith(result) - } } private struct HTTPRequestWrapper { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift b/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift index 9b9ec8fb..39a3fb96 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -64,7 +64,6 @@ extension Lambda { struct RuntimeEngine: CustomStringConvertible { let ip: String let port: Int - let keepAlive: Bool let requestTimeout: TimeAmount? init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) { @@ -74,12 +73,11 @@ extension Lambda { } self.ip = String(ipPort[0]) self.port = port - self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) } } var description: String { - "\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout))" + "\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))" } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift b/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift index 56aff6bb..9f543e66 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index 4f2e6a01..e3be55d5 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/scripts/soundness.sh b/scripts/soundness.sh index dbec227b..71d43e44 100755 --- a/scripts/soundness.sh +++ b/scripts/soundness.sh @@ -19,7 +19,7 @@ here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" function replace_acceptable_years() { # this needs to replace all acceptable forms with 'YEARS' - sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/' + sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2017-2021/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/' } printf "=> Checking for unacceptable language... "