Skip to content

Commit 920ddb6

Browse files
committed
Implement async variants of SR clients and serdes
1 parent cd95e33 commit 920ddb6

File tree

17 files changed

+5124
-5
lines changed

17 files changed

+5124
-5
lines changed

LICENSE

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,3 +652,36 @@ For the files wingetopt.c wingetopt.h downloaded from https://github.com/alex85k
652652
*/
653653

654654

655+
656+
LICENSE.unasync
657+
--------------------------------------------------------------
658+
For unasync code in setup.py, derived from
659+
https://github.com/encode/httpcore/blob/ae46dfbd4330eefaa9cd6ab1560dec18a1d0bcb8/scripts/unasync.py
660+
661+
Copyright © 2020, [Encode OSS Ltd](https://www.encode.io/).
662+
All rights reserved.
663+
664+
Redistribution and use in source and binary forms, with or without
665+
modification, are permitted provided that the following conditions are met:
666+
667+
* Redistributions of source code must retain the above copyright notice, this
668+
list of conditions and the following disclaimer.
669+
670+
* Redistributions in binary form must reproduce the above copyright notice,
671+
this list of conditions and the following disclaimer in the documentation
672+
and/or other materials provided with the distribution.
673+
674+
* Neither the name of the copyright holder nor the names of its
675+
contributors may be used to endorse or promote products derived from
676+
this software without specific prior written permission.
677+
678+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
679+
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
680+
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
681+
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
682+
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
683+
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
684+
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
685+
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
686+
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
687+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,6 @@ optional-dependencies.all = { file = [
7373
"requirements/requirements-avro.txt",
7474
"requirements/requirements-json.txt",
7575
"requirements/requirements-protobuf.txt"] }
76+
77+
[tool.pytest.ini_options]
78+
asyncio_mode = "auto"

requirements/requirements-tests.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ pytest-timeout
88
requests-mock
99
respx
1010
pytest_cov
11+
pytest-asyncio

setup.py

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
#!/usr/bin/env python
22

33
import os
4-
from setuptools import setup
5-
from distutils.core import Extension
64
import platform
5+
import re
6+
import sys
7+
from setuptools import setup
8+
from setuptools import Extension
9+
from setuptools.command.build_py import build_py as _build_py
10+
from pprint import pprint
711

812
work_dir = os.path.dirname(os.path.realpath(__file__))
913
mod_dir = os.path.join(work_dir, 'src', 'confluent_kafka')
@@ -25,4 +29,128 @@
2529
os.path.join(ext_dir, 'AdminTypes.c'),
2630
os.path.join(ext_dir, 'Admin.c')])
2731

28-
setup(ext_modules=[module])
32+
SUBS = [
33+
('from confluent_kafka.schema_registry.common import asyncinit', ''),
34+
('@asyncinit', ''),
35+
('import asyncio', ''),
36+
('asyncio.sleep', 'time.sleep'),
37+
38+
('Async([A-Z][A-Za-z0-9_]*)', r'\2'),
39+
('_Async([A-Z][A-Za-z0-9_]*)', r'_\2'),
40+
('async_([a-z][A-Za-z0-9_]*)', r'\2'),
41+
42+
('async def', 'def'),
43+
('await ', ''),
44+
('aclose', 'close'),
45+
('__aenter__', '__enter__'),
46+
('__aexit__', '__exit__'),
47+
('__aiter__', '__iter__'),
48+
]
49+
COMPILED_SUBS = [
50+
(re.compile(r'(^|\b)' + regex + r'($|\b)'), repl)
51+
for regex, repl in SUBS
52+
]
53+
54+
USED_SUBS = set()
55+
56+
def unasync_line(line):
57+
for index, (regex, repl) in enumerate(COMPILED_SUBS):
58+
old_line = line
59+
line = re.sub(regex, repl, line)
60+
if old_line != line:
61+
USED_SUBS.add(index)
62+
return line
63+
64+
65+
def unasync_file(in_path, out_path):
66+
with open(in_path, "r") as in_file:
67+
with open(out_path, "w", newline="") as out_file:
68+
for line in in_file.readlines():
69+
line = unasync_line(line)
70+
out_file.write(line)
71+
72+
73+
def unasync_file_check(in_path, out_path):
74+
with open(in_path, "r") as in_file:
75+
with open(out_path, "r") as out_file:
76+
for in_line, out_line in zip(in_file.readlines(), out_file.readlines()):
77+
expected = unasync_line(in_line)
78+
if out_line != expected:
79+
print(f'unasync mismatch between {in_path!r} and {out_path!r}')
80+
print(f'Async code: {in_line!r}')
81+
print(f'Expected sync code: {expected!r}')
82+
print(f'Actual sync code: {out_line!r}')
83+
sys.exit(1)
84+
85+
86+
def unasync_dir(in_dir, out_dir, check_only=False):
87+
for dirpath, dirnames, filenames in os.walk(in_dir):
88+
for filename in filenames:
89+
if not filename.endswith('.py'):
90+
continue
91+
rel_dir = os.path.relpath(dirpath, in_dir)
92+
in_path = os.path.normpath(os.path.join(in_dir, rel_dir, filename))
93+
out_path = os.path.normpath(os.path.join(out_dir, rel_dir, filename))
94+
print(in_path, '->', out_path)
95+
if check_only:
96+
unasync_file_check(in_path, out_path)
97+
else:
98+
unasync_file(in_path, out_path)
99+
100+
def unasync():
101+
unasync_dir(
102+
"src/confluent_kafka/schema_registry/_async",
103+
"src/confluent_kafka/schema_registry/_sync",
104+
check_only=False
105+
)
106+
unasync_dir(
107+
"tests/integration/schema_registry/_async",
108+
"tests/integration/schema_registry/_sync",
109+
check_only=False
110+
)
111+
112+
113+
if len(USED_SUBS) != len(SUBS):
114+
unused_subs = [SUBS[i] for i in range(len(SUBS)) if i not in USED_SUBS]
115+
116+
print("These patterns were not used:")
117+
pprint(unused_subs)
118+
119+
120+
class build_py(_build_py):
121+
"""
122+
Subclass build_py from setuptools to modify its behavior.
123+
124+
Convert files in _async dir from being asynchronous to synchronous
125+
and saves them to the specified output directory.
126+
"""
127+
128+
def run(self):
129+
self._updated_files = []
130+
131+
# Base class code
132+
if self.py_modules:
133+
self.build_modules()
134+
if self.packages:
135+
self.build_packages()
136+
self.build_package_data()
137+
138+
# Our modification
139+
unasync()
140+
141+
# Remaining base class code
142+
self.byte_compile(self.get_outputs(include_bytecode=0))
143+
144+
def build_module(self, module, module_file, package):
145+
outfile, copied = super().build_module(module, module_file, package)
146+
if copied:
147+
self._updated_files.append(outfile)
148+
return outfile, copied
149+
150+
151+
setup(
152+
ext_modules=[module],
153+
cmdclass={
154+
'build_py': build_py,
155+
}
156+
)

src/confluent_kafka/schema_registry/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
Schema,
3232
SchemaRegistryClient,
3333
SchemaRegistryError,
34-
AsyncSchemaRegistryClient,
3534
SchemaReference,
3635
ServerConfig
3736
)
@@ -57,7 +56,6 @@
5756
"RuleSet",
5857
"Schema",
5958
"SchemaRegistryClient",
60-
"AsyncSchemaRegistryClient"
6159
"SchemaRegistryError",
6260
"SchemaReference",
6361
"ServerConfig",
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#

0 commit comments

Comments
 (0)