Skip to content

Commit 00fd11b

Browse files
GH-3145 : Sample for next gen consumer-group rebalance protocol
Fixes: #3145 * Add a sample demonstrating the next-generation consumer-group rebalance protocol. * Provide a docker-compose script for spinning up a `kraft` based Kafka broker since the new consumer protocol only works in `kraft` mode. * Add README explaining the sample.
1 parent bd8b82e commit 00fd11b

File tree

13 files changed

+613
-0
lines changed

13 files changed

+613
-0
lines changed

samples/README.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
* sample-04 - Topic based (non-blocking) retry
77
* sample-05 - Global embedded Kafka testing
88
* sample-06 - Kafka Streams tests with TopologyTestDriver
9+
* sample-07 - The New consumer rebalance protocol in spring-kafka

samples/sample-07/.gitignore

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
HELP.md
2+
.gradle
3+
build/
4+
!gradle/wrapper/gradle-wrapper.jar
5+
!**/src/main/**/build/
6+
!**/src/test/**/build/
7+
8+
### STS ###
9+
.apt_generated
10+
.classpath
11+
.factorypath
12+
.project
13+
.settings
14+
.springBeans
15+
.sts4-cache
16+
bin/
17+
!**/src/main/**/bin/
18+
!**/src/test/**/bin/
19+
20+
### IntelliJ IDEA ###
21+
.idea
22+
*.iws
23+
*.iml
24+
*.ipr
25+
out/
26+
!**/src/main/**/out/
27+
!**/src/test/**/out/
28+
29+
### NetBeans ###
30+
/nbproject/private/
31+
/nbbuild/
32+
/dist/
33+
/nbdist/
34+
/.nb-gradle/
35+
36+
### VS Code ###
37+
.vscode/

samples/sample-07/README.adoc

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
== Sample 7
2+
3+
This sample demonstrates the application of the new consumer rebalance protocol in Spring for Apache Kafka.
4+
5+
The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
6+
7+
`Spring Boot` starts the `Kafka Broker` container defined in the `compose.yaml` file upon startup.
8+
9+
```yaml
10+
version: '3'
11+
services:
12+
broker:
13+
image: bitnami/kafka:3.7.0
14+
...
15+
# KIP-848
16+
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
17+
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
18+
```
19+
20+
The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol.
21+
22+
The `group.protocol` can be configured in the `resources/application.yaml` as follows:
23+
24+
```yaml
25+
spring:
26+
kafka:
27+
consumer:
28+
properties:
29+
group.protocol: consumer
30+
```
31+
32+
Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`.
33+
34+
The `Broker` will then send the Topic Partition Assign information to the `Consumer`. This means that the `Consumer` rebalancing has finished, and the `Consumer` has started to poll messages.
35+
36+
```java
37+
@Component
38+
public class Sample07KafkaListener {
39+
40+
@KafkaListener(topics = "test-topic", groupId = "sample07-1")
41+
public void listenWithGroup1(String message) {
42+
System.out.println("Received message at group sample07-1: " + message);
43+
}
44+
45+
@KafkaListener(topics = "test-topic", groupId = "sample07-2")
46+
public void listenWithGroup2(String message) {
47+
System.out.println("Received message at group sample07-2: " + message);
48+
}
49+
}
50+
```

samples/sample-07/build.gradle

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
plugins {
2+
id 'java'
3+
id 'org.springframework.boot' version '3.3.0-SNAPSHOT'
4+
id 'io.spring.dependency-management' version '1.1.5'
5+
}
6+
7+
group = 'com.example'
8+
version = '3.2.0-SNAPSHOT'
9+
10+
java {
11+
sourceCompatibility = '17'
12+
}
13+
14+
repositories {
15+
mavenCentral()
16+
maven { url 'https://repo.spring.io/milestone' }
17+
maven { url 'https://repo.spring.io/snapshot' }
18+
}
19+
20+
dependencies {
21+
implementation 'org.springframework.boot:spring-boot-starter'
22+
implementation 'org.springframework.kafka:spring-kafka'
23+
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
24+
25+
testImplementation 'org.springframework.boot:spring-boot-starter-test'
26+
testImplementation 'org.springframework.kafka:spring-kafka-test'
27+
testImplementation 'org.springframework.boot:spring-boot-testcontainers'
28+
}
29+
30+
tasks.named('test') {
31+
useJUnitPlatform()
32+
}

samples/sample-07/compose.yaml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
version: '3'
2+
services:
3+
broker:
4+
image: bitnami/kafka:3.7.0
5+
hostname: broker
6+
container_name: broker
7+
ports:
8+
- "9092:9092"
9+
- "10000:9094"
10+
environment:
11+
# Kraft Settings
12+
KAFKA_CFG_NODE_ID: 0
13+
KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw
14+
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
15+
KAFKA_CFG_PROCESS_ROLES: controller,broker
16+
17+
# Listeners
18+
KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093
19+
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000
20+
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
21+
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
22+
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
23+
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
24+
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
25+
26+
# Clustering
27+
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
28+
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
29+
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
30+
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1
31+
32+
# KIP-848
33+
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
34+
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
Binary file not shown.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
distributionBase=GRADLE_USER_HOME
2+
distributionPath=wrapper/dists
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
4+
networkTimeout=10000
5+
validateDistributionUrl=true
6+
zipStoreBase=GRADLE_USER_HOME
7+
zipStorePath=wrapper/dists

0 commit comments

Comments
 (0)