Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
For example:
2.8.11 (but I see it in the current master as well)
Describe the bug
In DefaultAfterRollbackProcessor there's an isProcessInTransaction() flag, described as:
/**
* Return true to invoke
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
* in a new transaction. Because the container cannot infer the desired behavior, the
* processor is responsible for sending the offset to the transaction if it decides to
* skip the failing record.
* @return true to run in a transaction; default false.
* @since 2.2.5
* @see #process(List, Consumer, MessageListenerContainer, Exception, boolean,
* ContainerProperties.EOSMode)
*/
But this flag is not considered in the process() method, which is part of the public interface of this class and which one can expect to work. So if you call this method yourself with the isCommitRecovered() flag set and a transactional KafkaTemplate, you end up with a "no transaction" exception. By default, the flag is only considered by org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#recordAfterRollback.
To Reproduce
With a transactional KafkaTemplate, set isCommitRecovered() to true and call the process() method yourself. This ends with a "no transaction" exception.
Expected behavior
The above invocation should be properly wrapped in a transaction.
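To make the expectation concrete, here is a minimal, dependency-free sketch of the behaviour described above. It does not use the real Spring classes: `FakeTransactionalTemplate`, `FakeProcessor`, and `processHonouringFlag` are hypothetical stand-ins invented for illustration, and the assumption that the in-transaction flag mirrors the commit-recovered flag is a simplification of this model, not a statement about the library.

```java
import java.util.ArrayList;
import java.util.List;

public class RollbackProcessorSketch {

    /** Hypothetical stand-in for a transactional KafkaTemplate (not the Spring class). */
    static class FakeTransactionalTemplate {
        private boolean inTransaction;
        final List<String> sentOffsets = new ArrayList<>();

        /** Fails unless called inside executeInTransaction, like the reported exception. */
        void sendOffsetsToTransaction(String offset) {
            if (!inTransaction) {
                throw new IllegalStateException("no transaction");
            }
            sentOffsets.add(offset);
        }

        /** Runs the given work with a transaction marked as active. */
        void executeInTransaction(Runnable work) {
            inTransaction = true;
            try {
                work.run();
            } finally {
                inTransaction = false;
            }
        }
    }

    /** Hypothetical stand-in for the after-rollback processor. */
    static class FakeProcessor {
        private final FakeTransactionalTemplate template;
        private final boolean commitRecovered;

        FakeProcessor(FakeTransactionalTemplate template, boolean commitRecovered) {
            this.template = template;
            this.commitRecovered = commitRecovered;
        }

        /** Assumption of this model: the flag simply mirrors commitRecovered. */
        boolean isProcessInTransaction() {
            return commitRecovered;
        }

        /** Models the reported behaviour: the flag is ignored, so no transaction is started. */
        void process(String offset) {
            if (commitRecovered) {
                template.sendOffsetsToTransaction(offset);
            }
        }

        /** Models the expected behaviour: wrap the call when the flag is set. */
        void processHonouringFlag(String offset) {
            if (isProcessInTransaction()) {
                template.executeInTransaction(() -> process(offset));
            } else {
                process(offset);
            }
        }
    }

    public static void main(String[] args) {
        FakeTransactionalTemplate template = new FakeTransactionalTemplate();
        FakeProcessor processor = new FakeProcessor(template, true);
        try {
            processor.process("topic-0@5"); // direct call, no surrounding transaction
        } catch (IllegalStateException e) {
            System.out.println("direct call failed: " + e.getMessage());
        }
        processor.processHonouringFlag("topic-0@5"); // wrapped call succeeds
        System.out.println("offsets sent: " + template.sentOffsets);
    }
}
```

In this model the direct call fails while the wrapped call sends the offset, which is the difference between calling process() yourself and going through the container's recordAfterRollback path.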