@@ -33,74 +33,70 @@ def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol):
33
33
34
34
35
35
def verify_describe_consumer_groups (kafka_cluster , admin_client , topic ):
36
+ def create_consumer_groups ():
37
+ """Create consumer groups with new and old protocols."""
38
+ group_ids = {
39
+ "new1" : f"test-group_new1-{ uuid .uuid4 ()} " ,
40
+ "new2" : f"test-group_new2-{ uuid .uuid4 ()} " ,
41
+ "old1" : f"test-group_old1-{ uuid .uuid4 ()} " ,
42
+ "old2" : f"test-group_old2-{ uuid .uuid4 ()} " ,
43
+ }
44
+ client_ids = {
45
+ "new" : ["test-client1" , "test-client2" ],
46
+ "old" : ["test-client3" , "test-client4" ],
47
+ }
48
+
49
+ consumers = [
50
+ create_consumers (kafka_cluster , topic , group_ids ["new1" ], client_ids ["new" ][0 ], "consumer" ),
51
+ create_consumers (kafka_cluster , topic , group_ids ["new2" ], client_ids ["new" ][1 ], "consumer" ),
52
+ create_consumers (kafka_cluster , topic , group_ids ["old1" ], client_ids ["old" ][0 ], "classic" ),
53
+ create_consumers (kafka_cluster , topic , group_ids ["old2" ], client_ids ["old" ][1 ], "classic" ),
54
+ ]
55
+ return group_ids , client_ids , consumers
56
+
57
+ def verify_consumer_group_results (fs , expected_group_ids , expected_type , expected_clients ):
58
+ """Verify the results of consumer group descriptions."""
59
+ for group_id , f in fs .items ():
60
+ result = f .result ()
61
+ assert result .group_id in expected_group_ids
62
+ assert result .is_simple_consumer_group is False
63
+ assert result .state == ConsumerGroupState .STABLE
64
+ assert result .type == expected_type
65
+ assert len (result .members ) == 1
66
+ for member in result .members :
67
+ assert member .client_id in expected_clients
68
+ assert member .assignment .topic_partitions == partition
69
+
70
+ # Create consumer groups
71
+ group_ids , client_ids , consumers = create_consumer_groups ()
72
+ partition = [TopicPartition (topic , 0 )]
36
73
37
- group_id_new1 = f"test-group_new1-{ uuid .uuid4 ()} "
38
- group_id_new2 = f"test-group_new2-{ uuid .uuid4 ()} "
39
- group_id_old1 = f"test-group_old1-{ uuid .uuid4 ()} "
40
- group_id_old2 = f"test-group_old2-{ uuid .uuid4 ()} "
41
-
42
- client_id1 = "test-client1"
43
- client_id2 = "test-client2"
44
- client_id3 = "test-client3"
45
- client_id4 = "test-client4"
46
-
47
- consumers = []
48
-
49
- # Create two groups with new group protocol
50
- consumers .append (create_consumers (kafka_cluster , topic , group_id_new1 , client_id1 , "consumer" ))
51
- consumers .append (create_consumers (kafka_cluster , topic , group_id_new2 , client_id2 , "consumer" ))
52
-
53
- # Create two groups with old group protocol
54
- consumers .append (create_consumers (kafka_cluster , topic , group_id_old1 , client_id3 , "classic" ))
55
- consumers .append (create_consumers (kafka_cluster , topic , group_id_old2 , client_id4 , "classic" ))
74
+ # Describe and verify new group protocol consumer groups
75
+ fs_new = admin_client .describe_consumer_groups ([group_ids ["new1" ], group_ids ["new2" ]])
76
+ verify_consumer_group_results (fs_new , [group_ids ["new1" ], group_ids ["new2" ]],
77
+ ConsumerGroupType .CONSUMER , client_ids ["new" ])
56
78
57
- partition = [TopicPartition (topic , 0 )]
79
+ # Describe and verify old group protocol consumer groups
80
+ fs_old = admin_client .describe_consumer_groups ([group_ids ["old1" ], group_ids ["old2" ]])
81
+ verify_consumer_group_results (fs_old , [group_ids ["old1" ], group_ids ["old2" ]],
82
+ ConsumerGroupType .CLASSIC , client_ids ["old" ])
58
83
59
- # We will pass 3 requests, one containing the two groups created with new
60
- # group protocol and the other containing the two groups created with old
61
- # group protocol and the third containing all the groups and verify the results.
62
- fs1 = admin_client .describe_consumer_groups (group_ids = [group_id_new1 , group_id_new2 ])
63
- for group_id , f in fs1 .items ():
64
- result = f .result ()
65
- assert result .group_id in [group_id_new1 , group_id_new2 ]
66
- assert result .is_simple_consumer_group is False
67
- assert result .state == ConsumerGroupState .STABLE
68
- assert result .type == ConsumerGroupType .CONSUMER
69
- assert len (result .members ) == 1
70
- for member in result .members :
71
- assert member .client_id in [client_id1 , client_id2 ]
72
- assert member .assignment .topic_partitions == partition
73
-
74
- fs2 = admin_client .describe_consumer_groups (group_ids = [group_id_old1 , group_id_old2 ])
75
- for group_id , f in fs2 .items ():
76
- result = f .result ()
77
- assert result .group_id in [group_id_old1 , group_id_old2 ]
78
- assert result .is_simple_consumer_group is False
79
- assert result .state == ConsumerGroupState .STABLE
80
- assert result .type == ConsumerGroupType .CLASSIC
81
- assert len (result .members ) == 1
82
- for member in result .members :
83
- assert member .client_id in [client_id3 , client_id4 ]
84
- assert member .assignment .topic_partitions == partition
85
-
86
- fs3 = admin_client .describe_consumer_groups (group_ids = [group_id_new1 , group_id_new2 , group_id_old1 , group_id_old2 ])
87
- for group_id , f in fs3 .items ():
84
+ # Describe and verify all consumer groups
85
+ fs_all = admin_client .describe_consumer_groups (list (group_ids .values ()))
86
+ for group_id , f in fs_all .items ():
88
87
result = f .result ()
89
- assert result .group_id in [ group_id_new1 , group_id_new2 , group_id_old1 , group_id_old2 ]
88
+ assert result .group_id in group_ids . values ()
90
89
assert result .is_simple_consumer_group is False
91
90
assert result .state == ConsumerGroupState .STABLE
92
- if result .group_id in [group_id_new1 , group_id_new2 ]:
91
+ if result .group_id in [group_ids [ "new1" ], group_ids [ "new2" ] ]:
93
92
assert result .type == ConsumerGroupType .CONSUMER
93
+ assert result .members [0 ].client_id in client_ids ["new" ]
94
94
else :
95
95
assert result .type == ConsumerGroupType .CLASSIC
96
- assert len (result .members ) == 1
97
- for member in result .members :
98
- if result .group_id in [group_id_new1 , group_id_new2 ]:
99
- assert member .client_id in [client_id1 , client_id2 ]
100
- else :
101
- assert member .client_id in [client_id3 , client_id4 ]
102
- assert member .assignment .topic_partitions == partition
96
+ assert result .members [0 ].client_id in client_ids ["old" ]
97
+ assert result .members [0 ].assignment .topic_partitions == partition
103
98
99
+ # Close all consumers
104
100
for consumer in consumers :
105
101
consumer .close ()
106
102
0 commit comments