From a5d80b7511de83954304b8a915d205d5544978e9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 15 Feb 2024 14:31:37 -0500 Subject: [PATCH 1/3] GH-3032: RecoveryCallback on RetryingDeserializer Fixes: #3032 --- .../modules/ROOT/pages/kafka/serdes.adoc | 2 + .../serializer/RetryingDeserializer.java | 24 ++++++++--- .../serializer/RetryingDeserializerTests.java | 42 +++++++++++++++++-- 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index d41fa77f61..a2616389fd 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -397,6 +397,8 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs, new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate)); ---- +Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally. + Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java index 5e2e7df137..e3c10af566 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; +import org.springframework.lang.Nullable; +import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryOperations; import org.springframework.util.Assert; @@ -30,12 +32,10 @@ * deserialization in case of transient errors. * * @param Type to be deserialized into. - * * @author Gary Russell * @author Wang Zhiyang - * + * @author Soby Chacko * @since 2.3 - * */ public class RetryingDeserializer implements Deserializer { @@ -43,6 +43,9 @@ public class RetryingDeserializer implements Deserializer { private final RetryOperations retryOperations; + @Nullable + private RecoveryCallback recoveryCallback; + public RetryingDeserializer(Deserializer delegate, RetryOperations retryOperations) { Assert.notNull(delegate, "the 'delegate' deserializer cannot be null"); Assert.notNull(retryOperations, "the 'retryOperations' deserializer cannot be null"); @@ -57,17 +60,17 @@ public void configure(Map configs, boolean isKey) { @Override public T deserialize(String topic, byte[] data) { - return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data)); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data), this.recoveryCallback); } @Override public T deserialize(String topic, Headers headers, byte[] data) { - return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data)); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback); } @Override public T deserialize(String topic, Headers headers, ByteBuffer data) { - return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data)); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback); } @Override @@ -75,4 +78,13 @@ public void close() { this.delegate.close(); } + /** + * Set a recovery callback to execute when the retries are exhausted. + * @param recoveryCallback {@link RecoveryCallback} to execute + * @since 3.1.2 + */ + public void setRecoveryCallback(@Nullable RecoveryCallback recoveryCallback) { + this.recoveryCallback = recoveryCallback; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java index 837471711f..0acc6cfad5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -33,14 +35,13 @@ /** * @author Gary Russell * @author Wang Zhiyang - * + * @author Soby Chacko * @since 2.3 - * */ -public class RetryingDeserializerTests { +class RetryingDeserializerTests { @Test - void testRetry() { + void basicRetryingDeserializer() { Deser delegate = new Deser(); RetryingDeserializer rdes = new RetryingDeserializer<>(delegate, new RetryTemplate()); assertThat(rdes.deserialize("foo", "bar".getBytes())).isEqualTo("bar"); @@ -54,6 +55,39 @@ void testRetry() { rdes.close(); } + @Test + void retryDeserializerWithRecoveryCallback() throws InterruptedException { + NonRecoverableDeser delegate = new NonRecoverableDeser(); + RetryingDeserializer rdes = new RetryingDeserializer<>(delegate, new RetryTemplate()); + rdes.setRecoveryCallback(context -> { + delegate.latch.countDown(); + return null; + }); + assertThat(rdes.deserialize("my-topic", "my-data".getBytes())).isNull(); + assertThat(delegate.latch.await(10, TimeUnit.SECONDS)).isTrue(); + rdes.close(); + } + + public static class NonRecoverableDeser implements Deserializer { + + CountDownLatch latch = new CountDownLatch(4); + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public String deserialize(String topic, byte[] data) { + latch.countDown(); + throw new RuntimeException(); + } + + @Override + public void close() { + // empty + } + } + public static class Deser implements Deserializer { int n; From f6b24e8713a50598c0243aae6ac3802da28c72c6 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 15 Feb 2024 16:11:26 -0500 Subject: [PATCH 2/3] PR review --- .../serializer/RetryingDeserializer.java | 18 ++++---- .../serializer/RetryingDeserializerTests.java | 46 ++++++------------- 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java index e3c10af566..fe01c2449e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java @@ -53,6 +53,15 @@ public RetryingDeserializer(Deserializer delegate, RetryOperations retryOpera this.retryOperations = retryOperations; } + /** + * Set a recovery callback to execute when the retries are exhausted. + * @param recoveryCallback {@link RecoveryCallback} to execute + * @since 3.1.2 + */ + public void setRecoveryCallback(@Nullable RecoveryCallback recoveryCallback) { + this.recoveryCallback = recoveryCallback; + } + @Override public void configure(Map configs, boolean isKey) { this.delegate.configure(configs, isKey); @@ -78,13 +87,4 @@ public void close() { this.delegate.close(); } - /** - * Set a recovery callback to execute when the retries are exhausted. - * @param recoveryCallback {@link RecoveryCallback} to execute - * @since 3.1.2 - */ - public void setRecoveryCallback(@Nullable RecoveryCallback recoveryCallback) { - this.recoveryCallback = recoveryCallback; - } - } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java index 0acc6cfad5..94156b2e3d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java @@ -17,12 +17,14 @@ package org.springframework.kafka.support.serializer; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -30,6 +32,8 @@ import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.RetryContext; import org.springframework.retry.support.RetryTemplate; /** @@ -55,37 +59,17 @@ void basicRetryingDeserializer() { rdes.close(); } + @SuppressWarnings("unchecked") @Test - void retryDeserializerWithRecoveryCallback() throws InterruptedException { - NonRecoverableDeser delegate = new NonRecoverableDeser(); - RetryingDeserializer rdes = new RetryingDeserializer<>(delegate, new RetryTemplate()); - rdes.setRecoveryCallback(context -> { - delegate.latch.countDown(); - return null; - }); - assertThat(rdes.deserialize("my-topic", "my-data".getBytes())).isNull(); - assertThat(delegate.latch.await(10, TimeUnit.SECONDS)).isTrue(); - rdes.close(); - } - - public static class NonRecoverableDeser implements Deserializer { - - CountDownLatch latch = new CountDownLatch(4); - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public String deserialize(String topic, byte[] data) { - latch.countDown(); + void retryingDeserializerWithRecoveryCallback() throws Exception { + RetryingDeserializer rdes = new RetryingDeserializer<>((s, b) -> { throw new RuntimeException(); - } - - @Override - public void close() { - // empty - } + }, new RetryTemplate()); + RecoveryCallback recoveryCallback = mock(RecoveryCallback.class); + rdes.setRecoveryCallback(recoveryCallback); + rdes.deserialize("my-topic", "my-data".getBytes()); + verify(recoveryCallback, times(1)).recover(any(RetryContext.class)); + rdes.close(); } public static class Deser implements Deserializer { From d4f6a80a9157f6f73facaea439b43d06ce6d8a7b Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 15 Feb 2024 16:37:01 -0500 Subject: [PATCH 3/3] PR review --- .../support/serializer/RetryingDeserializerTests.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java index 94156b2e3d..fb1d3846d1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; @@ -59,17 +58,15 @@ void basicRetryingDeserializer() { rdes.close(); } - @SuppressWarnings("unchecked") @Test void retryingDeserializerWithRecoveryCallback() throws Exception { RetryingDeserializer rdes = new RetryingDeserializer<>((s, b) -> { throw new RuntimeException(); }, new RetryTemplate()); - RecoveryCallback recoveryCallback = mock(RecoveryCallback.class); + RecoveryCallback recoveryCallback = mock(); rdes.setRecoveryCallback(recoveryCallback); rdes.deserialize("my-topic", "my-data".getBytes()); - verify(recoveryCallback, times(1)).recover(any(RetryContext.class)); - rdes.close(); + verify(recoveryCallback).recover(any(RetryContext.class)); } public static class Deser implements Deserializer {