Description
Expected Behavior
Provide a recovery callback for the execute method of org.springframework.kafka.support.serializer.RetryingDeserializer. Perhaps make the necessary fields protected instead of private and create a class RetryingAndRecoveringDeserializer extends RetryingDeserializer.
Current Behavior
No such option at the moment.
I would like to be able to retry, for example, HTTP 500 Schema Registry errors in a blocking way for, e.g., one minute, and then trigger the ErrorHandlingDeserializer so that the record is re-consumed and the blocking retries start again, and so on. This is helpful for avoiding consumer group rebalancing, by retrying forever in a blocking fashion. But if the error is not recoverable (e.g. an HTTP 400 from Schema Registry), then I would like to skip the consumed event (by instructing the ErrorHandlingDeserializer to skip it).
An example could be:
/**
 * Value deserializer bean: a {@code KafkaAvroDeserializer} wrapped in the proposed
 * {@code RetryingAndRecoveringDeserializer}, which is in turn wrapped in an
 * {@code ErrorHandlingDeserializer}.
 *
 * @param retryRegistryhttp500ErrorsTemplate retry template handed to the retrying deserializer
 * @param schemaRegistryClient client used to construct the Avro deserializer
 * @return the outermost {@code ErrorHandlingDeserializer}
 */
@Bean
public ErrorHandlingDeserializer<Object> retryingValueDeserializer(
        RetryTemplate retryRegistryhttp500ErrorsTemplate,
        SchemaRegistryClient schemaRegistryClient) {
    // Innermost delegate: performs the actual Avro deserialization.
    var avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);

    // Middle layer: retries via the template, then hands over to the recovery callback.
    var retryingDeserializer =
            new RetryingAndRecoveringDeserializer<Object>(
                    avroDeserializer,
                    retryRegistryhttp500ErrorsTemplate,
                    this::triggerErrorHandlingDeserializerToReconsumeOrSkip);

    // Outer layer: the ErrorHandlingDeserializer receives whatever the callback throws.
    return new ErrorHandlingDeserializer<>(retryingDeserializer);
}
/**
 * Recovery callback used once the blocking retries are over. It never returns a value:
 * it always throws, choosing the exception type from {@link #isRecoverable(RetryContext)}.
 *
 * @param context retry context describing the finished retry session
 * @param <T> nominal result type; present only so the method fits a generic callback
 * @return never returns normally
 */
private <T> T triggerErrorHandlingDeserializerToReconsumeOrSkip(RetryContext context) {
    // Guard clause: the non-recoverable case is signalled with its own exception type.
    if (!isRecoverable(context)) {
        throw new BlockingRetriesExhaustedNonRecoverableException();
    }
    // Otherwise the retries were exhausted on a recoverable failure.
    throw new BlockingRetriesExhaustedRecoverableException();
}
/**
 * Tells whether the failure that ended the retry session is considered recoverable.
 *
 * <p>The blocking retry template does not retry non-recoverable exceptions, so for those
 * the retry count is always 1; any other count is treated as recoverable.
 *
 * <p>NOTE(review): if {@code blockingRetries} were configured as 1, a recoverable failure
 * would presumably also report a count of 1 and be classified as non-recoverable - confirm.
 *
 * @param context retry context whose retry count is inspected
 * @return {@code true} when the retry count differs from 1
 */
private boolean isRecoverable(RetryContext context) {
    final int retryCount = context.getRetryCount();
    return !(retryCount == 1);
}
/**
 * Retry template bean for the blocking retries. It uses an {@code ExpressionRetryPolicy}
 * built from the expression below (a {@code RestClientException} cause with
 * {@code errorCode >= 500}), limited to {@code blockingRetries} attempts, with a fixed
 * back-off of one second.
 *
 * @return the configured {@code RetryTemplate}
 */
@Bean
public RetryTemplate retryRegistryhttp500ErrorsTemplate() {
    final long backOffMillis = Duration.ofSeconds(1).toMillis();

    ExpressionRetryPolicy serverErrorPolicy =
            new ExpressionRetryPolicy(
                    "cause instanceof T(io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException) and cause?.errorCode >= 500");
    serverErrorPolicy.setMaxAttempts(blockingRetries);

    return RetryTemplate.builder()
            .customPolicy(serverErrorPolicy)
            .fixedBackoff(backOffMillis)
            .build();
}
//...
/**
 * Builds the container error handler: a {@code CommonDelegatingErrorHandler} whose default
 * delegate uses a {@code FixedBackOff(0L, 0L)}, plus a dedicated delegate for
 * {@code BlockingRetriesExhaustedRecoverableException} that backs off 2000 ms with
 * {@code FixedBackOff.UNLIMITED_ATTEMPTS}.
 *
 * @return the delegating error handler
 */
private CommonErrorHandler errorHandler() {
    // Default delegate, used when no registered exception type matches.
    DefaultErrorHandler fallbackHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0L));
    CommonDelegatingErrorHandler delegatingHandler =
            new CommonDelegatingErrorHandler(fallbackHandler);

    // Delegate for the recoverable case: fixed 2000 ms interval, unlimited attempts.
    FixedBackOff unlimitedBackOff = new FixedBackOff(2000L, FixedBackOff.UNLIMITED_ATTEMPTS);
    DefaultErrorHandler retryForeverHandler = new DefaultErrorHandler(unlimitedBackOff);
    // Presumably needed so this handler does not treat DeserializationException as
    // non-retryable - TODO confirm against the DefaultErrorHandler defaults.
    retryForeverHandler.removeClassification(DeserializationException.class);

    // Match delegates against the cause chain, not only the top-level exception.
    delegatingHandler.setCauseChainTraversing(true);
    delegatingHandler.addDelegate(
            BlockingRetriesExhaustedRecoverableException.class, retryForeverHandler);
    return delegatingHandler;
}
Context