diff --git a/src/spaceone/core/connector/space_connector.py b/src/spaceone/core/connector/space_connector.py index 7555f47..03d0809 100644 --- a/src/spaceone/core/connector/space_connector.py +++ b/src/spaceone/core/connector/space_connector.py @@ -25,6 +25,7 @@ def __init__( endpoint: str = None, token: str = None, return_type: str = "dict", + timeout: int = None, **kwargs, ): super().__init__(*args, **kwargs) @@ -32,6 +33,7 @@ def __init__( self._endpoint = endpoint self._token = token self._return_type = return_type + self._timeout = timeout self._client = None self._endpoints: dict = self.config.get("endpoints", {}) @@ -95,6 +97,7 @@ def _init_client(self) -> None: endpoint=e["endpoint"], ssl_enabled=e["ssl_enabled"], max_message_length=1024 * 1024 * 256, + timeout=self._timeout, ) @staticmethod diff --git a/src/spaceone/core/error.py b/src/spaceone/core/error.py index 057c84c..cda5232 100644 --- a/src/spaceone/core/error.py +++ b/src/spaceone/core/error.py @@ -250,6 +250,10 @@ class ERROR_GRPC_CONNECTION(ERROR_UNAVAILAVBLE): _message = "Server is unavailable. (channel = {channel}, message = {message})" +class ERROR_GRPC_TIMEOUT(ERROR_GRPC_CONNECTION): + _message = "gRPC Timeout." + + class ERROR_GRPC_TLS_HANDSHAKE(ERROR_GRPC_CONNECTION): _message = "TLS handshake failed. (reason = {reason})" diff --git a/src/spaceone/core/pygrpc/client.py b/src/spaceone/core/pygrpc/client.py index 7fe083b..b588dce 100644 --- a/src/spaceone/core/pygrpc/client.py +++ b/src/spaceone/core/pygrpc/client.py @@ -1,6 +1,7 @@ import logging import types import grpc +from grpc import ClientCallDetails from google.protobuf.json_format import ParseDict from google.protobuf.message_factory import MessageFactory # , GetMessageClass from google.protobuf.descriptor_pool import DescriptorPool @@ -15,16 +16,28 @@ _LOGGER = logging.getLogger(__name__) +class _ClientCallDetails(ClientCallDetails): + def __init__(self, method, timeout, metadata, credentials, wait_for_ready): + self.method = method + self.timeout = timeout + self.metadata = metadata + self.credentials = credentials + self.wait_for_ready = wait_for_ready + + class _ClientInterceptor( grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor, ): - def __init__(self, options: dict, channel_key: str, request_map: dict): + def __init__( + self, options: dict, channel_key: str, request_map: dict, timeout: int = None + ): self._request_map = request_map self._channel_key = channel_key self.metadata = options.get("metadata", {}) + self.timeout = timeout or 60 def _check_message(self, client_call_details, request_or_iterator, is_stream): if client_call_details.method in self._request_map: @@ -123,7 +136,13 @@ def _retry_call( return response_or_iterator except Exception as e: - if e.error_code == "ERROR_GRPC_CONNECTION": + if not isinstance(e, ERROR_BASE): + e = ERROR_UNKNOWN(message=str(e)) + + if ( + e.error_code == "ERROR_GRPC_CONNECTION" + or e.status_code == "DEADLINE_EXCEEDED" + ): if retries >= _MAX_RETRIES: channel = e.meta.get("channel") if channel in _GRPC_CHANNEL: @@ -131,10 +150,13 @@ def _retry_call( f"Disconnect gRPC Endpoint. (channel = {channel})" ) del _GRPC_CHANNEL[channel] + + if e.status_code == "DEADLINE_EXCEEDED": + raise ERROR_GRPC_TIMEOUT() raise e else: _LOGGER.debug( - f"Retry gRPC Call: reason = {e.message}, retry = {retries + 1}" + f"Retry gRPC Call: method = {client_call_details.method}, reason = {e.message}, retry = {retries + 1}" ) else: raise e @@ -160,9 +182,20 @@ def _intercept_call( is_response_stream, ) + def _create_new_call_details(self, client_call_details): + return _ClientCallDetails( + method=client_call_details.method, + timeout=self.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, + ) + def intercept_unary_unary(self, continuation, client_call_details, request): + new_call_details = self._create_new_call_details(client_call_details) + return self._intercept_call( - continuation, client_call_details, request, False, False + continuation, new_call_details, request, False, False ) def intercept_unary_stream(self, continuation, client_call_details, request): @@ -263,7 +296,7 @@ def _bind_grpc_method( class GRPCClient(object): - def __init__(self, channel, options, channel_key): + def __init__(self, channel, options, channel_key, timeout=None): self._request_map = {} self._api_resources = {} @@ -272,7 +305,7 @@ def __init__(self, channel, options, channel_key): self._init_grpc_reflection() _client_interceptor = _ClientInterceptor( - options, channel_key, self._request_map + options, channel_key, self._request_map, timeout ) _intercept_channel = grpc.intercept_channel(channel, _client_interceptor) self._bind_grpc_stub(_intercept_channel) @@ -326,7 +359,13 @@ def _create_insecure_channel(endpoint, options): return grpc.insecure_channel(endpoint, options=options) -def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_opts): +def client( + endpoint=None, + ssl_enabled=False, + max_message_length=None, + timeout=None, + **client_opts, +): if endpoint is None: raise Exception("Client's endpoint is undefined.") @@ -350,7 +389,9 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o ) try: - _GRPC_CHANNEL[endpoint] = GRPCClient(channel, client_opts, endpoint) + _GRPC_CHANNEL[endpoint] = GRPCClient( + channel, client_opts, endpoint, timeout + ) except Exception as e: if hasattr(e, "details"): raise ERROR_GRPC_CONNECTION(channel=endpoint, message=e.details())