13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
15
15
16
- import time
17
- import pytest
18
-
19
- from confluent_kafka import Consumer , ConsumerGroupState , ConsumerGroupType , TopicPartition
20
- from confluent_kafka .admin import AdminClient
16
+ from confluent_kafka import ConsumerGroupState , ConsumerGroupType , TopicPartition
21
17
import uuid
22
18
23
19
from tests .common import TestUtils
24
20
25
21
topic_prefix = "test-topic"
26
- # Generate random group IDs
22
+
27
23
28
24
def create_consumers (kafka_cluster , topic , group_id , client_id , Protocol ):
29
25
conf = {'group.id' : group_id ,
@@ -38,6 +34,7 @@ def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol):
38
34
consumer .poll (10 )
39
35
return consumer
40
36
37
+
41
38
def verify_describe_consumer_groups (kafka_cluster , admin_client , topic ):
42
39
43
40
group_id_new1 = f"test-group_new1-{ uuid .uuid4 ()} "
@@ -51,7 +48,7 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic):
51
48
client_id4 = "test-client4"
52
49
53
50
consumers = []
54
-
51
+
55
52
# Create two groups with new group protocol
56
53
consumers .append (create_consumers (kafka_cluster , topic , group_id_new1 , client_id1 , "consumer" ))
57
54
consumers .append (create_consumers (kafka_cluster , topic , group_id_new2 , client_id2 , "consumer" ))
@@ -76,7 +73,7 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic):
76
73
for member in result .members :
77
74
assert member .client_id in [client_id1 , client_id2 ]
78
75
assert member .assignment .topic_partitions == partition
79
-
76
+
80
77
fs2 = admin_client .describe_consumer_groups (group_ids = [group_id_old1 , group_id_old2 ])
81
78
for group_id , f in fs2 .items ():
82
79
result = f .result ()
@@ -88,7 +85,7 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic):
88
85
for member in result .members :
89
86
assert member .client_id in [client_id3 , client_id4 ]
90
87
assert member .assignment .topic_partitions == partition
91
-
88
+
92
89
fs3 = admin_client .describe_consumer_groups (group_ids = [group_id_new1 , group_id_new2 , group_id_old1 , group_id_old2 ])
93
90
for group_id , f in fs3 .items ():
94
91
result = f .result ()
@@ -106,10 +103,11 @@ def verify_describe_consumer_groups(kafka_cluster, admin_client, topic):
106
103
else :
107
104
assert member .client_id in [client_id3 , client_id4 ]
108
105
assert member .assignment .topic_partitions == partition
109
-
106
+
110
107
for consumer in consumers :
111
108
consumer .close ()
112
109
110
+
113
111
def test_describe_consumer_groups_compatability (kafka_cluster ):
114
112
115
113
admin_client = kafka_cluster .admin ()
@@ -124,11 +122,11 @@ def test_describe_consumer_groups_compatability(kafka_cluster):
124
122
},
125
123
validate_only = False
126
124
)
127
-
125
+
128
126
if TestUtils .use_group_protocol_consumer ():
129
127
verify_describe_consumer_groups (kafka_cluster , admin_client , our_topic )
130
128
131
129
# Delete created topic
132
130
fs = admin_client .delete_topics ([our_topic ])
133
131
for topic , f in fs .items ():
134
- f .result ()
132
+ f .result ()
0 commit comments