Skip to content

Commit f44193e

Browse files
garyrussellartembilan
authored andcommitted
GH-1369: Fix Fenced Consumer-based Producers
Resolves #1369 1. `CloseSafeProducer.cache` was incorrectly null (direct access to `this.cache` instead of `getCache()` Prevented closure of a fenced `consumerProducer` 2. Don't call the `AfterRollbackProcessor` when a producer is fenced - the partitions will have already been revoked so seeking is inappropriate and starting a new transaction will fence the new (current) producer. **cherry-pick to 2.3.x if clean**
1 parent ba78721 commit f44193e

File tree

5 files changed

+122
-20
lines changed

5 files changed

+122
-20
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -402,15 +402,13 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
402402
*/
403403
protected Producer<K, V> createKafkaProducer() {
404404
if (this.clientIdPrefix == null) {
405-
return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(),
406-
this.valueSerializerSupplier.get());
405+
return createRawProducer(this.configs);
407406
}
408407
else {
409408
Map<String, Object> newConfigs = new HashMap<>(this.configs);
410409
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
411410
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
412-
return new KafkaProducer<>(newConfigs, this.keySerializerSupplier.get(),
413-
this.valueSerializerSupplier.get());
411+
return createRawProducer(newConfigs);
414412
}
415413
}
416414

@@ -482,13 +480,16 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
482480
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
483481
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
484482
}
485-
newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializerSupplier
486-
.get(), this.valueSerializerSupplier.get());
483+
newProducer = createRawProducer(newProducerConfigs);
487484
newProducer.initTransactions();
488-
return new CloseSafeProducer<>(newProducer, this.cache.get(prefix), remover,
485+
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
489486
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
490487
}
491488

489+
protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
490+
return new KafkaProducer<>(configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
491+
}
492+
492493
@Nullable
493494
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
494495
return getCache(this.transactionIdPrefix);
@@ -573,6 +574,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
573574
this.cache = cache;
574575
this.removeConsumerProducer = removeConsumerProducer;
575576
this.txId = txId;
577+
LOGGER.debug(() -> "Created new Producer: " + this);
576578
}
577579

578580
Producer<K, V> getDelegate() {
@@ -649,13 +651,19 @@ public void commitTransaction() throws ProducerFencedException {
649651
@Override
650652
public void abortTransaction() throws ProducerFencedException {
651653
LOGGER.debug(() -> toString() + " abortTransaction()");
652-
try {
653-
this.delegate.abortTransaction();
654+
if (this.txFailed != null) {
655+
LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.txFailed.getMessage()
656+
+ ": " + this);
654657
}
655-
catch (RuntimeException e) {
656-
LOGGER.error(e, () -> "Abort failed: " + this);
657-
this.txFailed = e;
658-
throw e;
658+
else {
659+
try {
660+
this.delegate.abortTransaction();
661+
}
662+
catch (RuntimeException e) {
663+
LOGGER.error(e, () -> "Abort failed: " + this);
664+
this.txFailed = e;
665+
throw e;
666+
}
659667
}
660668
}
661669

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.kafka.common.MetricName;
5555
import org.apache.kafka.common.TopicPartition;
5656
import org.apache.kafka.common.errors.AuthorizationException;
57+
import org.apache.kafka.common.errors.ProducerFencedException;
5758
import org.apache.kafka.common.errors.WakeupException;
5859

5960
import org.springframework.context.ApplicationContext;
@@ -1299,6 +1300,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
12991300
}
13001301
});
13011302
}
1303+
catch (ProducerFencedException e) {
1304+
this.logger.error(e, "Producer fenced during transaction");
1305+
}
13021306
catch (RuntimeException e) {
13031307
this.logger.error(e, "Transaction rolled back");
13041308
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
@@ -1542,6 +1546,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
15421546

15431547
});
15441548
}
1549+
catch (ProducerFencedException e) {
1550+
this.logger.error(e, "Producer fenced during transaction");
1551+
}
15451552
catch (RuntimeException e) {
15461553
this.logger.error(e, "Transaction rolled back");
15471554
recordAfterRollback(iterator, record, e);

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import java.util.Map;
2525
import java.util.Properties;
26+
import java.util.Queue;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.stream.Collectors;
@@ -363,9 +364,9 @@ public void testContainerTxProducerIsNotCached() throws Exception {
363364
try {
364365
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
365366
future.get(10, TimeUnit.SECONDS);
366-
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
367367
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
368-
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(0);
368+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
369+
assertThat((Queue) KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class).get("fooTx.")).hasSize(0);
369370
}
370371
finally {
371372
container.stop();

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.BDDMockito.willAnswer;
23+
import static org.mockito.BDDMockito.willThrow;
2224
import static org.mockito.Mockito.inOrder;
2325
import static org.mockito.Mockito.mock;
2426
import static org.mockito.Mockito.verify;
@@ -32,11 +34,13 @@
3234

3335
import org.apache.kafka.clients.producer.Producer;
3436
import org.apache.kafka.common.KafkaException;
37+
import org.apache.kafka.common.errors.ProducerFencedException;
3538
import org.junit.jupiter.api.Test;
3639
import org.mockito.InOrder;
3740

3841
import org.springframework.context.ApplicationContext;
3942
import org.springframework.context.event.ContextStoppedEvent;
43+
import org.springframework.kafka.support.TransactionSupport;
4044
import org.springframework.kafka.test.utils.KafkaTestUtils;
4145
import org.springframework.kafka.transaction.KafkaTransactionManager;
4246
import org.springframework.transaction.CannotCreateTransactionException;
@@ -51,7 +55,7 @@ public class DefaultKafkaProducerFactoryTests {
5155

5256
@SuppressWarnings({ "rawtypes", "unchecked" })
5357
@Test
54-
public void testProducerClosedAfterBadTransition() throws Exception {
58+
void testProducerClosedAfterBadTransition() throws Exception {
5559
final Producer producer = mock(Producer.class);
5660
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
5761

@@ -108,7 +112,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
108112

109113
@Test
110114
@SuppressWarnings({ "rawtypes", "unchecked" })
111-
public void testResetSingle() {
115+
void testResetSingle() {
112116
final Producer producer = mock(Producer.class);
113117
ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
114118

@@ -131,7 +135,7 @@ protected Producer createKafkaProducer() {
131135

132136
@Test
133137
@SuppressWarnings({ "rawtypes", "unchecked" })
134-
public void testResetTx() throws Exception {
138+
void testResetTx() throws Exception {
135139
final Producer producer = mock(Producer.class);
136140
ApplicationContext ctx = mock(ApplicationContext.class);
137141
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
@@ -162,7 +166,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
162166

163167
@Test
164168
@SuppressWarnings({ "rawtypes", "unchecked" })
165-
public void testThreadLocal() {
169+
void testThreadLocal() {
166170
final Producer producer = mock(Producer.class);
167171
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
168172

@@ -190,4 +194,28 @@ protected Producer createKafkaProducer() {
190194
verify(producer).close(any(Duration.class));
191195
}
192196

197+
@Test
198+
@SuppressWarnings({ "rawtypes", "unchecked" })
199+
void testCleanUpAfterTxFence() {
200+
final Producer producer = mock(Producer.class);
201+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
202+
203+
@Override
204+
protected Producer createRawProducer(Map configs) {
205+
return producer;
206+
}
207+
208+
};
209+
pf.setTransactionIdPrefix("tx.");
210+
TransactionSupport.setTransactionIdSuffix("suffix");
211+
Producer aProducer = pf.createProducer();
212+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(1);
213+
assertThat(aProducer).isNotNull();
214+
assertThat(KafkaTestUtils.getPropertyValue(aProducer, "cache")).isNotNull();
215+
willThrow(new ProducerFencedException("test")).given(producer).beginTransaction();
216+
assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() -> aProducer.beginTransaction());
217+
aProducer.close();
218+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(0);
219+
}
220+
193221
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.BDDMockito.given;
2727
import static org.mockito.BDDMockito.willAnswer;
2828
import static org.mockito.BDDMockito.willReturn;
29+
import static org.mockito.BDDMockito.willThrow;
2930
import static org.mockito.Mockito.inOrder;
3031
import static org.mockito.Mockito.mock;
3132
import static org.mockito.Mockito.never;
@@ -60,6 +61,7 @@
6061
import org.apache.kafka.clients.producer.ProducerConfig;
6162
import org.apache.kafka.clients.producer.ProducerRecord;
6263
import org.apache.kafka.common.TopicPartition;
64+
import org.apache.kafka.common.errors.ProducerFencedException;
6365
import org.apache.kafka.common.header.internals.RecordHeader;
6466
import org.apache.kafka.common.header.internals.RecordHeaders;
6567
import org.junit.jupiter.api.BeforeAll;
@@ -669,6 +671,62 @@ public void testRollbackProcessorCrash() throws Exception {
669671
logger.info("Stop testRollbackNoRetries");
670672
}
671673

674+
@SuppressWarnings({ "rawtypes", "unchecked" })
675+
@Test
676+
void testNoAfterRollbackWhenFenced() throws Exception {
677+
Consumer consumer = mock(Consumer.class);
678+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
679+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
680+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
681+
recordMap.put(topicPartition0, Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")));
682+
recordMap.put(topicPartition1, Collections.singletonList(new ConsumerRecord<>("foo", 1, 0, "key", "value")));
683+
ConsumerRecords records = new ConsumerRecords(recordMap);
684+
final AtomicBoolean done = new AtomicBoolean();
685+
willAnswer(i -> {
686+
if (done.compareAndSet(false, true)) {
687+
return records;
688+
}
689+
else {
690+
Thread.sleep(500);
691+
return null;
692+
}
693+
}).given(consumer).poll(any(Duration.class));
694+
ConsumerFactory cf = mock(ConsumerFactory.class);
695+
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
696+
Producer producer = mock(Producer.class);
697+
final CountDownLatch closeLatch = new CountDownLatch(1);
698+
willAnswer(i -> {
699+
closeLatch.countDown();
700+
return null;
701+
}).given(producer).close(any());
702+
willThrow(new ProducerFencedException("test")).given(producer).commitTransaction();
703+
ProducerFactory pf = mock(ProducerFactory.class);
704+
given(pf.transactionCapable()).willReturn(true);
705+
given(pf.createProducer(isNull())).willReturn(producer);
706+
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
707+
ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0),
708+
new TopicPartitionOffset("foo", 1));
709+
props.setGroupId("group");
710+
props.setTransactionManager(tm);
711+
props.setMessageListener((MessageListener) m -> {
712+
});
713+
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
714+
AfterRollbackProcessor arp = mock(AfterRollbackProcessor.class);
715+
given(arp.isProcessInTransaction()).willReturn(true);
716+
container.setAfterRollbackProcessor(arp);
717+
container.setBeanName("rollback");
718+
container.start();
719+
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
720+
InOrder inOrder = inOrder(producer);
721+
inOrder.verify(producer).beginTransaction();
722+
inOrder.verify(producer).commitTransaction();
723+
inOrder.verify(producer).close(any());
724+
725+
verify(arp, never()).process(any(), any(), any(), anyBoolean());
726+
727+
container.stop();
728+
}
729+
672730
@SuppressWarnings("serial")
673731
public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {
674732

0 commit comments

Comments
 (0)