diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
index bb2fe94ecf..953b27f508 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -402,15 +402,13 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
 	 */
 	protected Producer<K, V> createKafkaProducer() {
 		if (this.clientIdPrefix == null) {
-			return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(),
-					this.valueSerializerSupplier.get());
+			return createRawProducer(this.configs);
 		}
 		else {
 			Map<String, Object> newConfigs = new HashMap<>(this.configs);
 			newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
 					this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
-			return new KafkaProducer<>(newConfigs, this.keySerializerSupplier.get(),
-					this.valueSerializerSupplier.get());
+			return createRawProducer(newConfigs);
 		}
 	}
 
@@ -482,13 +480,16 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
 			newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
 					this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
 		}
-		newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializerSupplier
-				.get(), this.valueSerializerSupplier.get());
+		newProducer = createRawProducer(newProducerConfigs);
 		newProducer.initTransactions();
-		return new CloseSafeProducer<>(newProducer, this.cache.get(prefix), remover,
+		return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
 				(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
 	}
 
+	protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
+		return new KafkaProducer<>(configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
+	}
+
 	@Nullable
 	protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
 		return getCache(this.transactionIdPrefix);
@@ -573,6 +574,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
 			this.cache = cache;
 			this.removeConsumerProducer = removeConsumerProducer;
 			this.txId = txId;
+			LOGGER.debug(() -> "Created new Producer: " + this);
 		}
 
 		Producer<K, V> getDelegate() {
@@ -649,13 +651,19 @@ public void commitTransaction() throws ProducerFencedException {
 		@Override
 		public void abortTransaction() throws ProducerFencedException {
 			LOGGER.debug(() -> toString() + " abortTransaction()");
-			try {
-				this.delegate.abortTransaction();
+			if (this.txFailed != null) {
+				LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.txFailed.getMessage()
+						+ ": " + this);
 			}
-			catch (RuntimeException e) {
-				LOGGER.error(e, () -> "Abort failed: " + this);
-				this.txFailed = e;
-				throw e;
+			else {
+				try {
+					this.delegate.abortTransaction();
+				}
+				catch (RuntimeException e) {
+					LOGGER.error(e, () -> "Abort failed: " + this);
+					this.txFailed = e;
+					throw e;
+				}
 			}
 		}
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 50ed7719b6..c13498ec6e 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -54,6 +54,7 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import org.springframework.context.ApplicationContext;
@@ -1299,6 +1300,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 						}
 					});
 				}
+				catch (ProducerFencedException e) {
+					this.logger.error(e, "Producer fenced during transaction");
+				}
 				catch (RuntimeException e) {
 					this.logger.error(e, "Transaction rolled back");
 					AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
@@ -1542,6 +1546,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 
 					});
 				}
+				catch (ProducerFencedException e) {
+					this.logger.error(e, "Producer fenced during transaction");
+				}
 				catch (RuntimeException e) {
 					this.logger.error(e, "Transaction rolled back");
 					recordAfterRollback(iterator, record, e);
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
index 524f0cf56e..b7a05838cc 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -363,9 +364,9 @@ public void testContainerTxProducerIsNotCached() throws Exception {
 		try {
 			ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
 			future.get(10, TimeUnit.SECONDS);
-			assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
 			assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
-			assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(0);
+			assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
+			assertThat((Queue) KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class).get("fooTx.")).hasSize(0);
 		}
 		finally {
 			container.stop();
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
index ee3d07fe8f..ab401cd410 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
@@ -17,8 +17,10 @@
 package org.springframework.kafka.core;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -32,11 +34,13 @@
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.jupiter.api.Test;
 import org.mockito.InOrder;
 
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.event.ContextStoppedEvent;
+import org.springframework.kafka.support.TransactionSupport;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.kafka.transaction.KafkaTransactionManager;
 import org.springframework.transaction.CannotCreateTransactionException;
@@ -51,7 +55,7 @@ public class DefaultKafkaProducerFactoryTests {
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
-	public void testProducerClosedAfterBadTransition() throws Exception {
+	void testProducerClosedAfterBadTransition() throws Exception {
 		final Producer producer = mock(Producer.class);
 		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
 
@@ -108,7 +112,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
 
 	@Test
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testResetSingle() {
+	void testResetSingle() {
 		final Producer producer = mock(Producer.class);
 		ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
 
@@ -131,7 +135,7 @@ protected Producer createKafkaProducer() {
 
 	@Test
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testResetTx() throws Exception {
+	void testResetTx() throws Exception {
 		final Producer producer = mock(Producer.class);
 		ApplicationContext ctx = mock(ApplicationContext.class);
 		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
@@ -162,7 +166,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
 
 	@Test
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testThreadLocal() {
+	void testThreadLocal() {
 		final Producer producer = mock(Producer.class);
 		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
 
@@ -190,4 +194,28 @@ protected Producer createKafkaProducer() {
 		verify(producer).close(any(Duration.class));
 	}
 
+	@Test
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	void testCleanUpAfterTxFence() {
+		final Producer producer = mock(Producer.class);
+		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
+
+			@Override
+			protected Producer createRawProducer(Map configs) {
+				return producer;
+			}
+
+		};
+		pf.setTransactionIdPrefix("tx.");
+		TransactionSupport.setTransactionIdSuffix("suffix");
+		Producer aProducer = pf.createProducer();
+		assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(1);
+		assertThat(aProducer).isNotNull();
+		assertThat(KafkaTestUtils.getPropertyValue(aProducer, "cache")).isNotNull();
+		willThrow(new ProducerFencedException("test")).given(producer).beginTransaction();
+		assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() -> aProducer.beginTransaction());
+		aProducer.close();
+		assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(0);
+	}
+
 }
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index 823c402e8d..ae70edc9a0 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -26,6 +26,7 @@
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
 import static org.mockito.BDDMockito.willReturn;
+import static org.mockito.BDDMockito.willThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -60,6 +61,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.junit.jupiter.api.BeforeAll;
@@ -669,6 +671,62 @@ public void testRollbackProcessorCrash() throws Exception {
 		logger.info("Stop testRollbackNoRetries");
 	}
 
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	void testNoAfterRollbackWhenFenced() throws Exception {
+		Consumer consumer = mock(Consumer.class);
+		final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+		final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
+		recordMap.put(topicPartition0, Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")));
+		recordMap.put(topicPartition1, Collections.singletonList(new ConsumerRecord<>("foo", 1, 0, "key", "value")));
+		ConsumerRecords records = new ConsumerRecords(recordMap);
+		final AtomicBoolean done = new AtomicBoolean();
+		willAnswer(i -> {
+			if (done.compareAndSet(false, true)) {
+				return records;
+			}
+			else {
+				Thread.sleep(500);
+				return null;
+			}
+		}).given(consumer).poll(any(Duration.class));
+		ConsumerFactory cf = mock(ConsumerFactory.class);
+		willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
+		Producer producer = mock(Producer.class);
+		final CountDownLatch closeLatch = new CountDownLatch(1);
+		willAnswer(i -> {
+			closeLatch.countDown();
+			return null;
+		}).given(producer).close(any());
+		willThrow(new ProducerFencedException("test")).given(producer).commitTransaction();
+		ProducerFactory pf = mock(ProducerFactory.class);
+		given(pf.transactionCapable()).willReturn(true);
+		given(pf.createProducer(isNull())).willReturn(producer);
+		KafkaTransactionManager tm = new KafkaTransactionManager(pf);
+		ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0),
+				new TopicPartitionOffset("foo", 1));
+		props.setGroupId("group");
+		props.setTransactionManager(tm);
+		props.setMessageListener((MessageListener) m -> {
+		});
+		KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
+		AfterRollbackProcessor arp = mock(AfterRollbackProcessor.class);
+		given(arp.isProcessInTransaction()).willReturn(true);
+		container.setAfterRollbackProcessor(arp);
+		container.setBeanName("rollback");
+		container.start();
+		assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		InOrder inOrder = inOrder(producer);
+		inOrder.verify(producer).beginTransaction();
+		inOrder.verify(producer).commitTransaction();
+		inOrder.verify(producer).close(any());
+
+		verify(arp, never()).process(any(), any(), any(), anyBoolean());
+
+		container.stop();
+	}
+
 	@SuppressWarnings("serial")
 	public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {
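
For reference, outside the patch itself: the new protected createRawProducer(Map) hook added to DefaultKafkaProducerFactory above is the seam the PR's own tests use to substitute a test double for the real KafkaProducer. A minimal standalone sketch of that usage follows; the class name StubProducerFactory is made up for illustration, and raw types are kept only to mirror the diff's tests.

// Illustrative sketch only - not part of the diff above.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

@SuppressWarnings({ "rawtypes", "unchecked" })
class StubProducerFactory extends DefaultKafkaProducerFactory {

	private final Producer producer;

	StubProducerFactory(Producer producer) {
		super(new HashMap<>());
		this.producer = producer;
	}

	@Override
	protected Producer createRawProducer(Map configs) {
		// Bypass KafkaProducer construction; hand back the injected stub/mock.
		return this.producer;
	}

}

Because only the raw-producer creation is overridden, such a subclass still goes through the factory's CloseSafeProducer wrapping, caching, and transaction handling shown in the DefaultKafkaProducerFactory hunks above.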