Skip to content

Two important fixes (and possibly more improvements) #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Sources/CgRPC/shim/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <assert.h>

void cgrpc_call_destroy(cgrpc_call *call) {
//grpc_call_destroy(call->call);
if (call->call) {
grpc_call_unref(call->call);
}
free(call);
}

Expand Down
22 changes: 22 additions & 0 deletions Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ typedef enum grpc_completion_type {
GRPC_OP_COMPLETE
} grpc_completion_type;

/** Connectivity state of a channel. */
typedef enum grpc_connectivity_state {
/** channel has just been initialized */
GRPC_CHANNEL_INIT = -1,
/** channel is idle */
GRPC_CHANNEL_IDLE,
/** channel is connecting */
GRPC_CHANNEL_CONNECTING,
/** channel is ready for work */
GRPC_CHANNEL_READY,
/** channel has seen a failure but expects to recover */
GRPC_CHANNEL_TRANSIENT_FAILURE,
/** channel has seen a failure that it cannot recover from */
GRPC_CHANNEL_SHUTDOWN
} grpc_connectivity_state;

typedef struct grpc_event {
/** The type of the completion. */
grpc_completion_type type;
Expand All @@ -110,6 +126,9 @@ void grpc_shutdown(void);
const char *grpc_version_string(void);
const char *grpc_g_stands_for(void);

void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
void grpc_completion_queue_destroy(cgrpc_completion_queue *cq);

// helper
void cgrpc_free_copied_string(char *string);

Expand All @@ -126,6 +145,9 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
double timeout);
cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);

grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);

// server support
cgrpc_server *cgrpc_server_create(const char *address);
cgrpc_server *cgrpc_server_create_secure(const char *address,
Expand Down
7 changes: 5 additions & 2 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ void cgrpc_channel_destroy(cgrpc_channel *c) {
c->channel = NULL;

grpc_completion_queue_shutdown(c->completion_queue);
cgrpc_completion_queue_drain(c->completion_queue);
grpc_completion_queue_destroy(c->completion_queue);
free(c);
}

Expand All @@ -85,6 +83,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
// create call
host_slice = grpc_slice_from_copied_string(host);
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
// The resulting call will have a retain call of +1. We'll release it in `cgrpc_call_destroy()`.
grpc_call *channel_call = grpc_channel_create_call(channel->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
Expand All @@ -102,3 +101,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
return channel->completion_queue;
}

grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}
15 changes: 10 additions & 5 deletions Sources/CgRPC/shim/handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ cgrpc_handler *cgrpc_handler_create_with_server(cgrpc_server *server) {

void cgrpc_handler_destroy(cgrpc_handler *h) {
grpc_completion_queue_shutdown(h->completion_queue);
cgrpc_completion_queue_drain(h->completion_queue);
grpc_completion_queue_destroy(h->completion_queue);
grpc_metadata_array_destroy(&(h->request_metadata_recv));
grpc_call_details_destroy(&(h->call_details));
if (h->server_call) {
//grpc_call_destroy(h->server_call);
grpc_call_unref(h->server_call);
}
free(h);
}
Expand Down Expand Up @@ -67,6 +65,10 @@ cgrpc_call *cgrpc_handler_get_call(cgrpc_handler *h) {
cgrpc_call *call = (cgrpc_call *) malloc(sizeof(cgrpc_call));
memset(call, 0, sizeof(cgrpc_call));
call->call = h->server_call;
if (call->call) {
// This retain will be balanced by `cgrpc_call_destroy()`.
grpc_call_ref(call->call);
}
return call;
}

Expand All @@ -77,6 +79,11 @@ cgrpc_completion_queue *cgrpc_handler_get_completion_queue(cgrpc_handler *h) {
grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h,
cgrpc_metadata_array *metadata,
long tag) {
if (h->server_call != NULL) {
return GRPC_CALL_OK;
}
// This fills `h->server_call` with a call with retain count of +1.
// We'll release that retain in `cgrpc_handler_destroy()`.
return grpc_server_request_call(h->server->server,
&(h->server_call),
&(h->call_details),
Expand All @@ -85,5 +92,3 @@ grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h,
h->server->completion_queue,
cgrpc_create_tag(tag));
}


2 changes: 0 additions & 2 deletions Sources/CgRPC/shim/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ void cgrpc_server_destroy(cgrpc_server *server) {
server->server = NULL;

grpc_completion_queue_shutdown(server->completion_queue);
cgrpc_completion_queue_drain(server->completion_queue);
grpc_completion_queue_destroy(server->completion_queue);
}

void cgrpc_server_start(cgrpc_server *server) {
Expand Down
4 changes: 4 additions & 0 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public class Channel {

/// Default host to use for new calls
public var host: String

public var connectivityState: ConnectivityState? {
return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
}

/// Initializes a gRPC channel
///
Expand Down
5 changes: 5 additions & 0 deletions Sources/SwiftGRPC/Core/CompletionQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class CompletionQueue {
self.underlyingCompletionQueue = underlyingCompletionQueue
self.name = name
}

deinit {
cgrpc_completion_queue_drain(underlyingCompletionQueue)
grpc_completion_queue_destroy(underlyingCompletionQueue)
}

/// Waits for an operation group to complete
///
Expand Down
32 changes: 32 additions & 0 deletions Sources/SwiftGRPC/Core/ConnectivityState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if SWIFT_PACKAGE
import CgRPC
#endif
import Foundation

public enum ConnectivityState: Int32, Error {
case initializing = -1
case idle
case connecting
case ready
case transient_failure
case shutdown

static func fromCEnum(_ connectivityState: grpc_connectivity_state) -> ConnectivityState? {
return ConnectivityState(rawValue: connectivityState.rawValue)
}
}
2 changes: 1 addition & 1 deletion Sources/SwiftGRPC/Core/Handler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class Handler {
/// A Call object that can be used to respond to the request
public private(set) lazy var call: Call = {
Call(underlyingCall: cgrpc_handler_get_call(self.underlyingHandler),
owned: false,
owned: true,
completionQueue: self.completionQueue)
}()

Expand Down
6 changes: 3 additions & 3 deletions Sources/SwiftGRPC/Core/Metadata.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ public class Metadata: CustomStringConvertible {
}

public var description: String {
var result = ""
var lines: [String] = []
for i in 0..<count() {
let key = self.key(i)
let value = self.value(i)
result += (key ?? "(nil)") + ":" + (value ?? "(nil)") + "\n"
lines.append((key ?? "(nil)") + ":" + (value ?? "(nil)"))
}
return result
return lines.joined(separator: "\n")
}

public func copy() -> Metadata {
Expand Down
10 changes: 5 additions & 5 deletions Sources/SwiftGRPC/Core/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public class Server {
cgrpc_server_start(underlyingServer)
// run the server on a new background thread
dispatchQueue.async {
var running = true
while running {
spinloop: while true {
do {
let handler = Handler(underlyingServer: self.underlyingServer)
try handler.requestCall(tag: Server.handlerCallTag)
Expand Down Expand Up @@ -97,16 +96,17 @@ public class Server {
}
}
} else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
running = false // exit the loop
break spinloop
}
} else if event.type == .queueTimeout {
// everything is fine
continue
} else if event.type == .queueShutdown {
running = false
break spinloop
}
} catch {
print("server call error: \(error)")
running = false
break spinloop
}
}
self.onCompletion?()
Expand Down
16 changes: 10 additions & 6 deletions Sources/SwiftGRPC/Runtime/ServiceServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ open class ServiceServer {
public let address: String
public let server: Server

public var shouldLogRequests = true

/// Create a server that accepts insecure connections.
public init(address: String) {
gRPC.initialize()
Expand Down Expand Up @@ -58,13 +60,15 @@ open class ServiceServer {
return
}

let unwrappedHost = handler.host ?? "(nil)"
let unwrappedMethod = handler.method ?? "(nil)"
let unwrappedCaller = handler.caller ?? "(nil)"
print("Server received request to " + unwrappedHost
+ " calling " + unwrappedMethod
+ " from " + unwrappedCaller
+ " with " + handler.requestMetadata.description)
if strongSelf.shouldLogRequests == true {
let unwrappedHost = handler.host ?? "(nil)"
let unwrappedCaller = handler.caller ?? "(nil)"
print("Server received request to " + unwrappedHost
+ " calling " + unwrappedMethod
+ " from " + unwrappedCaller
+ " with " + handler.requestMetadata.description)
}

do {
if !(try strongSelf.handleMethod(unwrappedMethod, handler: handler, queue: queue)) {
Expand Down
17 changes: 17 additions & 0 deletions Tests/SwiftGRPCTests/EchoTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class EchoTests: BasicEchoTestCase {
static var allTests: [(String, (EchoTests) -> () throws -> Void)] {
return [
("testUnary", testUnary),
("testUnaryLotsOfRequests", testUnaryLotsOfRequests),
("testClientStreaming", testClientStreaming),
("testClientStreamingLotsOfMessages", testClientStreamingLotsOfMessages),
("testServerStreaming", testServerStreaming),
Expand All @@ -48,6 +49,22 @@ extension EchoTests {
XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text)
XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text)
}

func testUnaryLotsOfRequests() {
// No need to spam the log with 50k lines.
server.shouldLogRequests = false
// Sending that many requests at once can sometimes trip things up, it seems.
client.timeout = 5.0
let clockStart = clock()
let numberOfRequests = 50_000
for i in 0..<numberOfRequests {
if i % 1_000 == 0 && i > 0 {
print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
}
XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text)
}
print("total time for \(numberOfRequests) requests: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
}
}

extension EchoTests {
Expand Down
Loading