Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.4
Describe the bug
Routing of messages to custom DLTs based on thrown exceptions is not working. The listener bean initialization fails with the error:
java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.
Stacktrace:
Caused by: java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.
at org.springframework.util.Assert.isTrue(Assert.java:116)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.validateDestinations(DefaultDestinationTopicResolver.java:256)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.addDestinationTopics(DefaultDestinationTopicResolver.java:240)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.lambda$processRegisteredDestinations$1(DefaultDestinationTopicProcessor.java:64)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:1073)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.processRegisteredDestinations(DefaultDestinationTopicProcessor.java:64)
at org.springframework.kafka.retrytopic.RetryTopicConfigurer.processMainAndRetryListeners(RetryTopicConfigurer.java:321)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:544)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:517)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMultiMethodListeners(KafkaListenerAnnotationBeanPostProcessor.java:486)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:439)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1815)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:601)
... 41 more
I see that at DefaultDestinationTopicResolver.java:256 an assertion happens:
private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
for (int i = 0; i < destinationsToAdd.size(); i++) {
DestinationTopic destination = destinationsToAdd.get(i);
if (destination.isReusableRetryTopic()) {
Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
String.format("In the destination topic chain, the type %s can only be "
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
}
}
}
At this point, my destinationsToAdd
(see demo application attached) looks like:
Pos | Topic | Type |
---|---|---|
0 | test-topic | MAIN |
1 | test-topic.retry | REUSABLE_RETRY_TOPIC |
2 | test-topic.custom.dlt | DLT |
3 | test-topic.dlt | DLT |
As the topic with type REUSABLE_RETRY_TOPIC is not the last or last but one followed by a DLT, the assertion fails.
To Reproduce
Configure a RetryableTopic and set the exceptionBasedDltRouting parameter:
@KafkaListener(topics = "test-topic", id = "test-listener", idIsGroup = false, groupId = "test-group")
@RetryableTopic(
backoff = @Backoff(delay = 100),
attempts = "10",
autoCreateTopics = "false",
autoStartDltHandler = "true",
dltTopicSuffix = ".dlt",
retryTopicSuffix = ".retry",
exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = ".custom",
exceptions = {MyCustomException.class}
)}
)
Expected behavior
The listener bean is created without error and if the process fails multiple times due MyCustomException, the DLT containing the ".custom.dlt" suffix will be considered as the tarrget topic for the message before the general pupose DLT is considered.
Sample