Skip to content

Commit 3f11b3b

Browse files
authored
Fix various type hints (#1903)
* Fix type hints * Minor cleanup
1 parent 01fe4dd commit 3f11b3b

File tree

8 files changed

+58
-49
lines changed

8 files changed

+58
-49
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ email = "support@confluent.io"
2424
[project.urls]
2525
Homepage = "https://github.com/confluentinc/confluent-kafka-python"
2626

27+
[tool.mypy]
28+
ignore_missing_imports = true
29+
2730
[tool.setuptools]
2831
include-package-data = false
2932

src/confluent_kafka/schema_registry/json_schema.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def _retrieve_via_httpx(uri: str):
7878

7979
def _resolve_named_schema(
8080
schema: Schema, schema_registry_client: SchemaRegistryClient,
81-
ref_registry: Registry = None
81+
ref_registry: Optional[Registry] = None
8282
) -> Registry:
8383
"""
8484
Resolves named schemas referenced by the provided schema recursively.
@@ -222,10 +222,10 @@ def __init__(
222222
self,
223223
schema_str: Union[str, Schema, None],
224224
schema_registry_client: SchemaRegistryClient,
225-
to_dict: Callable[[object, SerializationContext], dict] = None,
226-
conf: dict = None,
227-
rule_conf: dict = None,
228-
rule_registry: RuleRegistry = None
225+
to_dict: Optional[Callable[[object, SerializationContext], dict]] = None,
226+
conf: Optional[dict] = None,
227+
rule_conf: Optional[dict] = None,
228+
rule_registry: Optional[RuleRegistry] = None
229229
):
230230
super().__init__()
231231
if isinstance(schema_str, str):
@@ -296,7 +296,7 @@ def __init__(
296296
rule.configure(self._registry.config() if self._registry else {},
297297
rule_conf if rule_conf else {})
298298

299-
def __call__(self, obj: object, ctx: SerializationContext = None) -> Optional[bytes]:
299+
def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
300300
"""
301301
Serializes an object to JSON, prepending it with Confluent Schema Registry
302302
framing.
@@ -452,11 +452,11 @@ class JSONDeserializer(BaseDeserializer):
452452
def __init__(
453453
self,
454454
schema_str: Union[str, Schema, None],
455-
from_dict: Callable[[dict, SerializationContext], object] = None,
456-
schema_registry_client: SchemaRegistryClient = None,
457-
conf: dict = None,
458-
rule_conf: dict = None,
459-
rule_registry: RuleRegistry = None
455+
from_dict: Optional[Callable[[dict, SerializationContext], object]] = None,
456+
schema_registry_client: Optional[SchemaRegistryClient] = None,
457+
conf: Optional[dict] = None,
458+
rule_conf: Optional[dict] = None,
459+
rule_registry: Optional[RuleRegistry] = None
460460
):
461461
super().__init__()
462462
if isinstance(schema_str, str):
@@ -520,7 +520,7 @@ def __init__(
520520
rule.configure(self._registry.config() if self._registry else {},
521521
rule_conf if rule_conf else {})
522522

523-
def __call__(self, data: bytes, ctx: SerializationContext = None) -> Union[dict, object, None]:
523+
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
524524
"""
525525
Deserialize a JSON encoded record with Confluent Schema Registry framing to
526526
a dict, or object instance according to from_dict if from_dict is specified.
@@ -703,7 +703,7 @@ def _transform_field(
703703

704704
def _validate_subschemas(
705705
subschemas: List[JsonSchema],
706-
message: str
706+
message: JsonMessage
707707
) -> Optional[JsonSchema]:
708708
for subschema in subschemas:
709709
try:

src/confluent_kafka/schema_registry/mock_schema_registry_client.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ def register_schema_full_response(
171171
return registered_schema
172172

173173
def get_schema(
174-
self, schema_id: int, subject_name: str = None,
175-
fmt: str = None
174+
self, schema_id: int, subject_name: Optional[str] = None,
175+
fmt: Optional[str] = None
176176
) -> 'Schema':
177177
schema = self._store.get_schema(schema_id)
178178
if schema is not None:
@@ -197,7 +197,7 @@ def get_subjects(self) -> List[str]:
197197
def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
198198
return self._store.remove_by_subject(subject_name)
199199

200-
def get_latest_version(self, subject_name: str, fmt: str = None) -> 'RegisteredSchema':
200+
def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'RegisteredSchema':
201201
registered_schema = self._store.get_latest_version(subject_name)
202202
if registered_schema is not None:
203203
return registered_schema
@@ -206,7 +206,7 @@ def get_latest_version(self, subject_name: str, fmt: str = None) -> 'RegisteredS
206206

207207
def get_latest_with_metadata(
208208
self, subject_name: str, metadata: Dict[str, str],
209-
deleted: bool = False, fmt: str = None
209+
deleted: bool = False, fmt: Optional[str] = None
210210
) -> 'RegisteredSchema':
211211
registered_schema = self._store.get_latest_with_metadata(subject_name, metadata)
212212
if registered_schema is not None:
@@ -216,7 +216,7 @@ def get_latest_with_metadata(
216216

217217
def get_version(
218218
self, subject_name: str, version: int,
219-
deleted: bool = False, fmt: str = None
219+
deleted: bool = False, fmt: Optional[str] = None
220220
) -> 'RegisteredSchema':
221221
registered_schema = self._store.get_version(subject_name, version)
222222
if registered_schema is not None:

src/confluent_kafka/schema_registry/protobuf.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def _resolve_named_schema(
187187
schema: Schema,
188188
schema_registry_client: SchemaRegistryClient,
189189
pool: DescriptorPool,
190-
visited: Set[str] = None
190+
visited: Optional[Set[str]] = None
191191
):
192192
"""
193193
Resolves named schemas referenced by the provided schema recursively.
@@ -373,9 +373,9 @@ def __init__(
373373
self,
374374
msg_type: Message,
375375
schema_registry_client: SchemaRegistryClient,
376-
conf: dict = None,
377-
rule_conf: dict = None,
378-
rule_registry: RuleRegistry = None
376+
conf: Optional[dict] = None,
377+
rule_conf: Optional[dict] = None,
378+
rule_registry: Optional[RuleRegistry] = None
379379
):
380380
super().__init__()
381381

@@ -528,7 +528,7 @@ def _resolve_dependencies(
528528
reference.version))
529529
return schema_refs
530530

531-
def __call__(self, message: Message, ctx: SerializationContext = None) -> Optional[bytes]:
531+
def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
532532
"""
533533
Serializes an instance of a class derived from Protobuf Message, and prepends
534534
it with Confluent Schema Registry framing.
@@ -675,10 +675,10 @@ class ProtobufDeserializer(BaseDeserializer):
675675
def __init__(
676676
self,
677677
message_type: Message,
678-
conf: dict = None,
679-
schema_registry_client: SchemaRegistryClient = None,
680-
rule_conf: dict = None,
681-
rule_registry: RuleRegistry = None
678+
conf: Optional[dict] = None,
679+
schema_registry_client: Optional[SchemaRegistryClient] = None,
680+
rule_conf: Optional[dict] = None,
681+
rule_registry: Optional[RuleRegistry] = None
682682
):
683683
super().__init__()
684684

@@ -812,7 +812,7 @@ def _read_index_array(buf: io.BytesIO, zigzag: bool = True) -> List[int]:
812812

813813
return msg_index
814814

815-
def __call__(self, data: bytes, ctx: SerializationContext = None) -> Optional[Message]:
815+
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[Message]:
816816
"""
817817
Deserialize a serialized protobuf message with Confluent Schema Registry
818818
framing.

src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ def config(self):
471471

472472
def register_kek(
473473
self, name: str, kms_type: str, kms_key_id: str,
474-
shared: bool = False, kms_props: Dict[str, str] = None, doc: str = None
474+
shared: bool = False, kms_props: Optional[Dict[str, str]] = None, doc: Optional[str] = None
475475
) -> Kek:
476476
"""
477477
Register a new Key Encryption Key (KEK) with the DEK Registry.

src/confluent_kafka/schema_registry/rules/encryption/dek_registry/mock_dek_registry_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#
1818

1919
import time
20-
from typing import Dict
20+
from typing import Dict, Optional
2121

2222
from confluent_kafka.schema_registry import SchemaRegistryError
2323
from confluent_kafka.schema_registry.rules.encryption.dek_registry.dek_registry_client import \
@@ -39,7 +39,7 @@ def __init__(self, conf: dict):
3939

4040
def register_kek(
4141
self, name: str, kms_type: str, kms_key_id: str,
42-
shared: bool = False, kms_props: Dict[str, str] = None, doc: str = None
42+
shared: bool = False, kms_props: Optional[Dict[str, str]] = None, doc: Optional[str] = None
4343
) -> Kek:
4444
cache_key = KekId(name=name, deleted=False)
4545
kek = self._kek_cache.get_kek(cache_key)

src/confluent_kafka/schema_registry/schema_registry_client.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,16 @@ def __init__(self, conf: dict):
175175
raise ValueError("Unrecognized properties: {}"
176176
.format(", ".join(conf_copy.keys())))
177177

178-
def get(self, url: str, query: dict = None) -> Any:
178+
def get(self, url: str, query: Optional[dict] = None) -> Any:
179179
raise NotImplementedError()
180180

181-
def post(self, url: str, body: dict, **kwargs) -> Any:
181+
def post(self, url: str, body: Optional[dict], **kwargs) -> Any:
182182
raise NotImplementedError()
183183

184184
def delete(self, url: str) -> Any:
185185
raise NotImplementedError()
186186

187-
def put(self, url: str, body: dict = None) -> Any:
187+
def put(self, url: str, body: Optional[dict] = None) -> Any:
188188
raise NotImplementedError()
189189

190190

@@ -209,21 +209,21 @@ def __init__(self, conf: dict):
209209
timeout=self.timeout
210210
)
211211

212-
def get(self, url: str, query: dict = None) -> Any:
212+
def get(self, url: str, query: Optional[dict] = None) -> Any:
213213
return self.send_request(url, method='GET', query=query)
214214

215-
def post(self, url: str, body: dict, **kwargs) -> Any:
215+
def post(self, url: str, body: Optional[dict], **kwargs) -> Any:
216216
return self.send_request(url, method='POST', body=body)
217217

218218
def delete(self, url: str) -> Any:
219219
return self.send_request(url, method='DELETE')
220220

221-
def put(self, url: str, body: dict = None) -> Any:
221+
def put(self, url: str, body: Optional[dict] = None) -> Any:
222222
return self.send_request(url, method='PUT', body=body)
223223

224224
def send_request(
225-
self, url: str, method: str, body: dict = None,
226-
query: dict = None
225+
self, url: str, method: str, body: Optional[dict] = None,
226+
query: Optional[dict] = None
227227
) -> Any:
228228
"""
229229
Sends HTTP request to the SchemaRegistry, trying each base URL in turn.
@@ -284,8 +284,8 @@ def send_request(
284284
+ str(response.content))
285285

286286
def send_http_request(
287-
self, base_url: str, url: str, method: str, headers: dict,
288-
body: Optional[str] = None, query: dict = None
287+
self, base_url: str, url: str, method: str, headers: Optional[dict],
288+
body: Optional[str] = None, query: Optional[dict] = None
289289
) -> Response:
290290
"""
291291
Sends HTTP request to the SchemaRegistry.
@@ -312,6 +312,7 @@ def send_http_request(
312312
Returns:
313313
Response: Schema Registry response content.
314314
"""
315+
response = None
315316
for i in range(self.max_retries + 1):
316317
response = self.session.request(
317318
method, url="/".join([base_url, url]),
@@ -324,6 +325,7 @@ def send_http_request(
324325
return response
325326

326327
time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000)
328+
return response
327329

328330

329331
def is_success(status_code: int) -> bool:
@@ -691,7 +693,7 @@ def register_schema_full_response(
691693
return registered_schema
692694

693695
def get_schema(
694-
self, schema_id: int, subject_name: str = None, fmt: str = None
696+
self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None
695697
) -> 'Schema':
696698
"""
697699
Fetches the schema associated with ``schema_id`` from the
@@ -817,7 +819,7 @@ def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int
817819
return versions
818820

819821
def get_latest_version(
820-
self, subject_name: str, fmt: str = None
822+
self, subject_name: str, fmt: Optional[str] = None
821823
) -> 'RegisteredSchema':
822824
"""
823825
Retrieves latest registered version for subject
@@ -853,7 +855,7 @@ def get_latest_version(
853855

854856
def get_latest_with_metadata(
855857
self, subject_name: str, metadata: Dict[str, str],
856-
deleted: bool = False, fmt: str = None
858+
deleted: bool = False, fmt: Optional[str] = None
857859
) -> 'RegisteredSchema':
858860
"""
859861
Retrieves latest registered version for subject with the given metadata
@@ -893,7 +895,7 @@ def get_latest_with_metadata(
893895

894896
def get_version(
895897
self, subject_name: str, version: int,
896-
deleted: bool = False, fmt: str = None
898+
deleted: bool = False, fmt: Optional[str] = None
897899
) -> 'RegisteredSchema':
898900
"""
899901
Retrieves a specific schema registered under ``subject_name``.
@@ -981,7 +983,7 @@ def delete_version(self, subject_name: str, version: int, permanent: bool = Fals
981983

982984
return response
983985

984-
def set_compatibility(self, subject_name: Optional[str] = None, level: str = None) -> str:
986+
def set_compatibility(self, subject_name: Optional[str] = None, level: Optional[str] = None) -> str:
985987
"""
986988
Update global or subject level compatibility level.
987989
@@ -1068,7 +1070,10 @@ def test_compatibility(
10681070

10691071
return response['is_compatible']
10701072

1071-
def set_config(self, subject_name: Optional[str] = None, config: 'ServerConfig' = None) -> 'ServerConfig':
1073+
def set_config(
1074+
self, subject_name: Optional[str] = None,
1075+
config: Optional['ServerConfig'] = None
1076+
) -> 'ServerConfig':
10721077
"""
10731078
Update global or subject config.
10741079

src/confluent_kafka/schema_registry/serde.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ class BaseSerde(object):
272272
'_registry', '_rule_registry', '_subject_name_func',
273273
'_field_transformer']
274274

275-
def _get_reader_schema(self, subject: str, fmt: str = None) -> Optional[RegisteredSchema]:
275+
def _get_reader_schema(self, subject: str, fmt: Optional[str] = None) -> Optional[RegisteredSchema]:
276276
if self._use_latest_with_metadata is not None:
277277
return self._registry.get_latest_with_metadata(
278278
subject, self._use_latest_with_metadata, True, fmt)
@@ -427,6 +427,7 @@ def _has_rules(self, rule_set: RuleSet, mode: RuleMode) -> bool:
427427
for rule in rule_set.domain_rules or [])
428428
elif mode == RuleMode.WRITEREAD:
429429
return any(rule.mode == mode for rule in rule_set.migration_rules or [])
430+
return False
430431

431432
def _get_migrations(
432433
self, subject: str, source_info: Schema,
@@ -464,7 +465,7 @@ def _get_migrations(
464465

465466
def _get_schemas_between(
466467
self, subject: str, first: RegisteredSchema,
467-
last: RegisteredSchema, fmt: str = None
468+
last: RegisteredSchema, fmt: Optional[str] = None
468469
) -> List[RegisteredSchema]:
469470
if last.version - first.version <= 1:
470471
return [first, last]

0 commit comments

Comments
 (0)