diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc index 4670e15c15..3461b74c59 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc @@ -22,6 +22,35 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors. +Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method. +If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured. +The following example shows how to do so: + +[source, java] +---- +public void configureRecordInterceptor(KafkaMessageListenerContainer container) { + CompositeRecordInterceptor compositeInterceptor; + + RecordInterceptor previousInterceptor = container.getRecordInterceptor(); + if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) { + compositeInterceptor = interceptor; + } else { + compositeInterceptor = new CompositeRecordInterceptor<>(); + container.setRecordInterceptor(compositeInterceptor); + } + + if (previousInterceptor != null) { + compositeRecordInterceptor.addRecordInterceptor(previousInterceptor); + } + + RecordInterceptor recordInterceptor1 = new RecordInterceptor() {...}; + RecordInterceptor recordInterceptor2 = new RecordInterceptor() {...}; + + compositeInterceptor.addRecordInterceptor(recordInterceptor1); + compositeInterceptor.addRecordInterceptor(recordInterceptor2); +} +---- + By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started. You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead. Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++. @@ -265,4 +294,3 @@ The listener containers implement `SmartLifecycle`, and `autoStartup` is `true` The containers are started in a late phase (`Integer.MAX-VALUE - 100`). Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase. The `- 100` leaves room for later phases to enable components to be auto-started after the containers. - diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 72ee58b4bc..72d821ba98 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records. More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping]. + +[[x40-add-record-interceptor]] +=== Configure additional `RecordInterceptor` + +Listener containers now support interceptor customization via `getRecordInterceptor()`. +See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b4b3dc8d4e..faf9882bd0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -460,7 +460,12 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; } - protected @Nullable RecordInterceptor getRecordInterceptor() { + /** + * Get the {@link RecordInterceptor} for modification, if configured. + * @return the {@link RecordInterceptor}, or {@code null} if not configured + * @since 4.0 + */ + public @Nullable RecordInterceptor getRecordInterceptor() { return this.recordInterceptor; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java index 88014d7d8a..9147694416 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java @@ -35,6 +35,7 @@ * * @author Artem Bilan * @author Gary Russell + * @author Sanghyeok An * @since 2.3 * */ @@ -92,4 +93,13 @@ public void afterRecord(ConsumerRecord record, Consumer consumer) { this.delegates.forEach(del -> del.afterRecord(record, consumer)); } + /** + * Add an {@link RecordInterceptor} to delegates. + * @param recordInterceptor the interceptor. + * @since 4.0 + */ + public void addRecordInterceptor(RecordInterceptor recordInterceptor) { + this.delegates.add(recordInterceptor); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 5b4919b2ee..0bcc4d8774 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -3842,7 +3842,7 @@ public void onMessage(ConsumerRecord data) { containerProps.setClientId("clientId"); CountDownLatch afterLatch = new CountDownLatch(1); - RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { + RecordInterceptor recordInterceptor1 = spy(new RecordInterceptor() { @Override public @NonNull ConsumerRecord intercept(ConsumerRecord record, @@ -3858,25 +3858,54 @@ public void clearThreadState(Consumer consumer) { }); + RecordInterceptor recordInterceptor2 = spy(new RecordInterceptor() { + + @Override + public @NonNull ConsumerRecord intercept(ConsumerRecord record, + Consumer consumer) { + + return record; + } + + @Override + public void clearThreadState(Consumer consumer) { + afterLatch.countDown(); + } + + }); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); - container.setRecordInterceptor(recordInterceptor); + container.setRecordInterceptor(new CompositeRecordInterceptor<>()); + if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor composite) { + composite.addRecordInterceptor(recordInterceptor1); + composite.addRecordInterceptor(recordInterceptor2); + } + container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue(); - InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer); - inOrder.verify(recordInterceptor).setupThreadState(eq(consumer)); + InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer); + inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer)); + inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer)); inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); - inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer)); inOrder.verify(messageListener).onMessage(eq(firstRecord)); - inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer)); - inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer)); - inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer)); inOrder.verify(messageListener).onMessage(eq(secondRecord)); - inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer)); - inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer)); - inOrder.verify(recordInterceptor).clearThreadState(eq(consumer)); + inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer)); + inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer)); container.stop(); }