Skip to content

Commit 239631e

Browse files
author
Jongmin Kim
authored
Merge pull request #150 from whdalsrnt/master
refactor: refactor code for grpc compatability
2 parents 7dc1980 + 8cfa68e commit 239631e

File tree

1 file changed

+169
-80
lines changed

1 file changed

+169
-80
lines changed

src/spaceone/core/pygrpc/client.py

Lines changed: 169 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import types
33
import grpc
44
from google.protobuf.json_format import ParseDict
5-
from google.protobuf.message_factory import MessageFactory
5+
from google.protobuf.message_factory import MessageFactory, GetMessageClass
66
from google.protobuf.descriptor_pool import DescriptorPool
77
from google.protobuf.descriptor import ServiceDescriptor, MethodDescriptor
8-
from grpc_reflection.v1alpha.proto_reflection_descriptor_database import ProtoReflectionDescriptorDatabase
8+
from grpc_reflection.v1alpha.proto_reflection_descriptor_database import (
9+
ProtoReflectionDescriptorDatabase,
10+
)
911
from spaceone.core.error import *
1012

1113
_MAX_RETRIES = 2
@@ -14,24 +16,32 @@
1416

1517

1618
class _ClientInterceptor(
17-
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
18-
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
19-
19+
grpc.UnaryUnaryClientInterceptor,
20+
grpc.UnaryStreamClientInterceptor,
21+
grpc.StreamUnaryClientInterceptor,
22+
grpc.StreamStreamClientInterceptor,
23+
):
2024
def __init__(self, options: dict, channel_key: str, request_map: dict):
2125
self._request_map = request_map
2226
self._channel_key = channel_key
23-
self.metadata = options.get('metadata', {})
27+
self.metadata = options.get("metadata", {})
2428

2529
def _check_message(self, client_call_details, request_or_iterator, is_stream):
2630
if client_call_details.method in self._request_map:
2731
if is_stream:
2832
if not isinstance(request_or_iterator, types.GeneratorType):
29-
raise Exception("Stream method must be specified as a generator type.")
33+
raise Exception(
34+
"Stream method must be specified as a generator type."
35+
)
3036

31-
return self._generate_message(request_or_iterator, client_call_details.method)
37+
return self._generate_message(
38+
request_or_iterator, client_call_details.method
39+
)
3240

3341
else:
34-
return self._make_message(request_or_iterator, client_call_details.method)
42+
return self._make_message(
43+
request_or_iterator, client_call_details.method
44+
)
3545

3646
return request_or_iterator
3747

@@ -50,17 +60,17 @@ def _check_error(self, response):
5060
if isinstance(response, Exception):
5161
details = response.details()
5262
status_code = response.code().name
53-
if details.startswith('ERROR_'):
54-
details_split = details.split(':', 1)
63+
if details.startswith("ERROR_"):
64+
details_split = details.split(":", 1)
5565
if len(details_split) == 2:
5666
error_code, error_message = details_split
5767
else:
5868
error_code = details_split[0]
5969
error_message = details
6070

61-
if status_code == 'PERMISSION_DENIED':
71+
if status_code == "PERMISSION_DENIED":
6272
raise ERROR_PERMISSION_DENIED()
63-
elif status_code == 'UNAUTHENTICATED':
73+
elif status_code == "UNAUTHENTICATED":
6474
raise ERROR_AUTHENTICATE_FAILURE(message=error_message)
6575
else:
6676
e = ERROR_INTERNAL_API(message=error_message)
@@ -70,13 +80,15 @@ def _check_error(self, response):
7080

7181
else:
7282
error_message = response.details()
73-
if status_code == 'PERMISSION_DENIED':
83+
if status_code == "PERMISSION_DENIED":
7484
raise ERROR_PERMISSION_DENIED()
75-
elif status_code == 'PERMISSION_DENIED':
85+
elif status_code == "PERMISSION_DENIED":
7686
raise ERROR_AUTHENTICATE_FAILURE(message=error_message)
77-
elif status_code == 'UNAVAILABLE':
78-
e = ERROR_GRPC_CONNECTION(channel=self._channel_key, message=error_message)
79-
e.meta['channel'] = self._channel_key
87+
elif status_code == "UNAVAILABLE":
88+
e = ERROR_GRPC_CONNECTION(
89+
channel=self._channel_key, message=error_message
90+
)
91+
e.meta["channel"] = self._channel_key
8092
raise e
8193
else:
8294
e = ERROR_INTERNAL_API(message=error_message)
@@ -92,12 +104,16 @@ def _generate_response(self, response_iterator):
92104
except Exception as e:
93105
self._check_error(e)
94106

95-
def _retry_call(self, continuation, client_call_details, request_or_iterator, is_stream):
107+
def _retry_call(
108+
self, continuation, client_call_details, request_or_iterator, is_stream
109+
):
96110
retries = 0
97111

98112
while True:
99113
try:
100-
response_or_iterator = continuation(client_call_details, request_or_iterator)
114+
response_or_iterator = continuation(
115+
client_call_details, request_or_iterator
116+
)
101117

102118
if is_stream:
103119
response_or_iterator = self._generate_response(response_or_iterator)
@@ -107,84 +123,142 @@ def _retry_call(self, continuation, client_call_details, request_or_iterator, is
107123
return response_or_iterator
108124

109125
except Exception as e:
110-
if e.error_code == 'ERROR_GRPC_CONNECTION':
126+
if e.error_code == "ERROR_GRPC_CONNECTION":
111127
if retries >= _MAX_RETRIES:
112-
channel = e.meta.get('channel')
128+
channel = e.meta.get("channel")
113129
if channel in _GRPC_CHANNEL:
114-
_LOGGER.error(f'Disconnect gRPC Endpoint. (channel = {channel})')
130+
_LOGGER.error(
131+
f"Disconnect gRPC Endpoint. (channel = {channel})"
132+
)
115133
del _GRPC_CHANNEL[channel]
116134
raise e
117135
else:
118-
_LOGGER.debug(f'Retry gRPC Call: reason = {e.message}, retry = {retries + 1}')
136+
_LOGGER.debug(
137+
f"Retry gRPC Call: reason = {e.message}, retry = {retries + 1}"
138+
)
119139
else:
120140
raise e
121141

122142
retries += 1
123143

124-
def _intercept_call(self, continuation, client_call_details,
125-
request_or_iterator, is_request_stream, is_response_stream):
126-
new_request_or_iterator = self. _check_message(
127-
client_call_details, request_or_iterator, is_request_stream)
128-
129-
return self._retry_call(continuation, client_call_details,
130-
new_request_or_iterator, is_response_stream)
144+
def _intercept_call(
145+
self,
146+
continuation,
147+
client_call_details,
148+
request_or_iterator,
149+
is_request_stream,
150+
is_response_stream,
151+
):
152+
new_request_or_iterator = self._check_message(
153+
client_call_details, request_or_iterator, is_request_stream
154+
)
155+
156+
return self._retry_call(
157+
continuation,
158+
client_call_details,
159+
new_request_or_iterator,
160+
is_response_stream,
161+
)
131162

132163
def intercept_unary_unary(self, continuation, client_call_details, request):
133-
return self._intercept_call(continuation, client_call_details, request, False, False)
164+
return self._intercept_call(
165+
continuation, client_call_details, request, False, False
166+
)
134167

135168
def intercept_unary_stream(self, continuation, client_call_details, request):
136-
return self._intercept_call(continuation, client_call_details, request, False, True)
169+
return self._intercept_call(
170+
continuation, client_call_details, request, False, True
171+
)
137172

138-
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
139-
return self._intercept_call(continuation, client_call_details, request_iterator, True, False)
173+
def intercept_stream_unary(
174+
self, continuation, client_call_details, request_iterator
175+
):
176+
return self._intercept_call(
177+
continuation, client_call_details, request_iterator, True, False
178+
)
140179

141-
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
142-
return self._intercept_call(continuation, client_call_details, request_iterator, True, True)
180+
def intercept_stream_stream(
181+
self, continuation, client_call_details, request_iterator
182+
):
183+
return self._intercept_call(
184+
continuation, client_call_details, request_iterator, True, True
185+
)
143186

144187

145188
class _GRPCStub(object):
146-
147-
def __init__(self, desc_pool: DescriptorPool, service_desc: ServiceDescriptor, channel: grpc.Channel):
189+
def __init__(
190+
self,
191+
desc_pool: DescriptorPool,
192+
service_desc: ServiceDescriptor,
193+
channel: grpc.Channel,
194+
):
148195
self._desc_pool = desc_pool
149196
for method_desc in service_desc.methods:
150197
self._bind_grpc_method(service_desc, method_desc, channel)
151198

152-
def _bind_grpc_method(self, service_desc: ServiceDescriptor, method_desc: MethodDescriptor, channel: grpc.Channel):
199+
def _bind_grpc_method(
200+
self,
201+
service_desc: ServiceDescriptor,
202+
method_desc: MethodDescriptor,
203+
channel: grpc.Channel,
204+
):
153205
method_name = method_desc.name
154-
method_key = f'/{service_desc.full_name}/{method_name}'
155-
request_desc = self._desc_pool.FindMessageTypeByName(method_desc.input_type.full_name)
156-
request_message_desc = MessageFactory(self._desc_pool).GetPrototype(request_desc)
157-
response_desc = self._desc_pool.FindMessageTypeByName(method_desc.output_type.full_name)
158-
response_message_desc = MessageFactory(self._desc_pool).GetPrototype(response_desc)
206+
method_key = f"/{service_desc.full_name}/{method_name}"
207+
request_desc = self._desc_pool.FindMessageTypeByName(
208+
method_desc.input_type.full_name
209+
)
210+
# request_message_desc = MessageFactory(self._desc_pool).GetPrototype(request_desc)
211+
request_message_desc = GetMessageClass(request_desc)
212+
213+
response_desc = self._desc_pool.FindMessageTypeByName(
214+
method_desc.output_type.full_name
215+
)
216+
# response_message_desc = MessageFactory(self._desc_pool).GetPrototype(response_desc)
217+
response_message_desc = GetMessageClass(response_desc)
159218

160219
if method_desc.client_streaming and method_desc.server_streaming:
161-
setattr(self, method_name, channel.stream_stream(
162-
method_key,
163-
request_serializer=request_message_desc.SerializeToString,
164-
response_deserializer=response_message_desc.FromString
165-
))
220+
setattr(
221+
self,
222+
method_name,
223+
channel.stream_stream(
224+
method_key,
225+
request_serializer=request_message_desc.SerializeToString,
226+
response_deserializer=response_message_desc.FromString,
227+
),
228+
)
166229
elif method_desc.client_streaming and not method_desc.server_streaming:
167-
setattr(self, method_name, channel.stream_unary(
168-
method_key,
169-
request_serializer=request_message_desc.SerializeToString,
170-
response_deserializer=response_message_desc.FromString
171-
))
230+
setattr(
231+
self,
232+
method_name,
233+
channel.stream_unary(
234+
method_key,
235+
request_serializer=request_message_desc.SerializeToString,
236+
response_deserializer=response_message_desc.FromString,
237+
),
238+
)
172239
elif not method_desc.client_streaming and method_desc.server_streaming:
173-
setattr(self, method_name, channel.unary_stream(
174-
method_key,
175-
request_serializer=request_message_desc.SerializeToString,
176-
response_deserializer=response_message_desc.FromString
177-
))
240+
setattr(
241+
self,
242+
method_name,
243+
channel.unary_stream(
244+
method_key,
245+
request_serializer=request_message_desc.SerializeToString,
246+
response_deserializer=response_message_desc.FromString,
247+
),
248+
)
178249
else:
179-
setattr(self, method_name, channel.unary_unary(
180-
method_key,
181-
request_serializer=request_message_desc.SerializeToString,
182-
response_deserializer=response_message_desc.FromString
183-
))
250+
setattr(
251+
self,
252+
method_name,
253+
channel.unary_unary(
254+
method_key,
255+
request_serializer=request_message_desc.SerializeToString,
256+
response_deserializer=response_message_desc.FromString,
257+
),
258+
)
184259

185260

186261
class GRPCClient(object):
187-
188262
def __init__(self, channel, options, channel_key):
189263
self._request_map = {}
190264
self._api_resources = {}
@@ -193,7 +267,9 @@ def __init__(self, channel, options, channel_key):
193267
self._desc_pool = DescriptorPool(self._reflection_db)
194268
self._init_grpc_reflection()
195269

196-
_client_interceptor = _ClientInterceptor(options, channel_key, self._request_map)
270+
_client_interceptor = _ClientInterceptor(
271+
options, channel_key, self._request_map
272+
)
197273
_intercept_channel = grpc.intercept_channel(channel, _client_interceptor)
198274
self._bind_grpc_stub(_intercept_channel)
199275

@@ -206,9 +282,12 @@ def _init_grpc_reflection(self):
206282
service_desc: ServiceDescriptor = self._desc_pool.FindServiceByName(service)
207283
service_name = service_desc.name
208284
for method_desc in service_desc.methods:
209-
method_key = f'/{service}/{method_desc.name}'
210-
request_desc = self._desc_pool.FindMessageTypeByName(method_desc.input_type.full_name)
211-
self._request_map[method_key] = MessageFactory(self._desc_pool).GetPrototype(request_desc)
285+
method_key = f"/{service}/{method_desc.name}"
286+
request_desc = self._desc_pool.FindMessageTypeByName(
287+
method_desc.input_type.full_name
288+
)
289+
# self._request_map[method_key] = MessageFactory(self._desc_pool).GetPrototype(request_desc)
290+
self._request_map[method_key] = GetMessageClass(request_desc)
212291

213292
if service_desc.name not in self._api_resources:
214293
self._api_resources[service_name] = []
@@ -219,7 +298,11 @@ def _bind_grpc_stub(self, intercept_channel: grpc.Channel):
219298
for service in self._reflection_db.get_services():
220299
service_desc: ServiceDescriptor = self._desc_pool.FindServiceByName(service)
221300

222-
setattr(self, service_desc.name, _GRPCStub(self._desc_pool, service_desc, intercept_channel))
301+
setattr(
302+
self,
303+
service_desc.name,
304+
_GRPCStub(self._desc_pool, service_desc, intercept_channel),
305+
)
223306

224307

225308
def _create_secure_channel(endpoint, options):
@@ -245,8 +328,8 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
245328
options = []
246329

247330
if max_message_length:
248-
options.append(('grpc.max_send_message_length', max_message_length))
249-
options.append(('grpc.max_receive_message_length', max_message_length))
331+
options.append(("grpc.max_send_message_length", max_message_length))
332+
options.append(("grpc.max_receive_message_length", max_message_length))
250333

251334
if ssl_enabled:
252335
channel = _create_secure_channel(endpoint, options)
@@ -256,12 +339,14 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
256339
try:
257340
grpc.channel_ready_future(channel).result(timeout=3)
258341
except Exception as e:
259-
raise ERROR_GRPC_CONNECTION(channel=endpoint, message='Channel is not ready.')
342+
raise ERROR_GRPC_CONNECTION(
343+
channel=endpoint, message="Channel is not ready."
344+
)
260345

261346
try:
262347
_GRPC_CHANNEL[endpoint] = GRPCClient(channel, client_opts, endpoint)
263348
except Exception as e:
264-
if hasattr(e, 'details'):
349+
if hasattr(e, "details"):
265350
raise ERROR_GRPC_CONNECTION(channel=endpoint, message=e.details())
266351
else:
267352
raise ERROR_GRPC_CONNECTION(channel=endpoint, message=str(e))
@@ -271,12 +356,16 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
271356

272357
def get_grpc_method(uri_info):
273358
try:
274-
conn = client(endpoint=uri_info['endpoint'], ssl_enabled=uri_info['ssl_enabled'])
275-
return getattr(getattr(conn, uri_info['service']), uri_info['method'])
359+
conn = client(
360+
endpoint=uri_info["endpoint"], ssl_enabled=uri_info["ssl_enabled"]
361+
)
362+
return getattr(getattr(conn, uri_info["service"]), uri_info["method"])
276363

277364
except ERROR_BASE as e:
278365
raise e
279366
except Exception as e:
280-
raise ERROR_GRPC_CONFIGURATION(endpoint=uri_info.get('endpoint'),
281-
service=uri_info.get('service'),
282-
method=uri_info.get('method'))
367+
raise ERROR_GRPC_CONFIGURATION(
368+
endpoint=uri_info.get("endpoint"),
369+
service=uri_info.get("service"),
370+
method=uri_info.get("method"),
371+
)

0 commit comments

Comments
 (0)