Trace context is being reset in between retry attempts and dlt publication #3816

Open
@msatmarean18


In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.0

Describe the bug

I'm using RetryTopicConfiguration to configure non-blocking retries for my consumer.
After upgrading to Spring Boot 3.4.0, I noticed that a new traceId is created with each retry attempt. In addition, the Kafka message accumulates one more traceparent header with each retry. For example, if the message is retried 3 times, it ends up with 3 traceparent headers.
This was not the case in previous versions of Spring for Apache Kafka.
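
One way to confirm the symptom on a retried record is to collect the values of all its traceparent headers and check whether they share a single trace id. The helper below is only a sketch for that check: the class and method names are hypothetical, and it assumes the header values follow the W3C `version-traceid-parentid-flags` layout. The `00` and `01` fields in the usage example are assumed; only the trace and span ids come from the logs in this report.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical diagnostic helper, not part of Spring for Apache Kafka.
public final class TraceparentCheck {

    private TraceparentCheck() {
    }

    // Extracts the trace-id field from a value shaped like
    // "version-traceid-parentid-flags" (assumed W3C traceparent layout).
    static String traceIdOf(String traceparent) {
        String[] parts = traceparent.trim().split("-");
        if (parts.length < 4 || parts[1].length() != 32) {
            throw new IllegalArgumentException("Not a traceparent value: " + traceparent);
        }
        return parts[1];
    }

    // Returns the distinct trace ids across all given header values, in encounter order.
    // More than one entry means the trace context changed between attempts.
    static Set<String> distinctTraceIds(List<String> traceparentValues) {
        Set<String> ids = new LinkedHashSet<>();
        for (String value : traceparentValues) {
            ids.add(traceIdOf(value));
        }
        return ids;
    }
}
```

For instance, passing `"00-9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c-01"` and `"00-44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba-01"` (the two id pairs from the 3.3.0 log) yields two distinct trace ids, whereas the two id pairs from the 3.2.8 log yield one. In a listener, the header values could be read from the record's headers for the key `traceparent` and decoded as UTF-8 strings before being passed in.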

Looking at the logs, I think the trace context is being reset after the exception is thrown from the @KafkaListener-annotated method.

3.2.8

[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] c.m.e.k.Kafka.Consumer    : Consuming message
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] k.r.DeadLetterPublishingRecovererFactory : Resolved topic: completed.retry
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] k.r.DeadLetterPublishingRecovererFactory : Record: topic = completed, partition = 0, offset = 58, main topic = completed threw an error at topic completed. Sending to retry topic completed.retry.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2874)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2815)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2779)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2702)
.
.
.
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-ac9768534aa47b94] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:

3.3.0

[ntainer#0-0-C-1] [9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c] c.m.e.k.Kafka.Consumer    : Consuming message
[ntainer#0-0-C-1] [                                                 ] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
[ntainer#0-0-C-1] [                                                 ] k.r.DeadLetterPublishingRecovererFactory : Resolved topic: completed.retry
[ntainer#0-0-C-1] [                                                 ] k.r.DeadLetterPublishingRecovererFactory : Record: topic = completed, partition = 0, offset = 47, main topic = completed threw an error at topic completed. Sending to retry topic completed.retry.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2986)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2889)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2853)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2766)
.
.
.

[ntainer#0-0-C-1] [44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 

To Reproduce

Create a RetryTopicConfiguration bean:

    @Bean
    public RetryTopicConfiguration kafkaRetryConfiguration(
            KafkaTemplate<Object, Object> kafkaTemplate) {

        return RetryTopicConfigurationBuilder
                .newInstance()
                .fixedBackOff(consumerRetryProperties.getBackoff())
                .useSingleTopicForSameIntervals()
                .maxAttempts(consumerRetryProperties.getAttempts())
                .includeTopics(consumerRetryProperties.getTopics())
                .retryTopicSuffix(consumerRetryProperties.getRetryTopicSuffix())
                .dltSuffix(consumerRetryProperties.getErrorTopicSuffix())
                .doNotRetryOnDltFailure()
                .retryOn(consumerRetryProperties.getRetryOn())
                .dltHandlerMethod(consumerRetryProperties.getDltBeanName(), consumerRetryProperties.getDltMethod())
                .doNotAutoCreateRetryTopics()
                .create(kafkaTemplate);
    }

Create a listener method.

    @KafkaListener(
            topics = {"${my.kafka.topics.completed}"},
            clientIdPrefix = "${my.kafka.client-id-prefix}"
    )
    public void listen(@Payload ConsumerRecord<String, PayloadModel> consumerRecord) {
        // consumer logic
    }

Throw a retryable exception from the listen method.

Expected behavior

The traceId should be propagated with each retry attempt and with DLT publication.

Sample

N/A
