Skip to content

GH-1369: Fix Fenced Consumer-based Producers #1378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -402,15 +402,13 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
*/
protected Producer<K, V> createKafkaProducer() {
if (this.clientIdPrefix == null) {
return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(),
this.valueSerializerSupplier.get());
return createRawProducer(this.configs);
}
else {
Map<String, Object> newConfigs = new HashMap<>(this.configs);
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
return new KafkaProducer<>(newConfigs, this.keySerializerSupplier.get(),
this.valueSerializerSupplier.get());
return createRawProducer(newConfigs);
}
}

Expand Down Expand Up @@ -482,13 +480,16 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
}
newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializerSupplier
.get(), this.valueSerializerSupplier.get());
newProducer = createRawProducer(newProducerConfigs);
newProducer.initTransactions();
return new CloseSafeProducer<>(newProducer, this.cache.get(prefix), remover,
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
}

/**
 * Create a raw producer from the given configuration, using this factory's
 * key/value serializer suppliers. Extracted as a protected hook so that
 * subclasses (and tests) can substitute an alternative producer instance;
 * both {@code createKafkaProducer()} and the transactional producer creation
 * path delegate here.
 * @param configs the producer configuration to use.
 * @return the raw producer.
 */
protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
return new KafkaProducer<>(configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
}

@Nullable
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
return getCache(this.transactionIdPrefix);
Expand Down Expand Up @@ -573,6 +574,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
this.cache = cache;
this.removeConsumerProducer = removeConsumerProducer;
this.txId = txId;
LOGGER.debug(() -> "Created new Producer: " + this);
}

Producer<K, V> getDelegate() {
Expand Down Expand Up @@ -649,13 +651,19 @@ public void commitTransaction() throws ProducerFencedException {
@Override
public void abortTransaction() throws ProducerFencedException {
LOGGER.debug(() -> toString() + " abortTransaction()");
try {
this.delegate.abortTransaction();
if (this.txFailed != null) {
LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.txFailed.getMessage()
+ ": " + this);
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "Abort failed: " + this);
this.txFailed = e;
throw e;
else {
try {
this.delegate.abortTransaction();
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "Abort failed: " + this);
this.txFailed = e;
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.WakeupException;

import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -1299,6 +1300,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
}
});
}
catch (ProducerFencedException e) {
this.logger.error(e, "Producer fenced during transaction");
}
catch (RuntimeException e) {
this.logger.error(e, "Transaction rolled back");
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
Expand Down Expand Up @@ -1542,6 +1546,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {

});
}
catch (ProducerFencedException e) {
this.logger.error(e, "Producer fenced during transaction");
}
catch (RuntimeException e) {
this.logger.error(e, "Transaction rolled back");
recordAfterRollback(iterator, record, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -363,9 +364,9 @@ public void testContainerTxProducerIsNotCached() throws Exception {
try {
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
future.get(10, TimeUnit.SECONDS);
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(0);
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
assertThat((Queue) KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class).get("fooTx.")).hasSize(0);
}
finally {
container.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -32,11 +34,13 @@

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.CannotCreateTransactionException;
Expand All @@ -51,7 +55,7 @@ public class DefaultKafkaProducerFactoryTests {

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerClosedAfterBadTransition() throws Exception {
void testProducerClosedAfterBadTransition() throws Exception {
final Producer producer = mock(Producer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

Expand Down Expand Up @@ -108,7 +112,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testResetSingle() {
void testResetSingle() {
final Producer producer = mock(Producer.class);
ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

Expand All @@ -131,7 +135,7 @@ protected Producer createKafkaProducer() {

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testResetTx() throws Exception {
void testResetTx() throws Exception {
final Producer producer = mock(Producer.class);
ApplicationContext ctx = mock(ApplicationContext.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
Expand Down Expand Up @@ -162,7 +166,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testThreadLocal() {
void testThreadLocal() {
final Producer producer = mock(Producer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

Expand Down Expand Up @@ -190,4 +194,28 @@ protected Producer createKafkaProducer() {
verify(producer).close(any(Duration.class));
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testCleanUpAfterTxFence() {
// GH-1369: a consumer-based transactional producer that is fenced must be
// removed from the factory's 'consumerProducers' cache when it is closed.
final Producer producer = mock(Producer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
protected Producer createRawProducer(Map configs) {
// Substitute the mock for a real KafkaProducer.
return producer;
}

};
pf.setTransactionIdPrefix("tx.");
// A transaction-id suffix marks this thread as using a consumer-bound producer.
TransactionSupport.setTransactionIdSuffix("suffix");
Producer aProducer = pf.createProducer();
// The factory tracked the new producer in its consumerProducers map.
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(1);
assertThat(aProducer).isNotNull();
assertThat(KafkaTestUtils.getPropertyValue(aProducer, "cache")).isNotNull();
// Simulate a fence: beginTransaction() throws ProducerFencedException.
willThrow(new ProducerFencedException("test")).given(producer).beginTransaction();
assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() -> aProducer.beginTransaction());
aProducer.close();
// After close, the fenced producer must no longer be cached.
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -669,6 +671,62 @@ public void testRollbackProcessorCrash() throws Exception {
logger.info("Stop testRollbackNoRetries");
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void testNoAfterRollbackWhenFenced() throws Exception {
// GH-1369: when the transactional producer is fenced during commit, the
// container must close the producer and must NOT invoke the
// AfterRollbackProcessor (there is nothing meaningful to re-process).
Consumer consumer = mock(Consumer.class);
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
recordMap.put(topicPartition0, Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")));
recordMap.put(topicPartition1, Collections.singletonList(new ConsumerRecord<>("foo", 1, 0, "key", "value")));
ConsumerRecords records = new ConsumerRecords(recordMap);
final AtomicBoolean done = new AtomicBoolean();
// Deliver the batch exactly once; subsequent polls just idle.
willAnswer(i -> {
if (done.compareAndSet(false, true)) {
return records;
}
else {
Thread.sleep(500);
return null;
}
}).given(consumer).poll(any(Duration.class));
ConsumerFactory cf = mock(ConsumerFactory.class);
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
Producer producer = mock(Producer.class);
final CountDownLatch closeLatch = new CountDownLatch(1);
// Latch released when the container closes the fenced producer.
willAnswer(i -> {
closeLatch.countDown();
return null;
}).given(producer).close(any());
// Simulate the fence at commit time.
willThrow(new ProducerFencedException("test")).given(producer).commitTransaction();
ProducerFactory pf = mock(ProducerFactory.class);
given(pf.transactionCapable()).willReturn(true);
given(pf.createProducer(isNull())).willReturn(producer);
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0),
new TopicPartitionOffset("foo", 1));
props.setGroupId("group");
props.setTransactionManager(tm);
// No-op listener; the interesting behavior is in the transaction handling.
props.setMessageListener((MessageListener) m -> {
});
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
AfterRollbackProcessor arp = mock(AfterRollbackProcessor.class);
given(arp.isProcessInTransaction()).willReturn(true);
container.setAfterRollbackProcessor(arp);
container.setBeanName("rollback");
container.start();
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
// The producer lifecycle must be begin -> (failed) commit -> close, in order.
InOrder inOrder = inOrder(producer);
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).commitTransaction();
inOrder.verify(producer).close(any());

// Fenced producers bypass the after-rollback processing entirely.
verify(arp, never()).process(any(), any(), any(), anyBoolean());

container.stop();
}

@SuppressWarnings("serial")
public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {

Expand Down