diff --git a/CHANGELOG.md b/CHANGELOG.md index cc0ac4857..3b148fc02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Confluent's Python client for Apache Kafka +## v2.10.0 + +v2.10.0 is a feature release with the following fixes and enhancements: + +- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856) +- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#1873). + +confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. + ## v2.9.0 v2.9.0 is a feature release with the following fixes and enhancements: diff --git a/docs/conf.py b/docs/conf.py index 95fca1765..097484066 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -20,14 +20,14 @@ ###################################################################### # General information about the project. project = u'confluent-kafka' -copyright = u'2016-2024, Confluent Inc.' +copyright = u'2016-2025, Confluent Inc.' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -version = '2.9.0' +version = '2.10.0rc3' # The full version, including alpha/beta/rc tags. release = version ###################################################################### diff --git a/examples/adminapi.py b/examples/adminapi.py index 54f119e02..0eb87216c 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -535,6 +535,7 @@ def example_describe_consumer_groups(a, args): print("Group Id: {}".format(g.group_id)) print(" Is Simple : {}".format(g.is_simple_consumer_group)) print(" State : {}".format(g.state)) + print(" Type : {}".format(g.type)) print(" Partition Assignor : {}".format(g.partition_assignor)) print( f" Coordinator : {g.coordinator}") @@ -548,6 +549,10 @@ def example_describe_consumer_groups(a, args): print(" Assignments :") for toppar in member.assignment.topic_partitions: print(" {} [{}]".format(toppar.topic, toppar.partition)) + if member.target_assignment: + print(" Target Assignments:") + for toppar in member.target_assignment.topic_partitions: + print(f" {toppar.topic} [{toppar.partition}]") if (include_auth_ops): print(" Authorized operations: ") op_string = "" diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine index c7f37fe4a..db08bad1e 100644 --- a/examples/docker/Dockerfile.alpine +++ b/examples/docker/Dockerfile.alpine @@ -30,7 +30,7 @@ FROM alpine:3.12 COPY . /usr/src/confluent-kafka-python -ENV LIBRDKAFKA_VERSION="v2.8.0" +ENV LIBRDKAFKA_VERSION="v2.10.0-RC3" ENV KCAT_VERSION="master" ENV CKP_VERSION="master" diff --git a/pyproject.toml b/pyproject.toml index e581b8e45..c3b7aa814 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "confluent-kafka" -version = "2.9.0" +version = "2.10.0rc3" description = "Confluent's Python client for Apache Kafka" classifiers = [ "Development Status :: 5 - Production/Stable", diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 964d62b2f..7823e976a 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -94,15 +94,18 @@ class MemberDescription: The host where the group member is running. assignment: MemberAssignment The assignment of the group member + target_assignment: MemberAssignment + The target assignment of the group member group_instance_id : str The instance id of the group member. """ - def __init__(self, member_id, client_id, host, assignment, group_instance_id=None): + def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None): self.member_id = member_id self.client_id = client_id self.host = host self.assignment = assignment + self.target_assignment = target_assignment self.group_instance_id = group_instance_id @@ -123,6 +126,8 @@ class ConsumerGroupDescription: Partition assignor. state : ConsumerGroupState Current state of the consumer group. + type : ConsumerGroupType + Type of the consumer group. coordinator: Node Consumer group coordinator. authorized_operations: list(AclOperation) @@ -130,7 +135,7 @@ class ConsumerGroupDescription: """ def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, - coordinator, authorized_operations=None): + coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group self.members = members @@ -143,4 +148,6 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign self.partition_assignor = partition_assignor if state is not None: self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState) + if type is not None: + self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType) self.coordinator = coordinator diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 5eeb5c4cd..451017ad6 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3857,7 +3857,9 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio PyObject *args = NULL; PyObject *kwargs = NULL; PyObject *assignment = NULL; + PyObject *target_assignment = NULL; const rd_kafka_MemberAssignment_t *c_assignment; + const rd_kafka_MemberAssignment_t *c_target_assignment; MemberDescription_type = cfl_PyObject_lookup("confluent_kafka.admin", "MemberDescription"); @@ -3892,6 +3894,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio PyDict_SetItemString(kwargs, "assignment", assignment); + c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member); + if(c_target_assignment) { + target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); + if (!target_assignment) { + goto err; + } + PyDict_SetItemString(kwargs, "target_assignment", target_assignment); + } + args = PyTuple_New(0); member = PyObject_Call(MemberDescription_type, args, kwargs); @@ -3900,6 +3911,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio Py_DECREF(kwargs); Py_DECREF(MemberDescription_type); Py_DECREF(assignment); + Py_XDECREF(target_assignment); return member; err: @@ -3908,6 +3920,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio Py_XDECREF(kwargs); Py_XDECREF(MemberDescription_type); Py_XDECREF(assignment); + Py_XDECREF(target_assignment); Py_XDECREF(member); return NULL; } @@ -4003,6 +4016,8 @@ static PyObject *Admin_c_ConsumerGroupDescription_to_py( cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupDescription_state(c_consumer_group_description)); + cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupDescription_type(c_consumer_group_description)); + args = PyTuple_New(0); consumer_group_description = PyObject_Call(ConsumerGroupDescription_type, args, kwargs); diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 12d7c14e1..871ce88af 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -42,8 +42,8 @@ * 0xMMmmRRPP * MM=major, mm=minor, RR=revision, PP=patchlevel (not used) */ -#define CFL_VERSION 0x02090000 -#define CFL_VERSION_STR "2.9.0" +#define CFL_VERSION 0x020a0000 +#define CFL_VERSION_STR "2.10.0rc3" /** * Minimum required librdkafka version. This is checked both during @@ -51,19 +51,19 @@ * Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error * defines and strings in sync. */ -#define MIN_RD_KAFKA_VERSION 0x020800ff +#define MIN_RD_KAFKA_VERSION 0x020a00ff #ifdef __APPLE__ -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION #ifdef __APPLE__ -#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #endif