Skip to content

Commit 5a1f136

Browse files
KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups (#1873)
* Added support for describe consumer group for new protocol * style fix * removed session timeout * style fix * Simplifying test code * target assignment declaration * Protocol->group_protocol * name change * Changelog added * removed test * Changelog change * v2.10.0rc3 release changes (#1969) --------- Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
1 parent 3785b1b commit 5a1f136

File tree

8 files changed

+51
-13
lines changed

8 files changed

+51
-13
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# Confluent's Python client for Apache Kafka
22

3+
## v2.10.0
4+
5+
v2.10.0 is a feature release with the following fixes and enhancements:
6+
7+
- [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#1856)
8+
- [KIP-848] `describe_consumer_groups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#1873).
9+
10+
confluent-kafka-python v2.10.0 is based on librdkafka v2.10.0, see the
11+
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0)
12+
for a complete list of changes, enhancements, fixes and upgrade considerations.
13+
314
## v2.9.0
415

516
v2.9.0 is a feature release with the following fixes and enhancements:

docs/conf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
######################################################################
2121
# General information about the project.
2222
project = u'confluent-kafka'
23-
copyright = u'2016-2024, Confluent Inc.'
23+
copyright = u'2016-2025, Confluent Inc.'
2424

2525
# The version info for the project you're documenting, acts as replacement for
2626
# |version| and |release|, also used in various other places throughout the
2727
# built documents.
2828
#
2929
# The short X.Y version.
30-
version = '2.9.0'
30+
version = '2.10.0rc3'
3131
# The full version, including alpha/beta/rc tags.
3232
release = version
3333
######################################################################

examples/adminapi.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ def example_describe_consumer_groups(a, args):
535535
print("Group Id: {}".format(g.group_id))
536536
print(" Is Simple : {}".format(g.is_simple_consumer_group))
537537
print(" State : {}".format(g.state))
538+
print(" Type : {}".format(g.type))
538539
print(" Partition Assignor : {}".format(g.partition_assignor))
539540
print(
540541
f" Coordinator : {g.coordinator}")
@@ -548,6 +549,10 @@ def example_describe_consumer_groups(a, args):
548549
print(" Assignments :")
549550
for toppar in member.assignment.topic_partitions:
550551
print(" {} [{}]".format(toppar.topic, toppar.partition))
552+
if member.target_assignment:
553+
print(" Target Assignments:")
554+
for toppar in member.target_assignment.topic_partitions:
555+
print(f" {toppar.topic} [{toppar.partition}]")
551556
if (include_auth_ops):
552557
print(" Authorized operations: ")
553558
op_string = ""

examples/docker/Dockerfile.alpine

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ FROM alpine:3.12
3030

3131
COPY . /usr/src/confluent-kafka-python
3232

33-
ENV LIBRDKAFKA_VERSION="v2.8.0"
33+
ENV LIBRDKAFKA_VERSION="v2.10.0-RC3"
3434
ENV KCAT_VERSION="master"
3535
ENV CKP_VERSION="master"
3636

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "confluent-kafka"
7-
version = "2.9.0"
7+
version = "2.10.0rc3"
88
description = "Confluent's Python client for Apache Kafka"
99
classifiers = [
1010
"Development Status :: 5 - Production/Stable",

src/confluent_kafka/admin/_group.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,18 @@ class MemberDescription:
9494
The host where the group member is running.
9595
assignment: MemberAssignment
9696
The assignment of the group member
97+
target_assignment: MemberAssignment
98+
The target assignment of the group member
9799
group_instance_id : str
98100
The instance id of the group member.
99101
"""
100102

101-
def __init__(self, member_id, client_id, host, assignment, group_instance_id=None):
103+
def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None):
102104
self.member_id = member_id
103105
self.client_id = client_id
104106
self.host = host
105107
self.assignment = assignment
108+
self.target_assignment = target_assignment
106109
self.group_instance_id = group_instance_id
107110

108111

@@ -123,14 +126,16 @@ class ConsumerGroupDescription:
123126
Partition assignor.
124127
state : ConsumerGroupState
125128
Current state of the consumer group.
129+
type : ConsumerGroupType
130+
Type of the consumer group.
126131
coordinator: Node
127132
Consumer group coordinator.
128133
authorized_operations: list(AclOperation)
129134
AclOperations allowed for the consumer group.
130135
"""
131136

132137
def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
133-
coordinator, authorized_operations=None):
138+
coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN):
134139
self.group_id = group_id
135140
self.is_simple_consumer_group = is_simple_consumer_group
136141
self.members = members
@@ -143,4 +148,6 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign
143148
self.partition_assignor = partition_assignor
144149
if state is not None:
145150
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
151+
if type is not None:
152+
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
146153
self.coordinator = coordinator

src/confluent_kafka/src/Admin.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3857,7 +3857,9 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
38573857
PyObject *args = NULL;
38583858
PyObject *kwargs = NULL;
38593859
PyObject *assignment = NULL;
3860+
PyObject *target_assignment = NULL;
38603861
const rd_kafka_MemberAssignment_t *c_assignment;
3862+
const rd_kafka_MemberAssignment_t *c_target_assignment;
38613863

38623864
MemberDescription_type = cfl_PyObject_lookup("confluent_kafka.admin",
38633865
"MemberDescription");
@@ -3892,6 +3894,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
38923894

38933895
PyDict_SetItemString(kwargs, "assignment", assignment);
38943896

3897+
c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member);
3898+
if(c_target_assignment) {
3899+
target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment);
3900+
if (!target_assignment) {
3901+
goto err;
3902+
}
3903+
PyDict_SetItemString(kwargs, "target_assignment", target_assignment);
3904+
}
3905+
38953906
args = PyTuple_New(0);
38963907

38973908
member = PyObject_Call(MemberDescription_type, args, kwargs);
@@ -3900,6 +3911,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
39003911
Py_DECREF(kwargs);
39013912
Py_DECREF(MemberDescription_type);
39023913
Py_DECREF(assignment);
3914+
Py_XDECREF(target_assignment);
39033915
return member;
39043916

39053917
err:
@@ -3908,6 +3920,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
39083920
Py_XDECREF(kwargs);
39093921
Py_XDECREF(MemberDescription_type);
39103922
Py_XDECREF(assignment);
3923+
Py_XDECREF(target_assignment);
39113924
Py_XDECREF(member);
39123925
return NULL;
39133926
}
@@ -4003,6 +4016,8 @@ static PyObject *Admin_c_ConsumerGroupDescription_to_py(
40034016

40044017
cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupDescription_state(c_consumer_group_description));
40054018

4019+
cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupDescription_type(c_consumer_group_description));
4020+
40064021
args = PyTuple_New(0);
40074022

40084023
consumer_group_description = PyObject_Call(ConsumerGroupDescription_type, args, kwargs);

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,28 +42,28 @@
4242
* 0xMMmmRRPP
4343
* MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
4444
*/
45-
#define CFL_VERSION 0x02090000
46-
#define CFL_VERSION_STR "2.9.0"
45+
#define CFL_VERSION 0x020a0000
46+
#define CFL_VERSION_STR "2.10.0rc3"
4747

4848
/**
4949
* Minimum required librdkafka version. This is checked both during
5050
* build-time (just below) and runtime (see confluent_kafka.c).
5151
* Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error
5252
* defines and strings in sync.
5353
*/
54-
#define MIN_RD_KAFKA_VERSION 0x020800ff
54+
#define MIN_RD_KAFKA_VERSION 0x020a00ff
5555

5656
#ifdef __APPLE__
57-
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
57+
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
5858
#else
59-
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
59+
#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
6060
#endif
6161

6262
#if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION
6363
#ifdef __APPLE__
64-
#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
64+
#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
6565
#else
66-
#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
66+
#error "confluent-kafka-python requires librdkafka v2.10.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
6767
#endif
6868
#endif
6969

0 commit comments

Comments
 (0)