Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.0
Describe the bug
I'm using RetryTopicConfiguration to configure non-blocking retries for my consumer.
After upgrading to Spring Boot 3.4.0, I noticed that a new traceId is created with each retry attempt. The traceparent header on the Kafka message
also accumulates a new value with each retry. For example, if the message is retried 3 times, it ends up carrying 3 traceparent headers.
This was not the case in previous versions of Spring for Apache Kafka.
Looking at the logs, I think the trace context is being reset after the exception is thrown from the @KafkaListener-annotated method.
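To make the symptom easier to confirm, here is a minimal diagnostic sketch. It is not part of Spring for Apache Kafka; the class and method names are hypothetical. It assumes the header values follow the W3C traceparent layout (version-traceid-parentid-flags) and have already been decoded to strings, for example from consumerRecord.headers().headers("traceparent"). The two sample values reuse the trace and span ids from the 3.3.0 log lines in this report; the "00" version and flags fields are assumed for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for inspecting traceparent header values on a retried record.
class TraceparentInspector {

    // Returns the trace-id field of a W3C traceparent value, or null if the value is malformed.
    static String traceId(String traceparent) {
        if (traceparent == null) {
            return null;
        }
        String[] parts = traceparent.trim().split("-");
        // Expected layout: version-traceid-parentid-flags, with a 32-character trace id.
        if (parts.length < 4 || parts[1].length() != 32) {
            return null;
        }
        return parts[1];
    }

    // Collects the distinct trace ids, in encounter order, from all traceparent values.
    static List<String> distinctTraceIds(List<String> traceparentValues) {
        Set<String> ids = new LinkedHashSet<>();
        for (String value : traceparentValues) {
            String id = traceId(value);
            if (id != null) {
                ids.add(id);
            }
        }
        return new ArrayList<>(ids);
    }

    public static void main(String[] args) {
        // Ids taken from the 3.3.0 logs in this report; version and flags are assumed.
        List<String> headers = List.of(
                "00-9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c-00",
                "00-44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba-00");
        System.out.println(headers.size() + " traceparent header(s), distinct trace ids: "
                + distinctTraceIds(headers));
    }
}
```

If propagation works as it did in 3.2.8, a retried record should yield a single distinct trace id; more than one suggests a new trace was started on a retry hop.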
Logs with 3.2.8:
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] c.m.e.k.Kafka.Consumer : Consuming message
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] k.r.DeadLetterPublishingRecovererFactory : Resolved topic: completed.retry
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] k.r.DeadLetterPublishingRecovererFactory : Record: topic = completed, partition = 0, offset = 58, main topic = completed threw an error at topic completed. Sending to retry topic completed.retry.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2874)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2815)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2779)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2702)
.
.
.
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-ac9768534aa47b94] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
Logs with 3.3.0:
[ntainer#0-0-C-1] [9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c] c.m.e.k.Kafka.Consumer : Consuming message
[ntainer#0-0-C-1] [ ] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
[ntainer#0-0-C-1] [ ] k.r.DeadLetterPublishingRecovererFactory : Resolved topic: completed.retry
[ntainer#0-0-C-1] [ ] k.r.DeadLetterPublishingRecovererFactory : Record: topic = completed, partition = 0, offset = 47, main topic = completed threw an error at topic completed. Sending to retry topic completed.retry.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2986)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2889)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2766)
.
.
.
[ntainer#0-0-C-1] [44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
To Reproduce
Create a RetryTopicConfiguration bean:
@Bean
public RetryTopicConfiguration kafkaRetryConfiguration(
        KafkaTemplate<Object, Object> kafkaTemplate) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(consumerRetryProperties.getBackoff())
            .useSingleTopicForSameIntervals()
            .maxAttempts(consumerRetryProperties.getAttempts())
            .includeTopics(consumerRetryProperties.getTopics())
            .retryTopicSuffix(consumerRetryProperties.getRetryTopicSuffix())
            .dltSuffix(consumerRetryProperties.getErrorTopicSuffix())
            .doNotRetryOnDltFailure()
            .retryOn(consumerRetryProperties.getRetryOn())
            .dltHandlerMethod(consumerRetryProperties.getDltBeanName(), consumerRetryProperties.getDltMethod())
            .doNotAutoCreateRetryTopics()
            .create(kafkaTemplate);
}
Create a listener method:
@KafkaListener(
        topics = {"${my.kafka.topics.completed}"},
        clientIdPrefix = "${my.kafka.client-id-prefix}"
)
public void listen(@Payload ConsumerRecord<String, PayloadModel> consumerRecord) {
    // consumer logic
}
Throw a retryable exception from the listen method.
Expected behavior
The traceId should be propagated with each retry attempt and with the DLT publish.
Sample
N/A