Skip to content

Commit f91f8a9

Browse files
chickenchickenloveartembilan
authored andcommitted
GH-2806: Receiving an empty list with RecordFilterStrategy
Fixes: #2806 Motivation: Receiving an empty list when using `RecordFilterStrategy` on batch messages In the current batch mode, even if the `RecordFilterStrategy` filters all records resulting in an Empty List being returned, the KafkaListener is still invoked. In contrast, in single record mode, if record are filtered, the `KafkaListener` is not called. This difference in behavior between the two modes can cause confusion for users. Modifications: Add public method `isAnyManualAck()` to `Acknowledgment` to verify that `manualAck` is needed on `FilteringBatchMessageListenerAdapter`. Modify `FilteringBatchMessageListenerAdapter`. add field `consumerAware` as final (IMHO, we don't need to calculate it every single call `onMessage()`). add logic (if empty list and manual Ack == true, KafkaListener will be invoked. If empty list and manual Ack == false, `KafkaListener` will not be invoked even if listener is kind of ConsumerAware. In detail, See Discussion section below. Result: Receiving an empty list when using RecordFilterStrategy on batch messages #2806 When the RecordFilterStrategy filters all records and returns an Empty List, the KafkaListener is invoked only if it is in manual ACK mode. Discussion: When using a ConsumerAware Listener, commits can be made using `Consumer.commitSync()` and `Consumer.commitAsync()`. However, when using a `ConsumerAwareAckListener`, it seems possible that commits using the Consumer and commits using Ack could be processed simultaneously. That situation seems quite ambiguous.
1 parent 3c2bd93 commit f91f8a9

File tree

5 files changed

+269
-26
lines changed

5 files changed

+269
-26
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,63 @@ public void listen(Thing thing) {
2727
}
2828
----
2929

30+
Starting with version 3.3, Ignoring empty batches that result from filtering by `RecordFilterStrategy` is supported.
31+
When implementing `RecordFilterStrategy`, it can be configured through `ignoreEmptyBatch()`.
32+
The default setting is `false`, indicating `KafkaListener` will be invoked even if all `ConsumerRecord` s are filtered out.
33+
34+
If `true` is returned, the `KafkaListener` [underline]#will not be invoked# when all `ConsumerRecord` are filtered out.
35+
However, commit to broker, will still be executed.
36+
37+
If `false` is returned, the `KafkaListener` [underline]#will be invoked# when all `ConsumerRecord` are filtered out.
38+
39+
Here are some examples.
40+
41+
[source,java]
42+
----
43+
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
44+
...
45+
@Override
46+
public List<ConsumerRecord<String, String>> filterBatch(
47+
List<ConsumerRecord<String, String>> consumerRecords) {
48+
return List.of();
49+
}
50+
51+
@Override
52+
public boolean ignoreEmptyBatch() {
53+
return true;
54+
}
55+
};
56+
57+
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
58+
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
59+
public void listen(List<Thing> things) {
60+
...
61+
}
62+
----
63+
In this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `true` as result of `ignoreEmptyBatch()`.
64+
Thus `KafkaListener#listen(...)` never will be invoked at all.
65+
66+
[source,java]
67+
----
68+
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
69+
...
70+
@Override
71+
public List<ConsumerRecord<String, String>> filterBatch(
72+
List<ConsumerRecord<String, String>> consumerRecords) {
73+
return List.of();
74+
}
75+
76+
@Override
77+
public boolean ignoreEmptyBatch() {
78+
return false;
79+
}
80+
};
81+
82+
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
83+
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
84+
public void listen(List<Thing> things) {
85+
...
86+
}
87+
----
88+
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `false` as result of `ignoreEmptyBatch()`.
89+
Thus `KafkaListener#listen(...)` always will be invoked.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,9 @@ A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` inter
1919
This method allows for more selective seek operations by targeting only the desired consumer group.
2020
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].
2121

22+
[[x33-new-option-ignore-empty-batch]]
23+
=== Configurable Handling of Empty Batches in Kafka Listener with RecordFilterStrategy
2224

25+
`RecordFilterStrategy` now supports ignoring empty batches that result from filtering.
26+
This can be configured through overriding default method `ignoreEmptyBatch()`, which defaults to false, ensuring `KafkaListener` is invoked even if all `ConsumerRecords` are filtered out.
27+
For more details, see xref:kafka/receiving-messages/filtering.adoc[Message receive filtering Docs].

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
* @param <V> the value type.
3737
*
3838
* @author Gary Russell
39+
* @author Sanghyeok An
3940
*
4041
*/
4142
public class FilteringBatchMessageListenerAdapter<K, V>
@@ -44,16 +45,16 @@ public class FilteringBatchMessageListenerAdapter<K, V>
4445

4546
private final boolean ackDiscarded;
4647

48+
private final boolean consumerAware;
49+
4750
/**
4851
* Create an instance with the supplied strategy and delegate listener.
4952
* @param delegate the delegate.
5053
* @param recordFilterStrategy the filter.
5154
*/
5255
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
5356
RecordFilterStrategy<K, V> recordFilterStrategy) {
54-
55-
super(delegate, recordFilterStrategy);
56-
this.ackDiscarded = false;
57+
this(delegate, recordFilterStrategy, false);
5758
}
5859

5960
/**
@@ -71,22 +72,25 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
7172

7273
super(delegate, recordFilterStrategy);
7374
this.ackDiscarded = ackDiscarded;
75+
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) ||
76+
this.delegateType.equals(ListenerType.CONSUMER_AWARE);
7477
}
7578

7679
@Override
7780
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
7881
Consumer<?, ?> consumer) {
7982

80-
List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
83+
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
84+
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
8185
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
82-
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
83-
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
84-
/*
85-
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
86-
* either through the acknowledgment
87-
*/
88-
if (consumerRecords.size() > 0 || consumerAware
89-
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
86+
87+
if (recordFilterStrategy.ignoreEmptyBatch() &&
88+
consumerRecords.isEmpty() &&
89+
acknowledgment != null) {
90+
acknowledgment.acknowledge();
91+
}
92+
else if (!consumerRecords.isEmpty() || this.consumerAware
93+
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
9094
invokeDelegate(consumerRecords, acknowledgment, consumer);
9195
}
9296
else {
@@ -98,6 +102,7 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
98102

99103
private void invokeDelegate(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment,
100104
Consumer<?, ?> consumer) {
105+
101106
switch (this.delegateType) {
102107
case ACKNOWLEDGING_CONSUMER_AWARE:
103108
this.delegate.onMessage(consumerRecords, acknowledgment, consumer);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,12 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19-
import java.util.Iterator;
2019
import java.util.List;
2120

2221
import org.apache.kafka.clients.consumer.ConsumerRecord;
2322

23+
import org.springframework.kafka.listener.BatchMessageListener;
24+
2425
/**
2526
* Implementations of this interface can signal that a record about
2627
* to be delivered to a message listener should be discarded instead
@@ -30,7 +31,7 @@
3031
* @param <V> the value type.
3132
*
3233
* @author Gary Russell
33-
*
34+
* @author Sanghyeok An
3435
*/
3536
public interface RecordFilterStrategy<K, V> {
3637

@@ -49,13 +50,20 @@ public interface RecordFilterStrategy<K, V> {
4950
* @since 2.8
5051
*/
5152
default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> records) {
52-
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
53-
while (iterator.hasNext()) {
54-
if (filter(iterator.next())) {
55-
iterator.remove();
56-
}
57-
}
53+
records.removeIf(this::filter);
5854
return records;
5955
}
6056

57+
/**
58+
* Determine whether {@link FilteringBatchMessageListenerAdapter} should invoke
59+
* the {@link BatchMessageListener} when all {@link ConsumerRecord}s in a batch have been filtered out
60+
* resulting in empty list. By default, do invoke the {@link BatchMessageListener} (return false).
61+
* @return true for {@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener}
62+
* when all {@link ConsumerRecord} in a batch filtered out
63+
* @since 3.3
64+
*/
65+
default boolean ignoreEmptyBatch() {
66+
return false;
67+
}
68+
6169
}

0 commit comments

Comments
 (0)