Skip to content

refactor: add a timeout settings to grpc client #171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/spaceone/core/connector/space_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ def __init__(
endpoint: str = None,
token: str = None,
return_type: str = "dict",
timeout: int = None,
**kwargs,
):
super().__init__(*args, **kwargs)
self._service = service
self._endpoint = endpoint
self._token = token
self._return_type = return_type
self._timeout = timeout

self._client = None
self._endpoints: dict = self.config.get("endpoints", {})
Expand Down Expand Up @@ -95,6 +97,7 @@ def _init_client(self) -> None:
endpoint=e["endpoint"],
ssl_enabled=e["ssl_enabled"],
max_message_length=1024 * 1024 * 256,
timeout=self._timeout,
)

@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions src/spaceone/core/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ class ERROR_GRPC_CONNECTION(ERROR_UNAVAILAVBLE):
_message = "Server is unavailable. (channel = {channel}, message = {message})"


class ERROR_GRPC_TIMEOUT(ERROR_GRPC_CONNECTION):
_message = "gRPC Timeout."


class ERROR_GRPC_TLS_HANDSHAKE(ERROR_GRPC_CONNECTION):
_message = "TLS handshake failed. (reason = {reason})"

Expand Down
57 changes: 49 additions & 8 deletions src/spaceone/core/pygrpc/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import types
import grpc
from grpc import ClientCallDetails
from google.protobuf.json_format import ParseDict
from google.protobuf.message_factory import MessageFactory # , GetMessageClass
from google.protobuf.descriptor_pool import DescriptorPool
Expand All @@ -15,16 +16,28 @@
_LOGGER = logging.getLogger(__name__)


class _ClientCallDetails(ClientCallDetails):
def __init__(self, method, timeout, metadata, credentials, wait_for_ready):
self.method = method
self.timeout = timeout
self.metadata = metadata
self.credentials = credentials
self.wait_for_ready = wait_for_ready


class _ClientInterceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self, options: dict, channel_key: str, request_map: dict):
def __init__(
self, options: dict, channel_key: str, request_map: dict, timeout: int = None
):
self._request_map = request_map
self._channel_key = channel_key
self.metadata = options.get("metadata", {})
self.timeout = timeout or 60

def _check_message(self, client_call_details, request_or_iterator, is_stream):
if client_call_details.method in self._request_map:
Expand Down Expand Up @@ -123,18 +136,27 @@ def _retry_call(
return response_or_iterator

except Exception as e:
if e.error_code == "ERROR_GRPC_CONNECTION":
if not isinstance(e, ERROR_BASE):
e = ERROR_UNKNOWN(message=str(e))

if (
e.error_code == "ERROR_GRPC_CONNECTION"
or e.status_code == "DEADLINE_EXCEEDED"
):
if retries >= _MAX_RETRIES:
channel = e.meta.get("channel")
if channel in _GRPC_CHANNEL:
_LOGGER.error(
f"Disconnect gRPC Endpoint. (channel = {channel})"
)
del _GRPC_CHANNEL[channel]

if e.status_code == "DEADLINE_EXCEEDED":
raise ERROR_GRPC_TIMEOUT()
raise e
else:
_LOGGER.debug(
f"Retry gRPC Call: reason = {e.message}, retry = {retries + 1}"
f"Retry gRPC Call: method = {client_call_details.method}, reason = {e.message}, retry = {retries + 1}"
)
else:
raise e
Expand All @@ -160,9 +182,20 @@ def _intercept_call(
is_response_stream,
)

def _create_new_call_details(self, client_call_details):
return _ClientCallDetails(
method=client_call_details.method,
timeout=self.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials,
wait_for_ready=client_call_details.wait_for_ready,
)

def intercept_unary_unary(self, continuation, client_call_details, request):
new_call_details = self._create_new_call_details(client_call_details)

return self._intercept_call(
continuation, client_call_details, request, False, False
continuation, new_call_details, request, False, False
)

def intercept_unary_stream(self, continuation, client_call_details, request):
Expand Down Expand Up @@ -263,7 +296,7 @@ def _bind_grpc_method(


class GRPCClient(object):
def __init__(self, channel, options, channel_key):
def __init__(self, channel, options, channel_key, timeout=None):
self._request_map = {}
self._api_resources = {}

Expand All @@ -272,7 +305,7 @@ def __init__(self, channel, options, channel_key):
self._init_grpc_reflection()

_client_interceptor = _ClientInterceptor(
options, channel_key, self._request_map
options, channel_key, self._request_map, timeout
)
_intercept_channel = grpc.intercept_channel(channel, _client_interceptor)
self._bind_grpc_stub(_intercept_channel)
Expand Down Expand Up @@ -326,7 +359,13 @@ def _create_insecure_channel(endpoint, options):
return grpc.insecure_channel(endpoint, options=options)


def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_opts):
def client(
endpoint=None,
ssl_enabled=False,
max_message_length=None,
timeout=None,
**client_opts,
):
if endpoint is None:
raise Exception("Client's endpoint is undefined.")

Expand All @@ -350,7 +389,9 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
)

try:
_GRPC_CHANNEL[endpoint] = GRPCClient(channel, client_opts, endpoint)
_GRPC_CHANNEL[endpoint] = GRPCClient(
channel, client_opts, endpoint, timeout
)
except Exception as e:
if hasattr(e, "details"):
raise ERROR_GRPC_CONNECTION(channel=endpoint, message=e.details())
Expand Down
Loading