Skip to content

Commit a282ef3

Browse files
simonjbeaumontglbrntt
authored andcommitted
[async-await] Base types for client implementation (grpc#1243)
This PR implements some of the types required by the proposal for async/await support, added in grpc#1231. To aid reviewing, only the types required for the client are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. Note that this makes use of a clunky mechanism for yielding results into the response stream. @glbrntt is working on a parallel effort to provide a more flexible implementation of `AsyncSequence` that allows for values to be yielded from outside of the closure parameter to the initializer. When it is ready, this code will be updated to make use of it. There is a note in the code comments to this effect.
1 parent c1f3c50 commit a282ef3

11 files changed

+1214
-1
lines changed

Package.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ let package = Package(
6060
.product(name: "NIOEmbedded", package: "swift-nio"),
6161
.product(name: "NIOFoundationCompat", package: "swift-nio"),
6262
.product(name: "NIOTLS", package: "swift-nio"),
63+
.product(name: "_NIOConcurrency", package: "swift-nio"),
6364
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
6465
.product(name: "NIOHTTP1", package: "swift-nio"),
6566
.product(name: "NIOHTTP2", package: "swift-nio-http2"),
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
18+
import _NIOConcurrency
19+
import NIOHPACK
20+
21+
/// Async-await variant of BidirectionalStreamingCall.
22+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
23+
public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
24+
private let call: Call<Request, Response>
25+
private let responseParts: StreamingResponseParts<Response>
26+
27+
/// The stream of responses from the server.
28+
public let responses: GRPCAsyncResponseStream<Response>
29+
30+
/// The options used to make the RPC.
31+
public var options: CallOptions {
32+
return self.call.options
33+
}
34+
35+
/// Cancel this RPC if it hasn't already completed.
36+
public func cancel() async throws {
37+
try await self.call.cancel().get()
38+
}
39+
40+
// MARK: - Response Parts
41+
42+
/// The initial metadata returned from the server.
43+
public var initialMetadata: HPACKHeaders {
44+
// swiftformat:disable:next redundantGet
45+
get async throws {
46+
try await self.responseParts.initialMetadata.get()
47+
}
48+
}
49+
50+
/// The trailing metadata returned from the server.
51+
///
52+
/// - Important: Awaiting this property will suspend until the responses have been consumed.
53+
public var trailingMetadata: HPACKHeaders {
54+
// swiftformat:disable:next redundantGet
55+
get async throws {
56+
try await self.responseParts.trailingMetadata.get()
57+
}
58+
}
59+
60+
/// The final status of the the RPC.
61+
///
62+
/// - Important: Awaiting this property will suspend until the responses have been consumed.
63+
public var status: GRPCStatus {
64+
// swiftformat:disable:next redundantGet
65+
get async {
66+
// force-try acceptable because any error is encapsulated in a successful GRPCStatus future.
67+
try! await self.responseParts.status.get()
68+
}
69+
}
70+
71+
private init(call: Call<Request, Response>) {
72+
self.call = call
73+
// Initialise `responseParts` with an empty response handler because we
74+
// provide the responses as an AsyncSequence in `responseStream`.
75+
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
76+
77+
// Call and StreamingResponseParts are reference types so we grab a
78+
// referecence to them here to avoid capturing mutable self in the closure
79+
// passed to the AsyncThrowingStream initializer.
80+
//
81+
// The alternative would be to declare the responseStream as:
82+
// ```
83+
// public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
84+
// ```
85+
//
86+
// UPDATE: Additionally we expect to replace this soon with an AsyncSequence
87+
// implementation that supports yielding values from outside the closure.
88+
let call = self.call
89+
let responseParts = self.responseParts
90+
let responseStream = AsyncThrowingStream(Response.self) { continuation in
91+
call.invokeStreamingRequests { error in
92+
responseParts.handleError(error)
93+
continuation.finish(throwing: error)
94+
} onResponsePart: { responsePart in
95+
responseParts.handle(responsePart)
96+
switch responsePart {
97+
case let .message(response):
98+
continuation.yield(response)
99+
case .metadata:
100+
break
101+
case .end:
102+
continuation.finish()
103+
}
104+
}
105+
}
106+
self.responses = .init(responseStream)
107+
}
108+
109+
/// We expose this as the only non-private initializer so that the caller
110+
/// knows that invocation is part of initialisation.
111+
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
112+
Self(call: call)
113+
}
114+
115+
// MARK: - Requests
116+
117+
/// Sends a message to the service.
118+
///
119+
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
120+
///
121+
/// - Parameters:
122+
/// - message: The message to send.
123+
/// - compression: Whether compression should be used for this message. Ignored if compression
124+
/// was not enabled for the RPC.
125+
public func sendMessage(
126+
_ message: Request,
127+
compression: Compression = .deferToCallDefault
128+
) async throws {
129+
let compress = self.call.compress(compression)
130+
let promise = self.call.eventLoop.makePromise(of: Void.self)
131+
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
132+
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
133+
try await promise.futureResult.get()
134+
}
135+
136+
/// Sends a sequence of messages to the service.
137+
///
138+
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
139+
///
140+
/// - Parameters:
141+
/// - messages: The sequence of messages to send.
142+
/// - compression: Whether compression should be used for this message. Ignored if compression
143+
/// was not enabled for the RPC.
144+
public func sendMessages<S>(
145+
_ messages: S,
146+
compression: Compression = .deferToCallDefault
147+
) async throws where S: Sequence, S.Element == Request {
148+
let promise = self.call.eventLoop.makePromise(of: Void.self)
149+
self.call.sendMessages(messages, compression: compression, promise: promise)
150+
try await promise.futureResult.get()
151+
}
152+
153+
/// Terminates a stream of messages sent to the service.
154+
///
155+
/// - Important: This should only ever be called once.
156+
public func sendEnd() async throws {
157+
let promise = self.call.eventLoop.makePromise(of: Void.self)
158+
self.call.send(.end, promise: promise)
159+
try await promise.futureResult.get()
160+
}
161+
}
162+
163+
#endif
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
18+
import NIOHPACK
19+
20+
/// Async-await variant of `ClientStreamingCall`.
21+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
22+
public struct GRPCAsyncClientStreamingCall<Request, Response> {
23+
private let call: Call<Request, Response>
24+
private let responseParts: UnaryResponseParts<Response>
25+
26+
/// The options used to make the RPC.
27+
public var options: CallOptions {
28+
return self.call.options
29+
}
30+
31+
/// Cancel this RPC if it hasn't already completed.
32+
public func cancel() async throws {
33+
try await self.call.cancel().get()
34+
}
35+
36+
// MARK: - Response Parts
37+
38+
/// The initial metadata returned from the server.
39+
public var initialMetadata: HPACKHeaders {
40+
// swiftformat:disable:next redundantGet
41+
get async throws {
42+
try await self.responseParts.initialMetadata.get()
43+
}
44+
}
45+
46+
/// The response returned by the server.
47+
public var response: Response {
48+
// swiftformat:disable:next redundantGet
49+
get async throws {
50+
try await self.responseParts.response.get()
51+
}
52+
}
53+
54+
/// The trailing metadata returned from the server.
55+
///
56+
/// - Important: Awaiting this property will suspend until the responses have been consumed.
57+
public var trailingMetadata: HPACKHeaders {
58+
// swiftformat:disable:next redundantGet
59+
get async throws {
60+
try await self.responseParts.trailingMetadata.get()
61+
}
62+
}
63+
64+
/// The final status of the the RPC.
65+
///
66+
/// - Important: Awaiting this property will suspend until the responses have been consumed.
67+
public var status: GRPCStatus {
68+
// swiftformat:disable:next redundantGet
69+
get async {
70+
// force-try acceptable because any error is encapsulated in a successful GRPCStatus future.
71+
try! await self.responseParts.status.get()
72+
}
73+
}
74+
75+
private init(call: Call<Request, Response>) {
76+
self.call = call
77+
self.responseParts = UnaryResponseParts(on: call.eventLoop)
78+
self.call.invokeStreamingRequests(
79+
onError: self.responseParts.handleError(_:),
80+
onResponsePart: self.responseParts.handle(_:)
81+
)
82+
}
83+
84+
/// We expose this as the only non-private initializer so that the caller
85+
/// knows that invocation is part of initialisation.
86+
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
87+
Self(call: call)
88+
}
89+
90+
// MARK: - Requests
91+
92+
/// Sends a message to the service.
93+
///
94+
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
95+
///
96+
/// - Parameters:
97+
/// - message: The message to send.
98+
/// - compression: Whether compression should be used for this message. Ignored if compression
99+
/// was not enabled for the RPC.
100+
public func sendMessage(
101+
_ message: Request,
102+
compression: Compression = .deferToCallDefault
103+
) async throws {
104+
let compress = self.call.compress(compression)
105+
let promise = self.call.eventLoop.makePromise(of: Void.self)
106+
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
107+
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
108+
try await promise.futureResult.get()
109+
}
110+
111+
/// Sends a sequence of messages to the service.
112+
///
113+
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
114+
///
115+
/// - Parameters:
116+
/// - messages: The sequence of messages to send.
117+
/// - compression: Whether compression should be used for this message. Ignored if compression
118+
/// was not enabled for the RPC.
119+
public func sendMessages<S>(
120+
_ messages: S,
121+
compression: Compression = .deferToCallDefault
122+
) async throws where S: Sequence, S.Element == Request {
123+
let promise = self.call.eventLoop.makePromise(of: Void.self)
124+
self.call.sendMessages(messages, compression: compression, promise: promise)
125+
try await promise.futureResult.get()
126+
}
127+
128+
/// Terminates a stream of messages sent to the service.
129+
///
130+
/// - Important: This should only ever be called once.
131+
public func sendEnd() async throws {
132+
let promise = self.call.eventLoop.makePromise(of: Void.self)
133+
self.call.send(.end, promise: promise)
134+
try await promise.futureResult.get()
135+
}
136+
}
137+
138+
#endif
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
18+
/// This is currently a wrapper around AsyncThrowingStream because we want to be
19+
/// able to swap out the implementation for something else in the future.
20+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
21+
public struct GRPCAsyncResponseStream<Element>: AsyncSequence {
22+
@usableFromInline
23+
internal typealias WrappedStream = AsyncThrowingStream<Element, Error>
24+
25+
@usableFromInline
26+
internal let stream: WrappedStream
27+
28+
@inlinable
29+
internal init(_ stream: WrappedStream) {
30+
self.stream = stream
31+
}
32+
33+
public func makeAsyncIterator() -> Iterator {
34+
Self.AsyncIterator(self.stream)
35+
}
36+
37+
public struct Iterator: AsyncIteratorProtocol {
38+
@usableFromInline
39+
internal var iterator: WrappedStream.AsyncIterator
40+
41+
fileprivate init(_ stream: WrappedStream) {
42+
self.iterator = stream.makeAsyncIterator()
43+
}
44+
45+
@inlinable
46+
public mutating func next() async throws -> Element? {
47+
try await self.iterator.next()
48+
}
49+
}
50+
}
51+
52+
#endif

0 commit comments

Comments
 (0)