Implement Async Schema Registry client #1965

Open
wants to merge 36 commits into
base: master
from

Conversation

rohitsanj
Contributor

@rohitsanj rohitsanj commented Apr 10, 2025

What

  • Converted the existing sync Schema Registry clients to async and placed them in the _async module.
  • The async code is written so that it can be turned back into sync code by applying regular-expression-driven replacements (for example, await func(x, y, z) -> func(x, y, z), AsyncXYZ -> XYZ, and so on). Going forward, all changes must be made only in the _async module; the sync clients are derived from the async clients.
  • The same treatment is applied to the tests: any new async test must still work with the async keywords removed.
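
To illustrate the kind of regex-driven replacement described above, here is a minimal sketch. The rule list and the `unasync_source` helper are assumptions made for illustration; the actual tools/unasync.py may use a different set of rules.

```python
import re

# Illustrative substitution rules (assumed, not taken from tools/unasync.py).
SUBSTITUTIONS = [
    (re.compile(r"\basync\s+def\b"), "def"),    # async def f() -> def f()
    (re.compile(r"\basync\s+with\b"), "with"),  # async with x  -> with x
    (re.compile(r"\basync\s+for\b"), "for"),    # async for x   -> for x
    (re.compile(r"\bawait\s+"), ""),            # await func()  -> func()
    (re.compile(r"\bAsync([A-Z]\w*)"), r"\1"),  # AsyncXYZ      -> XYZ
]


def unasync_source(source: str) -> str:
    """Apply each substitution in order and return the sync version of the source."""
    for pattern, replacement in SUBSTITUTIONS:
        source = pattern.sub(replacement, source)
    return source


ASYNC_EXAMPLE = (
    "class AsyncXYZ:\n"
    "    async def fetch(self, x, y, z):\n"
    "        return await func(x, y, z)\n"
)

print(unasync_source(ASYNC_EXAMPLE))
```

Because the transformation is purely textual, the async source has to avoid constructs that the rules cannot translate, which is why new async code and tests must remain valid once the async keywords are removed.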

#1971 must be merged before this one.

unasync.py script

Scenario 1: When changes are made to the _async directory but not to its _sync counterpart:

$ python tools/unasync.py --check

⚠️  Detected changes to a _sync directory that are uncommitted.

Files with differences:
  - src/confluent_kafka/schema_registry/_sync/serde.py

Please either:
1. Commit the changes in the generated _sync files, or
2. Revert the changes in the original _async files

Scenario 2: When changes across _async and _sync are consistent.

$ python tools/unasync.py --check

✅ All _sync directories are up to date!
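
One way a consistency check like this could work is to regenerate the sync source in memory and compare it with what is on disk. The sketch below assumes that approach; `find_stale_sync_files` and `strip_await` are hypothetical names, and the real script may detect differences another way (for example, through uncommitted changes in git).

```python
import tempfile
from pathlib import Path
from typing import Callable, List


def find_stale_sync_files(
    async_dir: Path, sync_dir: Path, convert: Callable[[str], str]
) -> List[Path]:
    """Return _sync files that are missing or differ from the converted _async source."""
    stale = []
    for async_file in sorted(async_dir.rglob("*.py")):
        sync_file = sync_dir / async_file.relative_to(async_dir)
        expected = convert(async_file.read_text())
        actual = sync_file.read_text() if sync_file.exists() else None
        if actual != expected:
            stale.append(sync_file)
    return stale


def strip_await(source: str) -> str:
    # Stand-in converter for the demo; a real converter would apply more rules.
    return source.replace("await ", "")


with tempfile.TemporaryDirectory() as tmp:
    async_dir = Path(tmp) / "_async"
    sync_dir = Path(tmp) / "_sync"
    async_dir.mkdir()
    sync_dir.mkdir()
    (async_dir / "serde.py").write_text("result = await func(x)\n")

    # Scenario 1: the _sync copy is out of date.
    (sync_dir / "serde.py").write_text("result = func(old)\n")
    stale_before = [p.name for p in find_stale_sync_files(async_dir, sync_dir, strip_await)]

    # Scenario 2: the _sync copy matches the generated output.
    (sync_dir / "serde.py").write_text("result = func(x)\n")
    stale_after = [p.name for p in find_stale_sync_files(async_dir, sync_dir, strip_await)]

print(stale_before, stale_after)
```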

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA:

Test & Review

Open questions / Follow-ups

@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@airlock-confluentinc airlock-confluentinc bot force-pushed the add-async-compat-schema-registry branch from 30156d9 to 920ddb6 Compare April 17, 2025 21:40
@rohitsanj rohitsanj changed the base branch from master to refactor-for-async-support April 17, 2025 21:42
@airlock-confluentinc airlock-confluentinc bot force-pushed the refactor-for-async-support branch from cd95e33 to bd445fd Compare April 17, 2025 21:59
@airlock-confluentinc airlock-confluentinc bot force-pushed the add-async-compat-schema-registry branch from 036451a to 1fe184e Compare April 21, 2025 18:22
…support

	src/confluent_kafka/schema_registry/avro.py (change applied to src/confluent_kafka/schema_registry/_sync/avro.py)
@rohitsanj rohitsanj mentioned this pull request Apr 23, 2025
2 tasks
@airlock-confluentinc airlock-confluentinc bot force-pushed the add-async-compat-schema-registry branch from 1fe184e to 0255917 Compare May 27, 2025 23:09
@airlock-confluentinc airlock-confluentinc bot force-pushed the add-async-compat-schema-registry branch from 0255917 to 3181809 Compare May 28, 2025 04:16
@airlock-confluentinc airlock-confluentinc bot force-pushed the add-async-compat-schema-registry branch from 3181809 to 082df8f Compare May 28, 2025 18:38
@rohitsanj rohitsanj marked this pull request as ready for review May 28, 2025 23:30
@rohitsanj rohitsanj requested review from a team as code owners May 28, 2025 23:30
Contributor Author

Async implementation of the Kafka consumer -- used only in tests and not surfaced as a public API.

Contributor Author

Async implementation of the Kafka producer -- used only in tests and not surfaced as a public API.

Comment on lines +32 to +35
per-file-ignores =
    ./src/confluent_kafka/schema_registry/_sync/avro.py: E303
    ./src/confluent_kafka/schema_registry/_sync/json_schema.py: E303
    ./src/confluent_kafka/schema_registry/_sync/protobuf.py: E303
Contributor Author

The @asyncinit decorator in the async files is replaced with an empty string in the sync files. This leaves three blank lines between classes, which flake8 reports as E303 (too many blank lines).
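
A small sketch of why the extra blank line appears, assuming the decorator is removed by plain string replacement (`naive_unasync` and `longest_blank_run` are hypothetical helpers written for this illustration):

```python
# Async source with the usual two blank lines before a decorated class.
ASYNC_SOURCE = (
    "class AsyncFirst:\n"
    "    pass\n"
    "\n"
    "\n"
    "@asyncinit\n"
    "class AsyncSecond:\n"
    "    pass\n"
)


def naive_unasync(source: str) -> str:
    # Replacing the decorator with an empty string keeps its (now empty) line.
    return source.replace("@asyncinit", "").replace("Async", "")


def longest_blank_run(source: str) -> int:
    """Return the longest run of consecutive blank lines in the source."""
    longest = current = 0
    for line in source.splitlines():
        current = current + 1 if not line.strip() else 0
        longest = max(longest, current)
    return longest


sync_source = naive_unasync(ASYNC_SOURCE)
print(longest_blank_run(ASYNC_SOURCE))  # 2
print(longest_blank_run(sync_source))   # 3
```

The generated sync file ends up with three consecutive blank lines where the async file had two, which is what the per-file E303 ignores account for.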

@airlock-confluentinc airlock-confluentinc bot force-pushed the add-async-compat-schema-registry branch from c5c548e to dac2bef Compare May 29, 2025 23:58
@rohitsanj rohitsanj changed the title from "Implement asyncio-compatible Schema Registry client" to "Implement Async Schema Registry client" May 30, 2025
rayokota
rayokota previously approved these changes May 30, 2025
Member

@rayokota rayokota left a comment

Thanks @rohitsanj, LGTM

Base automatically changed from refactor-for-async-support to master May 30, 2025 19:56
@rohitsanj rohitsanj dismissed rayokota’s stale review May 30, 2025 19:56

The base branch was changed.

@sonarqube-confluent

Failed

  • 51.40% Coverage on New Code (is less than 80.00%)

Analysis Details

53 Issues

  • 0 Bugs
  • 3 Vulnerabilities
  • 50 Code Smells

Coverage and Duplications

  • 51.40% Coverage (60.50% estimated after merge)
  • No duplication information (6.20% estimated after merge)

Project ID: confluent-kafka-python

Member

@rayokota rayokota left a comment

LGTM

poll_timeout = None if poll_timeout == -1 else poll_timeout
async with timeout(poll_timeout):
    while True:
        # Zero timeout here is what makes it non-blocking
Contributor Author

Remove this because it's wrong -- poll(0) could block.
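
For context, one common way to keep an event loop responsive when a call such as poll(0) might still block is to run it in a worker thread. The sketch below is not what this PR does; `StubConsumer` and `poll_async` are hypothetical names, and the stub only simulates a short blocking poll.

```python
import asyncio
import time


class StubConsumer:
    """Hypothetical stand-in for a Kafka consumer; not the confluent_kafka API."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        time.sleep(0.01)  # simulate a call that may block even with timeout=0
        return self._messages.pop(0) if self._messages else None


async def poll_async(consumer, poll_timeout=-1):
    # Mirror the quoted snippet: -1 means wait indefinitely.
    deadline = None if poll_timeout == -1 else poll_timeout

    async def _loop():
        while True:
            # Run the potentially blocking poll in a worker thread.
            msg = await asyncio.to_thread(consumer.poll, 0)
            if msg is not None:
                return msg
            await asyncio.sleep(0)  # yield to other tasks between polls

    return await asyncio.wait_for(_loop(), timeout=deadline)


result = asyncio.run(poll_async(StubConsumer(["m1"]), poll_timeout=1.0))
print(result)
```

This sketch requires Python 3.9 or later for `asyncio.to_thread`. If no message arrives before the deadline, `asyncio.wait_for` raises a timeout error.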
