Skip to content

Commit 0255917

Browse files
committed
make schema id serializer changes to async
1 parent 6865398 commit 0255917

File tree

8 files changed

+356
-381
lines changed

8 files changed

+356
-381
lines changed
Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +0,0 @@
1-
#!/usr/bin/env python
2-
# -*- coding: utf-8 -*-
3-
#
4-
# Copyright 2020 Confluent Inc.
5-
#
6-
# Licensed under the Apache License, Version 2.0 (the "License");
7-
# you may not use this file except in compliance with the License.
8-
# You may obtain a copy of the License at
9-
#
10-
# http://www.apache.org/licenses/LICENSE-2.0
11-
#
12-
# Unless required by applicable law or agreed to in writing, software
13-
# distributed under the License is distributed on an "AS IS" BASIS,
14-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
# See the License for the specific language governing permissions and
16-
# limitations under the License.
17-
#

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 91 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,30 @@
2020
from typing import Dict, Union, Optional, Callable
2121

2222
from fastavro import schemaless_reader, schemaless_writer
23-
24-
from confluent_kafka.schema_registry.common.avro import AvroSchema, _schema_loads, get_inline_tags, parse_schema_with_repo, transform
23+
from confluent_kafka.schema_registry.common import asyncinit
24+
from confluent_kafka.schema_registry.common.avro import AvroSchema, _schema_loads, \
25+
get_inline_tags, parse_schema_with_repo, transform, _ContextStringIO, AVRO_TYPE
2526

2627
from confluent_kafka.schema_registry import (_MAGIC_BYTE,
27-
Schema,
28-
topic_subject_name_strategy,
29-
RuleMode,
30-
AsyncSchemaRegistryClient)
28+
Schema,
29+
topic_subject_name_strategy,
30+
RuleMode,
31+
AsyncSchemaRegistryClient,
32+
prefix_schema_id_serializer,
33+
dual_schema_id_deserializer)
3134
from confluent_kafka.serialization import (SerializationError,
3235
SerializationContext)
33-
from confluent_kafka.schema_registry.common import asyncinit
34-
from confluent_kafka.schema_registry.common import _ContextStringIO
3536
from confluent_kafka.schema_registry.rule_registry import RuleRegistry
36-
from confluent_kafka.schema_registry.serde import AsyncBaseSerializer, AsyncBaseDeserializer, ParsedSchemaCache
37+
from confluent_kafka.schema_registry.serde import AsyncBaseSerializer, AsyncBaseDeserializer, ParsedSchemaCache, SchemaId
38+
3739

3840
__all__ = [
3941
'_resolve_named_schema',
4042
'AsyncAvroSerializer',
4143
'AsyncAvroDeserializer',
4244
]
4345

46+
4447
async def _resolve_named_schema(
4548
schema: Schema, schema_registry_client: AsyncSchemaRegistryClient
4649
) -> Dict[str, AvroSchema]:
@@ -113,6 +116,12 @@ class AsyncAvroSerializer(AsyncBaseSerializer):
113116
| | | |
114117
| | | Defaults to topic_subject_name_strategy. |
115118
+-----------------------------+----------+--------------------------------------------------+
119+
| | | Callable(bytes, SerializationContext, schema_id) |
120+
| | | -> bytes |
121+
| | | |
122+
| ``schema.id.serializer`` | callable | Defines how the schema id/guid is serialized. |
123+
| | | Defaults to prefix_schema_id_serializer. |
124+
+-----------------------------+----------+--------------------------------------------------+
116125
117126
Schemas are registered against subject names in Confluent Schema Registry that
118127
define a scope in which the schemas can be evolved. By default, the subject name
@@ -172,7 +181,8 @@ class AsyncAvroSerializer(AsyncBaseSerializer):
172181
'use.schema.id': None,
173182
'use.latest.version': False,
174183
'use.latest.with.metadata': None,
175-
'subject.name.strategy': topic_subject_name_strategy}
184+
'subject.name.strategy': topic_subject_name_strategy,
185+
'schema.id.serializer': prefix_schema_id_serializer}
176186

177187
async def __init__(
178188
self,
@@ -234,6 +244,10 @@ async def __init__(
234244
self._subject_name_func = conf_copy.pop('subject.name.strategy')
235245
if not callable(self._subject_name_func):
236246
raise ValueError("subject.name.strategy must be callable")
247+
248+
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
249+
if not callable(self._schema_id_deserializer):
250+
raise ValueError("schema.id.deserializer must be callable")
237251

238252
if len(conf_copy) > 0:
239253
raise ValueError("Unrecognized properties: {}"
@@ -297,19 +311,20 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
297311
subject = self._subject_name_func(ctx, self._schema_name)
298312
latest_schema = await self._get_reader_schema(subject)
299313
if latest_schema is not None:
300-
self._schema_id = latest_schema.schema_id
314+
self._schema_id = SchemaId(AVRO_TYPE, latest_schema.schema_id, latest_schema.guid)
301315
elif subject not in self._known_subjects:
302316
# Check to ensure this schema has been registered under subject_name.
303317
if self._auto_register:
304318
# The schema name will always be the same. We can't however register
305319
# a schema without a subject so we set the schema_id here to handle
306320
# the initial registration.
307-
self._schema_id = await self._registry.register_schema(
321+
registered_schema = await self._registry.register_schema_full_response(
308322
subject, self._schema, self._normalize_schemas)
323+
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
309324
else:
310325
registered_schema = await self._registry.lookup_schema(
311326
subject, self._schema, self._normalize_schemas)
312-
self._schema_id = registered_schema.schema_id
327+
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
313328

314329
self._known_subjects.add(subject)
315330

@@ -320,7 +335,7 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
320335

321336
if latest_schema is not None:
322337
parsed_schema = await self._get_parsed_schema(latest_schema.schema)
323-
field_transformer = lambda rule_ctx, field_transform, msg: ( # noqa: E731
338+
def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
324339
transform(rule_ctx, parsed_schema, msg, field_transform))
325340
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None,
326341
latest_schema.schema, value, get_inline_tags(parsed_schema),
@@ -334,7 +349,7 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
334349
# write the record to the rest of the buffer
335350
schemaless_writer(fo, parsed_schema, value)
336351

337-
return fo.getvalue()
352+
return self._schema_id_serializer(fo.getvalue(), ctx, self._schema_id)
338353

339354
async def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
340355
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)
@@ -378,7 +393,12 @@ class AsyncAvroDeserializer(AsyncBaseDeserializer):
378393
| | | |
379394
| | | Defaults to topic_subject_name_strategy. |
380395
+-----------------------------+----------+--------------------------------------------------+
381-
396+
| | | Callable(bytes, SerializationContext, schema_id) |
397+
| | | -> io.BytesIO |
398+
| | | |
399+
| ``schema.id.deserializer`` | callable | Defines how the schema id/guid is deserialized. |
400+
| | | Defaults to dual_schema_id_deserializer. |
401+
+-----------------------------+----------+--------------------------------------------------+
382402
Note:
383403
By default, Avro complex types are returned as dicts. This behavior can
384404
be overridden by registering a callable ``from_dict`` with the deserializer to
@@ -415,7 +435,8 @@ class AsyncAvroDeserializer(AsyncBaseDeserializer):
415435

416436
_default_conf = {'use.latest.version': False,
417437
'use.latest.with.metadata': None,
418-
'subject.name.strategy': topic_subject_name_strategy}
438+
'subject.name.strategy': topic_subject_name_strategy,
439+
'schema.id.deserializer': dual_schema_id_deserializer}
419440

420441
async def __init__(
421442
self,
@@ -460,6 +481,11 @@ async def __init__(
460481
if not callable(self._subject_name_func):
461482
raise ValueError("subject.name.strategy must be callable")
462483

484+
self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
485+
if not callable(self._schema_id_serializer):
486+
raise ValueError("schema.id.serializer must be callable")
487+
488+
463489
if len(conf_copy) > 0:
464490
raise ValueError("Unrecognized properties: {}"
465491
.format(", ".join(conf_copy.keys())))
@@ -513,61 +539,61 @@ async def __deserialize(self, data: bytes, ctx: Optional[SerializationContext] =
513539
"message was not produced with a Confluent "
514540
"Schema Registry serializer".format(len(data)))
515541

516-
subject = self._subject_name_func(ctx, None)
542+
subject = self._subject_name_func(ctx, None) if ctx else None
517543
latest_schema = None
518544
if subject is not None:
519545
latest_schema = await self._get_reader_schema(subject)
520546

521-
with _ContextStringIO(data) as payload:
522-
magic, schema_id = unpack('>bI', payload.read(5))
523-
if magic != _MAGIC_BYTE:
524-
raise SerializationError("Unexpected magic byte {}. This message "
525-
"was not produced with a Confluent "
526-
"Schema Registry serializer".format(magic))
527-
528-
writer_schema_raw = await self._registry.get_schema(schema_id)
529-
writer_schema = await self._get_parsed_schema(writer_schema_raw)
530-
531-
if subject is None:
532-
subject = self._subject_name_func(ctx, writer_schema.get("name"))
533-
if subject is not None:
534-
latest_schema = await self._get_reader_schema(subject)
535-
536-
if latest_schema is not None:
537-
migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
538-
reader_schema_raw = latest_schema.schema
539-
reader_schema = await self._get_parsed_schema(latest_schema.schema)
540-
elif self._schema is not None:
541-
migrations = None
542-
reader_schema_raw = self._schema
543-
reader_schema = self._reader_schema
544-
else:
545-
migrations = None
546-
reader_schema_raw = writer_schema_raw
547-
reader_schema = writer_schema
548-
549-
if migrations:
550-
obj_dict = schemaless_reader(payload,
551-
writer_schema,
552-
None,
553-
self._return_record_name)
554-
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
555-
else:
556-
obj_dict = schemaless_reader(payload,
557-
writer_schema,
558-
reader_schema,
559-
self._return_record_name)
547+
schema_id = SchemaId(AVRO_TYPE)
548+
payload = self._schema_id_deserializer(data, ctx, schema_id)
549+
550+
writer_schema_raw = self._get_writer_schema(schema_id, subject)
551+
writer_schema = self._get_parsed_schema(writer_schema_raw)
552+
553+
if subject is None:
554+
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None
555+
if subject is not None:
556+
latest_schema = await self._get_reader_schema(subject)
557+
558+
if latest_schema is not None:
559+
migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
560+
reader_schema_raw = latest_schema.schema
561+
reader_schema = await self._get_parsed_schema(latest_schema.schema)
562+
elif self._schema is not None:
563+
migrations = None
564+
reader_schema_raw = self._schema
565+
reader_schema = self._reader_schema
566+
else:
567+
migrations = None
568+
reader_schema_raw = writer_schema_raw
569+
reader_schema = writer_schema
570+
571+
if migrations:
572+
obj_dict = schemaless_reader(payload,
573+
writer_schema,
574+
None,
575+
self._return_record_name)
576+
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
577+
else:
578+
obj_dict = schemaless_reader(payload,
579+
writer_schema,
580+
reader_schema,
581+
self._return_record_name)
582+
583+
584+
585+
560586

561-
field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
562-
transform(rule_ctx, reader_schema, message, field_transform))
563-
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
564-
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
565-
field_transformer)
587+
field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
588+
transform(rule_ctx, reader_schema, message, field_transform))
589+
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
590+
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
591+
field_transformer)
566592

567-
if self._from_dict is not None:
568-
return self._from_dict(obj_dict, ctx)
593+
if self._from_dict is not None:
594+
return self._from_dict(obj_dict, ctx)
569595

570-
return obj_dict
596+
return obj_dict
571597

572598
async def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
573599
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)

0 commit comments

Comments
 (0)