Skip to content

In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic #3834

Open
@FabioBentoLuiz

Description

@FabioBentoLuiz

In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.4

Describe the bug

Routing of messages to custom DLTs based on thrown exceptions is not working. The listener bean initialization fails with the error:

java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.

Stacktrace:

Caused by: java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.
at org.springframework.util.Assert.isTrue(Assert.java:116)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.validateDestinations(DefaultDestinationTopicResolver.java:256)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.addDestinationTopics(DefaultDestinationTopicResolver.java:240)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.lambda$processRegisteredDestinations$1(DefaultDestinationTopicProcessor.java:64)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:1073)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.processRegisteredDestinations(DefaultDestinationTopicProcessor.java:64)
at org.springframework.kafka.retrytopic.RetryTopicConfigurer.processMainAndRetryListeners(RetryTopicConfigurer.java:321)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:544)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:517)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMultiMethodListeners(KafkaListenerAnnotationBeanPostProcessor.java:486)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:439)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1815)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:601)
... 41 more

I see that at DefaultDestinationTopicResolver.java:256 an assertion happens:

private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
	for (int i = 0; i < destinationsToAdd.size(); i++) {
		DestinationTopic destination = destinationsToAdd.get(i);
		if (destination.isReusableRetryTopic()) {
			Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
					((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
					String.format("In the destination topic chain, the type %s can only be "
							+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
		}
	}
}

At this point, my destinationsToAdd (see demo application attached) looks like:

Pos Topic Type
0 test-topic MAIN
1 test-topic.retry REUSABLE_RETRY_TOPIC
2 test-topic.custom.dlt DLT
3 test-topic.dlt DLT

As the topic with type REUSABLE_RETRY_TOPIC is not the last or last but one followed by a DLT, the assertion fails.

To Reproduce

Configure a RetryableTopic and set the exceptionBasedDltRouting parameter:

@KafkaListener(topics = "test-topic", id = "test-listener", idIsGroup = false, groupId = "test-group")
@RetryableTopic(
        backoff = @Backoff(delay = 100),
        attempts = "10",
        autoCreateTopics = "false",
        autoStartDltHandler = "true",
        dltTopicSuffix = ".dlt",
        retryTopicSuffix = ".retry",
        exceptionBasedDltRouting = {
                @ExceptionBasedDltDestination(
                        suffix = ".custom",
                        exceptions = {MyCustomException.class}
                )}
)

Expected behavior

The listener bean is created without error and if the process fails multiple times due MyCustomException, the DLT containing the ".custom.dlt" suffix will be considered as the tarrget topic for the message before the general pupose DLT is considered.

Sample

demo1.zip

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions