From 0c75fc7417fb0cb94f1bf689c792cc718bbcd37b Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 12:46:50 +0900 Subject: [PATCH 01/15] spring-projectsGH-3542: Adds the ability to add record interceptors instead of override them Fixes: #3542 Issue link: #3542 What? Change `RecordInterceptor` to `List` in `MessageListenerContainer`. Why? To allow adding multiple `RecordInterceptor` instances instead of overriding the existing one. Currently, only a single `RecordInterceptor` is supported. Users may want to register multiple `RecordInterceptors`. There are some workarounds, but they are not clean or ideal solutions. By supporting `List, users can add their own interceptors via `setRecordInterceptor(...)`. Signed-off-by: Sanghyeok An --- .../AbstractMessageListenerContainer.java | 12 ++- .../ConcurrentMessageListenerContainer.java | 5 +- .../KafkaMessageListenerContainer.java | 74 ++++++++++--------- 3 files changed, 53 insertions(+), 38 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b4b3dc8d4e..365a606da4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.kafka.listener; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -114,7 +115,8 @@ public abstract class AbstractMessageListenerContainer private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT; - private @Nullable RecordInterceptor recordInterceptor; + // TODO: HERE ASH + private List> recordInterceptors = new ArrayList<>(); private @Nullable BatchInterceptor batchInterceptor; @@ -460,8 +462,8 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; } - protected @Nullable RecordInterceptor getRecordInterceptor() { - return this.recordInterceptor; + protected List> getRecordInterceptors() { + return this.recordInterceptors; } /** @@ -472,7 +474,9 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { * @see #setInterceptBeforeTx(boolean) */ public void setRecordInterceptor(@Nullable RecordInterceptor recordInterceptor) { - this.recordInterceptor = recordInterceptor; + if (recordInterceptor != null) { + this.recordInterceptors.add(recordInterceptor); + } } protected @Nullable BatchInterceptor getBatchInterceptor() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 5d75f93db6..6ea64a30ed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -60,6 +60,7 @@ * @author Tomaz Fernandes * @author Wang Zhiyang * @author Lokesh Alamuri + * @author Sanghyeok An */ public class ConcurrentMessageListenerContainer extends AbstractMessageListenerContainer { @@ -282,7 +283,9 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer 1 || this.alwaysClientIdSuffix ? "-" + index : ""); container.setCommonErrorHandler(getCommonErrorHandler()); container.setAfterRollbackProcessor(getAfterRollbackProcessor()); - container.setRecordInterceptor(getRecordInterceptor()); + for (RecordInterceptor recordInterceptor : getRecordInterceptors()) { + container.setRecordInterceptor(recordInterceptor); + } container.setBatchInterceptor(getBatchInterceptor()); container.setInterceptBeforeTx(isInterceptBeforeTx()); container.setListenerInfo(getListenerInfo()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8362cc4144..2e484ac2e6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -714,17 +714,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final @Nullable Duration syncCommitTimeout; - private final @Nullable RecordInterceptor recordInterceptor = + private final List> recordInterceptors = !isInterceptBeforeTx() || this.transactionManager == null - ? getRecordInterceptor() - : null; + ? getRecordInterceptors() + : new ArrayList<>(); - private final @Nullable RecordInterceptor earlyRecordInterceptor = + private final List> earlyRecordInterceptors = isInterceptBeforeTx() && this.transactionManager != null - ? getRecordInterceptor() - : null; + ? getRecordInterceptors() + : new ArrayList<>(); - private final @Nullable RecordInterceptor commonRecordInterceptor = getRecordInterceptor(); + private final List> commonRecordInterceptors = getRecordInterceptors(); private final @Nullable BatchInterceptor batchInterceptor = !isInterceptBeforeTx() || this.transactionManager == null @@ -738,7 +738,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final @Nullable BatchInterceptor commonBatchInterceptor = getBatchInterceptor(); - private final @Nullable ThreadStateProcessor pollThreadStateProcessor; + private final List pollThreadStateProcessor; private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback(); @@ -1040,9 +1040,20 @@ private void obtainClusterId() { } } - @Nullable - private ThreadStateProcessor setUpPollProcessor(boolean batch) { - return batch ? this.commonBatchInterceptor : this.commonRecordInterceptor; + private List setUpPollProcessor(boolean batch) { + if (batch) { + if (this.commonBatchInterceptor != null) { + List threadStateProcessors = new ArrayList<>(); + threadStateProcessors.add(this.commonBatchInterceptor); + return threadStateProcessors; + } + else { + return new ArrayList<>(); + } + } + else { + return new ArrayList<>(this.commonRecordInterceptors); + } } @Nullable @@ -1548,9 +1559,7 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords records) { } private void clearThreadState() { - if (this.pollThreadStateProcessor != null) { - this.pollThreadStateProcessor.clearThreadState(this.consumer); - } + this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.clearThreadState(this.consumer)); } private void checkIdlePartitions() { @@ -1708,9 +1717,7 @@ private ConsumerRecords pollConsumer() { } private void beforePoll() { - if (this.pollThreadStateProcessor != null) { - this.pollThreadStateProcessor.setupThreadState(this.consumer); - } + this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.setupThreadState(this.consumer)); } private synchronized void captureOffsets(ConsumerRecords records) { @@ -2548,9 +2555,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords records) { this.logger.error(ex, "Transaction rolled back"); recordAfterRollback(iterator, cRecord, ex); } - if (this.commonRecordInterceptor != null) { - this.commonRecordInterceptor.afterRecord(cRecord, this.consumer); - } + this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer)); if (this.nackSleepDurationMillis >= 0) { handleNack(records, cRecord); break; @@ -2627,9 +2632,7 @@ private void doInvokeWithRecords(final ConsumerRecords records) { } this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord)); doInvokeRecordListener(cRecord, iterator); - if (this.commonRecordInterceptor != null) { - this.commonRecordInterceptor.afterRecord(cRecord, this.consumer); - } + this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer)); if (this.nackSleepDurationMillis >= 0) { handleNack(records, cRecord); break; @@ -2680,14 +2683,16 @@ private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) private ConsumerRecord checkEarlyIntercept(ConsumerRecord recordArg) { internalHeaders(recordArg); ConsumerRecord cRecord = recordArg; - if (this.earlyRecordInterceptor != null) { - cRecord = this.earlyRecordInterceptor.intercept(cRecord, this.consumer); + + for (RecordInterceptor earlyRecordInterceptor : this.earlyRecordInterceptors) { + cRecord = earlyRecordInterceptor.intercept(cRecord, this.consumer); if (cRecord == null) { this.logger.debug(() -> "RecordInterceptor returned null, skipping: " - + KafkaUtils.format(recordArg)); + + KafkaUtils.format(recordArg)); ackCurrent(recordArg); - this.earlyRecordInterceptor.success(recordArg, this.consumer); - this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer); + earlyRecordInterceptor.success(recordArg, this.consumer); + earlyRecordInterceptor.afterRecord(recordArg, this.consumer); + break; } } return cRecord; @@ -2848,13 +2853,13 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord } private void recordInterceptAfter(ConsumerRecord records, @Nullable Exception exception) { - if (this.commonRecordInterceptor != null) { + if (!this.commonRecordInterceptors.isEmpty()) { try { if (exception == null) { - this.commonRecordInterceptor.success(records, this.consumer); + this.commonRecordInterceptors.forEach(interceptor -> interceptor.success(records, this.consumer)); } else { - this.commonRecordInterceptor.failure(records, exception, this.consumer); + this.commonRecordInterceptors.forEach(interceptor -> interceptor.failure(records, exception, this.consumer)); } } catch (Exception e) { @@ -2888,8 +2893,11 @@ private void invokeOnMessage(final ConsumerRecord cRecord) { private void doInvokeOnMessage(final ConsumerRecord recordArg) { ConsumerRecord cRecord = recordArg; - if (this.recordInterceptor != null) { - cRecord = this.recordInterceptor.intercept(cRecord, this.consumer); + for (RecordInterceptor recordInterceptor : this.recordInterceptors) { + cRecord = recordInterceptor.intercept(cRecord, this.consumer); + if (cRecord == null) { + break; + } } if (cRecord == null) { this.logger.debug(() -> "RecordInterceptor returned null, skipping: " From 5896ec2854b8e850d21722fa542e3861daef3ae7 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 13:04:48 +0900 Subject: [PATCH 02/15] Remove useless comment and code. Signed-off-by: Sanghyeok An --- .../AbstractMessageListenerContainer.java | 1 - .../KafkaMessageListenerContainer.java | 18 ++++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 365a606da4..5b83c87a6b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -115,7 +115,6 @@ public abstract class AbstractMessageListenerContainer private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT; - // TODO: HERE ASH private List> recordInterceptors = new ArrayList<>(); private @Nullable BatchInterceptor batchInterceptor; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 2e484ac2e6..bbaa486e9b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2853,19 +2853,17 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord } private void recordInterceptAfter(ConsumerRecord records, @Nullable Exception exception) { - if (!this.commonRecordInterceptors.isEmpty()) { - try { - if (exception == null) { - this.commonRecordInterceptors.forEach(interceptor -> interceptor.success(records, this.consumer)); - } - else { - this.commonRecordInterceptors.forEach(interceptor -> interceptor.failure(records, exception, this.consumer)); - } + try { + if (exception == null) { + this.commonRecordInterceptors.forEach(interceptor -> interceptor.success(records, this.consumer)); } - catch (Exception e) { - this.logger.error(e, "RecordInterceptor.success/failure threw an exception"); + else { + this.commonRecordInterceptors.forEach(interceptor -> interceptor.failure(records, exception, this.consumer)); } } + catch (Exception e) { + this.logger.error(e, "RecordInterceptor.success/failure threw an exception"); + } } private void invokeOnMessage(final ConsumerRecord cRecord) { From 7a6a385120df2452fd42ace25d4fe69d553aaa3a Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 22:21:30 +0900 Subject: [PATCH 03/15] Revert 2 commits. Signed-off-by: Sanghyeok An --- .../AbstractMessageListenerContainer.java | 11 +-- .../ConcurrentMessageListenerContainer.java | 5 +- .../KafkaMessageListenerContainer.java | 86 +++++++++---------- 3 files changed, 45 insertions(+), 57 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 5b83c87a6b..b4b3dc8d4e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -16,7 +16,6 @@ package org.springframework.kafka.listener; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -115,7 +114,7 @@ public abstract class AbstractMessageListenerContainer private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT; - private List> recordInterceptors = new ArrayList<>(); + private @Nullable RecordInterceptor recordInterceptor; private @Nullable BatchInterceptor batchInterceptor; @@ -461,8 +460,8 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; } - protected List> getRecordInterceptors() { - return this.recordInterceptors; + protected @Nullable RecordInterceptor getRecordInterceptor() { + return this.recordInterceptor; } /** @@ -473,9 +472,7 @@ protected List> getRecordInterceptors() { * @see #setInterceptBeforeTx(boolean) */ public void setRecordInterceptor(@Nullable RecordInterceptor recordInterceptor) { - if (recordInterceptor != null) { - this.recordInterceptors.add(recordInterceptor); - } + this.recordInterceptor = recordInterceptor; } protected @Nullable BatchInterceptor getBatchInterceptor() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 6ea64a30ed..5d75f93db6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -60,7 +60,6 @@ * @author Tomaz Fernandes * @author Wang Zhiyang * @author Lokesh Alamuri - * @author Sanghyeok An */ public class ConcurrentMessageListenerContainer extends AbstractMessageListenerContainer { @@ -283,9 +282,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer 1 || this.alwaysClientIdSuffix ? "-" + index : ""); container.setCommonErrorHandler(getCommonErrorHandler()); container.setAfterRollbackProcessor(getAfterRollbackProcessor()); - for (RecordInterceptor recordInterceptor : getRecordInterceptors()) { - container.setRecordInterceptor(recordInterceptor); - } + container.setRecordInterceptor(getRecordInterceptor()); container.setBatchInterceptor(getBatchInterceptor()); container.setInterceptBeforeTx(isInterceptBeforeTx()); container.setListenerInfo(getListenerInfo()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bbaa486e9b..8362cc4144 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -714,17 +714,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final @Nullable Duration syncCommitTimeout; - private final List> recordInterceptors = + private final @Nullable RecordInterceptor recordInterceptor = !isInterceptBeforeTx() || this.transactionManager == null - ? getRecordInterceptors() - : new ArrayList<>(); + ? getRecordInterceptor() + : null; - private final List> earlyRecordInterceptors = + private final @Nullable RecordInterceptor earlyRecordInterceptor = isInterceptBeforeTx() && this.transactionManager != null - ? getRecordInterceptors() - : new ArrayList<>(); + ? getRecordInterceptor() + : null; - private final List> commonRecordInterceptors = getRecordInterceptors(); + private final @Nullable RecordInterceptor commonRecordInterceptor = getRecordInterceptor(); private final @Nullable BatchInterceptor batchInterceptor = !isInterceptBeforeTx() || this.transactionManager == null @@ -738,7 +738,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final @Nullable BatchInterceptor commonBatchInterceptor = getBatchInterceptor(); - private final List pollThreadStateProcessor; + private final @Nullable ThreadStateProcessor pollThreadStateProcessor; private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback(); @@ -1040,20 +1040,9 @@ private void obtainClusterId() { } } - private List setUpPollProcessor(boolean batch) { - if (batch) { - if (this.commonBatchInterceptor != null) { - List threadStateProcessors = new ArrayList<>(); - threadStateProcessors.add(this.commonBatchInterceptor); - return threadStateProcessors; - } - else { - return new ArrayList<>(); - } - } - else { - return new ArrayList<>(this.commonRecordInterceptors); - } + @Nullable + private ThreadStateProcessor setUpPollProcessor(boolean batch) { + return batch ? this.commonBatchInterceptor : this.commonRecordInterceptor; } @Nullable @@ -1559,7 +1548,9 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords records) { } private void clearThreadState() { - this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.clearThreadState(this.consumer)); + if (this.pollThreadStateProcessor != null) { + this.pollThreadStateProcessor.clearThreadState(this.consumer); + } } private void checkIdlePartitions() { @@ -1717,7 +1708,9 @@ private ConsumerRecords pollConsumer() { } private void beforePoll() { - this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.setupThreadState(this.consumer)); + if (this.pollThreadStateProcessor != null) { + this.pollThreadStateProcessor.setupThreadState(this.consumer); + } } private synchronized void captureOffsets(ConsumerRecords records) { @@ -2555,7 +2548,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords records) { this.logger.error(ex, "Transaction rolled back"); recordAfterRollback(iterator, cRecord, ex); } - this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer)); + if (this.commonRecordInterceptor != null) { + this.commonRecordInterceptor.afterRecord(cRecord, this.consumer); + } if (this.nackSleepDurationMillis >= 0) { handleNack(records, cRecord); break; @@ -2632,7 +2627,9 @@ private void doInvokeWithRecords(final ConsumerRecords records) { } this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord)); doInvokeRecordListener(cRecord, iterator); - this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer)); + if (this.commonRecordInterceptor != null) { + this.commonRecordInterceptor.afterRecord(cRecord, this.consumer); + } if (this.nackSleepDurationMillis >= 0) { handleNack(records, cRecord); break; @@ -2683,16 +2680,14 @@ private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) private ConsumerRecord checkEarlyIntercept(ConsumerRecord recordArg) { internalHeaders(recordArg); ConsumerRecord cRecord = recordArg; - - for (RecordInterceptor earlyRecordInterceptor : this.earlyRecordInterceptors) { - cRecord = earlyRecordInterceptor.intercept(cRecord, this.consumer); + if (this.earlyRecordInterceptor != null) { + cRecord = this.earlyRecordInterceptor.intercept(cRecord, this.consumer); if (cRecord == null) { this.logger.debug(() -> "RecordInterceptor returned null, skipping: " - + KafkaUtils.format(recordArg)); + + KafkaUtils.format(recordArg)); ackCurrent(recordArg); - earlyRecordInterceptor.success(recordArg, this.consumer); - earlyRecordInterceptor.afterRecord(recordArg, this.consumer); - break; + this.earlyRecordInterceptor.success(recordArg, this.consumer); + this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer); } } return cRecord; @@ -2853,17 +2848,19 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord } private void recordInterceptAfter(ConsumerRecord records, @Nullable Exception exception) { - try { - if (exception == null) { - this.commonRecordInterceptors.forEach(interceptor -> interceptor.success(records, this.consumer)); + if (this.commonRecordInterceptor != null) { + try { + if (exception == null) { + this.commonRecordInterceptor.success(records, this.consumer); + } + else { + this.commonRecordInterceptor.failure(records, exception, this.consumer); + } } - else { - this.commonRecordInterceptors.forEach(interceptor -> interceptor.failure(records, exception, this.consumer)); + catch (Exception e) { + this.logger.error(e, "RecordInterceptor.success/failure threw an exception"); } } - catch (Exception e) { - this.logger.error(e, "RecordInterceptor.success/failure threw an exception"); - } } private void invokeOnMessage(final ConsumerRecord cRecord) { @@ -2891,11 +2888,8 @@ private void invokeOnMessage(final ConsumerRecord cRecord) { private void doInvokeOnMessage(final ConsumerRecord recordArg) { ConsumerRecord cRecord = recordArg; - for (RecordInterceptor recordInterceptor : this.recordInterceptors) { - cRecord = recordInterceptor.intercept(cRecord, this.consumer); - if (cRecord == null) { - break; - } + if (this.recordInterceptor != null) { + cRecord = this.recordInterceptor.intercept(cRecord, this.consumer); } if (cRecord == null) { this.logger.debug(() -> "RecordInterceptor returned null, skipping: " From 7d94fcadaf644c62fdee9067cc97e60bc0aaa8e6 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 22:51:34 +0900 Subject: [PATCH 04/15] Add new API addRecordInterceptor. Signed-off-by: Sanghyeok An --- .../AbstractMessageListenerContainer.java | 18 ++++++++++++++++++ .../listener/CompositeRecordInterceptor.java | 8 ++++++++ 2 files changed, 26 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b4b3dc8d4e..d3e019edb4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -475,6 +475,24 @@ public void setRecordInterceptor(@Nullable RecordInterceptor recordInterce this.recordInterceptor = recordInterceptor; } + /** + * Add an interceptor to be called before calling the record listener + * if the {@link AbstractMessageListenerContainer} is configured with a {@link CompositeRecordInterceptor}. + * If a {@link CompositeRecordInterceptor} is not configured, the {@link AbstractMessageListenerContainer} + * will not add {@link RecordInterceptor}. + * Does not apply to batch listeners. + * @param recordInterceptor the interceptor. + * @since 4.0 + */ + public void addRecordInterceptor(RecordInterceptor recordInterceptor) { + if (this.recordInterceptor instanceof CompositeRecordInterceptor compositeRecordInterceptor) { + compositeRecordInterceptor.addRecordInterceptor(recordInterceptor); + } + else { + this.logger.warn("Failed to add record interceptor."); + } + } + protected @Nullable BatchInterceptor getBatchInterceptor() { return this.batchInterceptor; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java index 88014d7d8a..d5aa882de1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java @@ -92,4 +92,12 @@ public void afterRecord(ConsumerRecord record, Consumer consumer) { this.delegates.forEach(del -> del.afterRecord(record, consumer)); } + /** + * Add an {@link RecordInterceptor} to delegates. + * @param recordInterceptor the interceptor. + */ + public void addRecordInterceptor(RecordInterceptor recordInterceptor) { + this.delegates.add(recordInterceptor); + } + } From d38216b6a008797641dd4d5972845ff06bd009eb Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 23:15:13 +0900 Subject: [PATCH 05/15] Add an CompositeRecordInterceptor to addRecordInterceptor API. Signed-off-by: Sanghyeok An --- .../listener/AbstractMessageListenerContainer.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index d3e019edb4..989029581b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -477,9 +477,9 @@ public void setRecordInterceptor(@Nullable RecordInterceptor recordInterce /** * Add an interceptor to be called before calling the record listener - * if the {@link AbstractMessageListenerContainer} is configured with a {@link CompositeRecordInterceptor}. - * If a {@link CompositeRecordInterceptor} is not configured, the {@link AbstractMessageListenerContainer} - * will not add {@link RecordInterceptor}. + * If the {@link AbstractMessageListenerContainer} is not configured with a + * {@link CompositeRecordInterceptor}, {@link CompositeRecordInterceptor} will be created internally + * and configured to hold the added interceptor. * Does not apply to batch listeners. * @param recordInterceptor the interceptor. * @since 4.0 @@ -488,8 +488,12 @@ public void addRecordInterceptor(RecordInterceptor recordInterceptor) { if (this.recordInterceptor instanceof CompositeRecordInterceptor compositeRecordInterceptor) { compositeRecordInterceptor.addRecordInterceptor(recordInterceptor); } + else if (this.recordInterceptor == null) { + this.recordInterceptor = new CompositeRecordInterceptor<>(recordInterceptor); + } else { - this.logger.warn("Failed to add record interceptor."); + this.recordInterceptor = new CompositeRecordInterceptor<>(this.recordInterceptor, + recordInterceptor); } } From b84556e9bdd793686c3edc3987f003bf9a768412 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 23:55:39 +0900 Subject: [PATCH 06/15] Revert 2 commits. Signed-off-by: Sanghyeok An --- .../AbstractMessageListenerContainer.java | 22 ------------------- .../listener/CompositeRecordInterceptor.java | 8 ------- 2 files changed, 30 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 989029581b..b4b3dc8d4e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -475,28 +475,6 @@ public void setRecordInterceptor(@Nullable RecordInterceptor recordInterce this.recordInterceptor = recordInterceptor; } - /** - * Add an interceptor to be called before calling the record listener - * If the {@link AbstractMessageListenerContainer} is not configured with a - * {@link CompositeRecordInterceptor}, {@link CompositeRecordInterceptor} will be created internally - * and configured to hold the added interceptor. - * Does not apply to batch listeners. - * @param recordInterceptor the interceptor. - * @since 4.0 - */ - public void addRecordInterceptor(RecordInterceptor recordInterceptor) { - if (this.recordInterceptor instanceof CompositeRecordInterceptor compositeRecordInterceptor) { - compositeRecordInterceptor.addRecordInterceptor(recordInterceptor); - } - else if (this.recordInterceptor == null) { - this.recordInterceptor = new CompositeRecordInterceptor<>(recordInterceptor); - } - else { - this.recordInterceptor = new CompositeRecordInterceptor<>(this.recordInterceptor, - recordInterceptor); - } - } - protected @Nullable BatchInterceptor getBatchInterceptor() { return this.batchInterceptor; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java index d5aa882de1..88014d7d8a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java @@ -92,12 +92,4 @@ public void afterRecord(ConsumerRecord record, Consumer consumer) { this.delegates.forEach(del -> del.afterRecord(record, consumer)); } - /** - * Add an {@link RecordInterceptor} to delegates. - * @param recordInterceptor the interceptor. - */ - public void addRecordInterceptor(RecordInterceptor recordInterceptor) { - this.delegates.add(recordInterceptor); - } - } From defe8fb9eb59a105ade8df069946043d25391089 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sun, 1 Jun 2025 23:56:53 +0900 Subject: [PATCH 07/15] Add new API addRecordInterceptor. Signed-off-by: Sanghyeok An --- .../kafka/listener/CompositeRecordInterceptor.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java index 88014d7d8a..d5aa882de1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java @@ -92,4 +92,12 @@ public void afterRecord(ConsumerRecord record, Consumer consumer) { this.delegates.forEach(del -> del.afterRecord(record, consumer)); } + /** + * Add an {@link RecordInterceptor} to delegates. + * @param recordInterceptor the interceptor. + */ + public void addRecordInterceptor(RecordInterceptor recordInterceptor) { + this.delegates.add(recordInterceptor); + } + } From 21a7717f17e2c9608d6362e71e9f97635da8afd6 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Tue, 3 Jun 2025 09:40:32 +0900 Subject: [PATCH 08/15] Addressing PR review Signed-off-by: Sanghyeok An --- .../kafka/listener/CompositeRecordInterceptor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java index d5aa882de1..9147694416 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java @@ -35,6 +35,7 @@ * * @author Artem Bilan * @author Gary Russell + * @author Sanghyeok An * @since 2.3 * */ @@ -95,6 +96,7 @@ public void afterRecord(ConsumerRecord record, Consumer consumer) { /** * Add an {@link RecordInterceptor} to delegates. * @param recordInterceptor the interceptor. + * @since 4.0 */ public void addRecordInterceptor(RecordInterceptor recordInterceptor) { this.delegates.add(recordInterceptor); From 0d9010d9de6e2119de88da30947698f433c6aa33 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Tue, 3 Jun 2025 10:53:30 +0900 Subject: [PATCH 09/15] Addressing PR review Signed-off-by: Sanghyeok An --- .../kafka/listener/AbstractMessageListenerContainer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b4b3dc8d4e..14c3a8052c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -460,7 +460,11 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; } - protected @Nullable RecordInterceptor getRecordInterceptor() { + /** + * Get the {@link RecordInterceptor} for modification, if configured. + * @return the {@link RecordInterceptor}, or {@code null} if not configured + */ + public @Nullable RecordInterceptor getRecordInterceptor() { return this.recordInterceptor; } From 61c4ad24014d0027e1ae639b9b02d2a520fdd3e4 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Wed, 4 Jun 2025 21:35:30 +0900 Subject: [PATCH 10/15] Addressing PR review Signed-off-by: Sanghyeok An --- .../antora/modules/ROOT/pages/whats-new.adoc | 6 +++ .../AbstractMessageListenerContainer.java | 1 + .../KafkaMessageListenerContainerTests.java | 51 +++++++++++++++---- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 72ee58b4bc..aa4258e46e 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records. More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping]. + +[[x40-add-record-interceptor]] +=== Configure additional `RecordInterceptor` + +The `KafkaMessageListenerContainer` and `ConcurrentMessageListenerContainer` support `getRecordInterceptor()`. +If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 14c3a8052c..faf9882bd0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -463,6 +463,7 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { /** * Get the {@link RecordInterceptor} for modification, if configured. * @return the {@link RecordInterceptor}, or {@code null} if not configured + * @since 4.0 */ public @Nullable RecordInterceptor getRecordInterceptor() { return this.recordInterceptor; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 5b4919b2ee..ae0c26cf6f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -3842,7 +3842,7 @@ public void onMessage(ConsumerRecord data) { containerProps.setClientId("clientId"); CountDownLatch afterLatch = new CountDownLatch(1); - RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { + RecordInterceptor recordInterceptor1 = spy(new RecordInterceptor() { @Override public @NonNull ConsumerRecord intercept(ConsumerRecord record, @@ -3858,25 +3858,54 @@ public void clearThreadState(Consumer consumer) { }); + RecordInterceptor recordInterceptor2 = spy(new RecordInterceptor() { + + @Override + public @NonNull ConsumerRecord intercept(ConsumerRecord record, + Consumer consumer) { + + return record; + } + + @Override + public void clearThreadState(Consumer consumer) { + afterLatch.countDown(); + } + + }); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); - container.setRecordInterceptor(recordInterceptor); + container.setRecordInterceptor(new CompositeRecordInterceptor<>()); + if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor composite) { + composite.addRecordInterceptor(recordInterceptor1); + composite.addRecordInterceptor(recordInterceptor2); + } + container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue(); - InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer); - inOrder.verify(recordInterceptor).setupThreadState(eq(consumer)); + InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer); + inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer)); + inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer)); inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); - inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer)); inOrder.verify(messageListener).onMessage(eq(firstRecord)); - inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer)); - inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer)); - inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer)); inOrder.verify(messageListener).onMessage(eq(secondRecord)); - inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer)); - inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer)); - inOrder.verify(recordInterceptor).clearThreadState(eq(consumer)); + inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer)); + inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer)); container.stop(); } From 0a6c0205f62c1f96f5fe4523d6634eeafceacf4b Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Wed, 4 Jun 2025 23:09:10 +0900 Subject: [PATCH 11/15] Addressing PR review Signed-off-by: Sanghyeok An --- .../kafka/listener/KafkaMessageListenerContainerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index ae0c26cf6f..0bcc4d8774 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -3862,7 +3862,7 @@ public void clearThreadState(Consumer consumer) { @Override public @NonNull ConsumerRecord intercept(ConsumerRecord record, - Consumer consumer) { + Consumer consumer) { return record; } From d68491310d6741382ddfcabdd1b1db325061c4c2 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Fri, 6 Jun 2025 08:56:42 +0900 Subject: [PATCH 12/15] Addressing PR review -s Signed-off-by: Sanghyeok An --- .../kafka/receiving-messages/message-listener-container.adoc | 5 ++++- .../src/main/antora/modules/ROOT/pages/whats-new.adoc | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc index 4670e15c15..5cdec5db00 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc @@ -22,6 +22,10 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors. +Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method. +If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured. + + By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started. You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead. Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++. @@ -265,4 +269,3 @@ The listener containers implement `SmartLifecycle`, and `autoStartup` is `true` The containers are started in a late phase (`Integer.MAX-VALUE - 100`). Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase. The `- 100` leaves room for later phases to enable components to be auto-started after the containers. - diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index aa4258e46e..92daa4005d 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -82,3 +82,4 @@ More details are available in xref:kafka/headers.adoc#multi-value-header[Support The `KafkaMessageListenerContainer` and `ConcurrentMessageListenerContainer` support `getRecordInterceptor()`. If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it. +More details are available in xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers]. From 7150d689d8ac781d38d45b2828a6ddf31d687707 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sat, 7 Jun 2025 08:58:39 +0900 Subject: [PATCH 13/15] Addressing PR review. Signed-off-by: Sanghyeok An --- .../message-listener-container.adoc | 19 +++++++++++++++++++ .../antora/modules/ROOT/pages/whats-new.adoc | 5 ++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc index 5cdec5db00..e293b79a54 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc @@ -24,7 +24,26 @@ The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method. If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured. +The following example shows how to do so: + +[source, java] +---- +public void configureRecordInterceptor(KafkaMessageListenerContainer container) { + CompositeRecordInterceptor compositeInterceptor; + if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor interceptor) { + compositeInterceptor = interceptor; + } else { + compositeInterceptor = new CompositeRecordInterceptor<>(); + } + + RecordInterceptor recordInterceptor1 = new RecordInterceptor() {...}; + RecordInterceptor recordInterceptor2 = new RecordInterceptor() {...}; + + compositeInterceptor.addRecordInterceptor(recordInterceptor1); + compositeInterceptor.addRecordInterceptor(recordInterceptor2); +} +---- By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started. You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 92daa4005d..72d821ba98 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -80,6 +80,5 @@ More details are available in xref:kafka/headers.adoc#multi-value-header[Support [[x40-add-record-interceptor]] === Configure additional `RecordInterceptor` -The `KafkaMessageListenerContainer` and `ConcurrentMessageListenerContainer` support `getRecordInterceptor()`. -If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it. -More details are available in xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers]. +Listener containers now support interceptor customization via `getRecordInterceptor()`. +See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details. From 77b2051e7760f81cd51fe8189f78099eaf6ce13f Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sat, 7 Jun 2025 09:00:45 +0900 Subject: [PATCH 14/15] Add missing comments. Signed-off-by: Sanghyeok An --- .../kafka/receiving-messages/message-listener-container.adoc | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc index e293b79a54..313aff36bf 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc @@ -35,6 +35,7 @@ public void configureRecordInterceptor(KafkaMessageListenerContainer(); + container.setRecordInterceptor(compositeInterceptor); } RecordInterceptor recordInterceptor1 = new RecordInterceptor() {...}; From 327debf66dc53f6696d9d24f6a95e6390b155087 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sat, 7 Jun 2025 10:04:33 +0900 Subject: [PATCH 15/15] Addressing PR review. Signed-off-by: Sanghyeok An --- .../receiving-messages/message-listener-container.adoc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc index 313aff36bf..3461b74c59 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc @@ -31,13 +31,18 @@ The following example shows how to do so: public void configureRecordInterceptor(KafkaMessageListenerContainer container) { CompositeRecordInterceptor compositeInterceptor; - if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor interceptor) { + RecordInterceptor previousInterceptor = container.getRecordInterceptor(); + if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) { compositeInterceptor = interceptor; } else { compositeInterceptor = new CompositeRecordInterceptor<>(); container.setRecordInterceptor(compositeInterceptor); } + if (previousInterceptor != null) { + compositeRecordInterceptor.addRecordInterceptor(previousInterceptor); + } + RecordInterceptor recordInterceptor1 = new RecordInterceptor() {...}; RecordInterceptor recordInterceptor2 = new RecordInterceptor() {...};