Skip to content

GH-3032: RecoveryCallback on RetryingDeserializer #3035

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
----

Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally.

Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import org.springframework.lang.Nullable;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryOperations;
import org.springframework.util.Assert;

Expand All @@ -30,44 +32,54 @@
* deserialization in case of transient errors.
*
* @param <T> Type to be deserialized into.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @author Soby Chacko
* @since 2.3
*
*/
public class RetryingDeserializer<T> implements Deserializer<T> {

private final Deserializer<T> delegate;

private final RetryOperations retryOperations;

@Nullable
private RecoveryCallback<T> recoveryCallback;

public RetryingDeserializer(Deserializer<T> delegate, RetryOperations retryOperations) {
Assert.notNull(delegate, "the 'delegate' deserializer cannot be null");
Assert.notNull(retryOperations, "the 'retryOperations' deserializer cannot be null");
this.delegate = delegate;
this.retryOperations = retryOperations;
}

/**
* Set a recovery callback to execute when the retries are exhausted.
* @param recoveryCallback {@link RecoveryCallback} to execute
* @since 3.1.2
*/
public void setRecoveryCallback(@Nullable RecoveryCallback<T> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.delegate.configure(configs, isKey);
}

@Override
public T deserialize(String topic, byte[] data) {
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data));
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data), this.recoveryCallback);
}

@Override
public T deserialize(String topic, Headers headers, byte[] data) {
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data));
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
}

@Override
public T deserialize(String topic, Headers headers, ByteBuffer data) {
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data));
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.springframework.kafka.support.serializer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,19 +31,20 @@
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;

/**
* @author Gary Russell
* @author Wang Zhiyang
*
* @author Soby Chacko
* @since 2.3
*
*/
public class RetryingDeserializerTests {
class RetryingDeserializerTests {

@Test
void testRetry() {
void basicRetryingDeserializer() {
Deser delegate = new Deser();
RetryingDeserializer<String> rdes = new RetryingDeserializer<>(delegate, new RetryTemplate());
assertThat(rdes.deserialize("foo", "bar".getBytes())).isEqualTo("bar");
Expand All @@ -54,6 +58,17 @@ void testRetry() {
rdes.close();
}

@Test
void retryingDeserializerWithRecoveryCallback() throws Exception {
RetryingDeserializer<String> rdes = new RetryingDeserializer<>((s, b) -> {
throw new RuntimeException();
}, new RetryTemplate());
RecoveryCallback<String> recoveryCallback = mock();
rdes.setRecoveryCallback(recoveryCallback);
rdes.deserialize("my-topic", "my-data".getBytes());
verify(recoveryCallback).recover(any(RetryContext.class));
}

public static class Deser implements Deserializer<String> {

int n;
Expand Down