Skip to content

Commit 5df5b0c

Browse files
committed
Cancelling works.
1 parent 9fadc4a commit 5df5b0c

File tree

5 files changed

+68
-12
lines changed

5 files changed

+68
-12
lines changed

Sources/AWSLambdaRuntimeCore/HTTPClient.swift

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,20 @@ internal final class HTTPClient {
4848
timeout: timeout ?? self.configuration.requestTimeout))
4949
}
5050

51+
/// cancels the current request if there is one
52+
func cancel() {
53+
guard self.executing.exchange(with: true) else {
54+
// there is no request running. nothing to cancel
55+
return
56+
}
57+
58+
guard case .connected(let channel) = self.state else {
59+
preconditionFailure("if we are executing, we expect to have an open channel")
60+
}
61+
62+
channel.triggerUserOutboundEvent(RequestCancelEvent(), promise: nil)
63+
}
64+
5165
// TODO: cap reconnect attempt
5266
private func execute(_ request: Request, validate: Bool = true) -> EventLoopFuture<Response> {
5367
precondition(!validate || self.executing.compareAndExchange(expected: false, desired: true), "expecting single request at a time")
@@ -120,6 +134,7 @@ internal final class HTTPClient {
120134
internal enum Errors: Error {
121135
case connectionResetByPeer
122136
case timeout
137+
case cancelled
123138
}
124139

125140
private enum State {
@@ -284,6 +299,18 @@ private final class UnaryHandler: ChannelDuplexHandler {
284299
context.fireChannelInactive()
285300
}
286301

302+
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
303+
switch event {
304+
case is RequestCancelEvent:
305+
guard self.pending != nil else {
306+
return
307+
}
308+
self.completeWith(.failure(HTTPClient.Errors.cancelled))
309+
default:
310+
context.triggerUserOutboundEvent(event, promise: promise)
311+
}
312+
}
313+
287314
private func completeWith(_ result: Result<HTTPClient.Response, Error>) {
288315
guard let pending = self.pending else {
289316
preconditionFailure("invalid state, no pending request")
@@ -299,3 +326,5 @@ private struct HTTPRequestWrapper {
299326
let request: HTTPClient.Request
300327
let promise: EventLoopPromise<HTTPClient.Response>
301328
}
329+
330+
private struct RequestCancelEvent {}

Sources/AWSLambdaRuntimeCore/Lambda.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ public enum Lambda {
107107
var logger = Logger(label: "Lambda")
108108
logger.logLevel = configuration.general.logLevel
109109
let eventLoop = eventLoopGroup.next()
110-
110+
111111
return eventLoop.flatSubmit {
112112
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, factory: factory)
113-
113+
114114
#if DEBUG
115115
// We only offer coordinated shutdown in DEBUG, since Lambda functions
116116
// running on AWS are not shutdown in a coordinated way.
@@ -119,13 +119,13 @@ public enum Lambda {
119119
lifecycle.shutdown()
120120
}
121121
#endif
122-
122+
123123
return lifecycle.start().flatMap {
124124
let future = lifecycle.shutdownFuture
125125
#if DEBUG
126-
return future.always { _ in signalSource.cancel() }
126+
return future.always { _ in signalSource.cancel() }
127127
#else
128-
return future
128+
return future
129129
#endif
130130
}
131131
}

Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,20 @@ extension Lambda {
8181

8282
// MARK: - Private
8383

84+
#if DEBUG
8485
/// Begin the `Lifecycle` shutdown.
86+
/// Only needed for debugging purposes.
8587
public func shutdown() {
86-
self.state = .shuttingdown
88+
// make this method thread safe by dispatching onto the eventloop
89+
self.eventLoop.execute {
90+
guard case .active(let runner, _) = self.state else {
91+
return
92+
}
93+
self.state = .shuttingdown
94+
runner.cancelWaitingForNextInvocation()
95+
}
8796
}
97+
#endif
8898

8999
private func markShutdown() {
90100
self.state = .shutdown

Sources/AWSLambdaRuntimeCore/LambdaRunner.swift

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import NIO
1818

1919
extension Lambda {
2020
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
21-
internal struct Runner {
21+
internal final class Runner {
2222
private let runtimeClient: RuntimeClient
2323
private let eventLoop: EventLoop
2424

25+
private var isGettingNextInvocation = false
26+
2527
init(eventLoop: EventLoop, configuration: Configuration) {
2628
self.eventLoop = eventLoop
2729
self.runtimeClient = RuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
@@ -46,13 +48,12 @@ extension Lambda {
4648
func run(logger: Logger, handler: Handler) -> EventLoopFuture<Void> {
4749
logger.debug("lambda invocation sequence starting")
4850
// 1. request work from lambda runtime engine
49-
return self.runtimeClient.getNextInvocation(logger: logger).peekError { (error) in
50-
if case RuntimeError.badStatusCode(.noContent) = error {
51-
return
52-
}
51+
self.isGettingNextInvocation = true
52+
return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in
5353
logger.error("could not fetch work from lambda runtime engine: \(error)")
5454
}.flatMap { invocation, payload in
55-
// 2. send invocation to handler
55+
// 2. send work to handler
56+
self.isGettingNextInvocation = false
5657
let context = Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
5758
logger.debug("sending invocation to lambda handler \(handler)")
5859
return handler.handle(context: context, payload: payload)
@@ -72,6 +73,15 @@ extension Lambda {
7273
logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")")
7374
}
7475
}
76+
77+
#if DEBUG
78+
/// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane)
79+
/// only needed for debugging purposes.
80+
func cancelWaitingForNextInvocation() {
81+
guard self.isGettingNextInvocation else { return }
82+
self.runtimeClient.cancel()
83+
}
84+
#endif
7585
}
7686
}
7787

Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ extension Lambda {
114114
}
115115
}
116116
}
117+
118+
#if DEBUG
119+
/// Cancels the current request, if one is running. Only needed for debugging purposes
120+
func cancel() {
121+
self.httpClient.cancel()
122+
}
123+
#endif
117124
}
118125
}
119126

0 commit comments

Comments
 (0)