Skip to content

GH-3542: Adds the ability to add record interceptors instead of override them #3937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,35 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t

The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.

Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method.
If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured.
The following example shows how to do so:

[source, java]
----
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
CompositeRecordInterceptor compositeInterceptor;

RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
compositeInterceptor = interceptor;
} else {
compositeInterceptor = new CompositeRecordInterceptor<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like just returned interceptor is not preserved in a new composite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for that.
I added a new commit to fix that!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, look into my comment again .
that interceptor variable is out of use after this. I believe it has to be propagated to ctor of new composite .

container.setRecordInterceptor(compositeInterceptor);
}

if (previousInterceptor != null) {
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
}

RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};

compositeInterceptor.addRecordInterceptor(recordInterceptor1);
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
}
----

By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.
Expand Down Expand Up @@ -265,4 +294,3 @@ The listener containers implement `SmartLifecycle`, and `autoStartup` is `true`
The containers are started in a late phase (`Integer.MAX-VALUE - 100`).
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.

Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba

The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].

[[x40-add-record-interceptor]]
=== Configure additional `RecordInterceptor`

Listener containers now support interceptor customization via `getRecordInterceptor()`.
See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details.
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,12 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;
}

protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
/**
* Get the {@link RecordInterceptor} for modification, if configured.
* @return the {@link RecordInterceptor}, or {@code null} if not configured
* @since 4.0
*/
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
return this.recordInterceptor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*
* @author Artem Bilan
* @author Gary Russell
* @author Sanghyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -92,4 +93,13 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
this.delegates.forEach(del -> del.afterRecord(record, consumer));
}

/**
* Add an {@link RecordInterceptor} to delegates.
* @param recordInterceptor the interceptor.
* @since 4.0
*/
public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.delegates.add(recordInterceptor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3842,7 +3842,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
containerProps.setClientId("clientId");

CountDownLatch afterLatch = new CountDownLatch(1);
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
RecordInterceptor<Integer, String> recordInterceptor1 = spy(new RecordInterceptor<Integer, String>() {

@Override
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
Expand All @@ -3858,25 +3858,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {

});

RecordInterceptor<Integer, String> recordInterceptor2 = spy(new RecordInterceptor<Integer, String>() {

@Override
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {

return record;
}

@Override
public void clearThreadState(Consumer<?, ?> consumer) {
afterLatch.countDown();
}

});

KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setRecordInterceptor(recordInterceptor);
container.setRecordInterceptor(new CompositeRecordInterceptor<>());
if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor<Integer, String> composite) {
composite.addRecordInterceptor(recordInterceptor1);
composite.addRecordInterceptor(recordInterceptor2);
}

container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue();

InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer);
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer);
inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer));
inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer));
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer));
inOrder.verify(messageListener).onMessage(eq(firstRecord));
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer));
inOrder.verify(messageListener).onMessage(eq(secondRecord));
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor).clearThreadState(eq(consumer));
inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer));
inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer));
container.stop();
}

Expand Down