Provide a recovery callback for the execute method of RetryingDeserializer #3032

Closed
@vspiliop

Description

Expected Behavior

Provide a recovery callback for the execute method of org.springframework.kafka.support.serializer.RetryingDeserializer. One way to do this would be to change the necessary fields from private to protected and create a class RetryingAndRecoveringDeserializer extends RetryingDeserializer.
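For illustration only, a minimal sketch of what such a deserializer could look like. The class name and constructor are taken from the example below and are not part of spring-kafka; composition is used here instead of subclassing because the delegate and retry fields of RetryingDeserializer are currently private:

import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryOperations;

// Hypothetical class sketched for this issue; not part of spring-kafka.
public class RetryingAndRecoveringDeserializer<T> implements Deserializer<T> {

  private final Deserializer<T> delegate;

  private final RetryOperations retryOperations;

  private final RecoveryCallback<T> recoveryCallback;

  public RetryingAndRecoveringDeserializer(Deserializer<T> delegate,
      RetryOperations retryOperations, RecoveryCallback<T> recoveryCallback) {
    this.delegate = delegate;
    this.retryOperations = retryOperations;
    this.recoveryCallback = recoveryCallback;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    this.delegate.configure(configs, isKey);
  }

  @Override
  public T deserialize(String topic, byte[] data) {
    // The recovery callback runs once the retry policy gives up; throwing from it
    // propagates the exception to the wrapping ErrorHandlingDeserializer.
    return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data),
        this.recoveryCallback);
  }

  @Override
  public T deserialize(String topic, Headers headers, byte[] data) {
    return this.retryOperations.execute(
        context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
  }

  @Override
  public void close() {
    this.delegate.close();
  }
}

The only difference from the existing RetryingDeserializer would be the second argument to RetryOperations.execute, i.e. the recovery callback invoked once the retry policy is exhausted.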

Current Behavior

No such option at the moment.

I would like to be able to retry, for example, HTTP 500 Schema Registry errors in a blocking way (say, for one minute) and then trigger the ErrorHandlingDeserializer, so that the record is re-consumed and the blocking retries start again, and so on. This helps avoid consumer group rebalancing by effectively retrying forever in a blocking fashion. If the error is not recoverable (e.g. an HTTP 400 from the Schema Registry), I would like to skip the consumed record instead (by instructing the ErrorHandlingDeserializer to skip it).

An example could be:

  // ErrorHandlingDeserializer catches anything thrown by the delegate (including the recovery
  // callback) and hands it to the container's error handler as a DeserializationException.
  @Bean
  public ErrorHandlingDeserializer<Object> retryingValueDeserializer(
      RetryTemplate retryRegistryHttp500ErrorsTemplate,
      SchemaRegistryClient schemaRegistryClient) {
    return new ErrorHandlingDeserializer<>(
        new RetryingAndRecoveringDeserializer<>(
            new KafkaAvroDeserializer(schemaRegistryClient),
            retryRegistryHttp500ErrorsTemplate,
            this::triggerErrorHandlingDeserializerToReconsumeOrSkip));
  }

  // Recovery callback: invoked by the RetryTemplate once it will not retry any further,
  // i.e. after the blocking retries are exhausted, or immediately for non-retryable errors.
  private <T> T triggerErrorHandlingDeserializerToReconsumeOrSkip(RetryContext context) {
    if (isRecoverable(context)) {
      throw new BlockingRetriesExhaustedRecoverableException();
    } else {
      throw new BlockingRetriesExhaustedNonRecoverableException();
    }
  }

  /**
   * Non-recoverable exceptions are not retried by the blocking retry template, so their
   * retry count is always 1.
   */
  private boolean isRecoverable(RetryContext context) {
    return context.getRetryCount() != 1;
  }

  @Bean
  public RetryTemplate retryRegistryHttp500ErrorsTemplate() {

    // Retry (blocking) only the Schema Registry errors that the SpEL expression classifies as 5xx.
    ExpressionRetryPolicy retryRegistry5xxErrors =
        new ExpressionRetryPolicy(
            "cause instanceof T(io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException) and cause?.errorCode >= 500");
    // blockingRetries is an externally configured field (e.g. injected via @Value).
    retryRegistry5xxErrors.setMaxAttempts(blockingRetries);

    return RetryTemplate.builder()
        .customPolicy(retryRegistry5xxErrors)
        .fixedBackoff(Duration.ofSeconds(1).toMillis())
        .build();
  }

//...

  private CommonErrorHandler errorHandler() {

    // Default delegate: no back off and no retries, so non-matching (non-recoverable)
    // failures are logged and the record is skipped.
    final var parentErrorHandler =
        new CommonDelegatingErrorHandler(
            new DefaultErrorHandler(
                new FixedBackOff(0L, 0L)));

    // Recoverable failures are retried (non-blocking) forever, so the record is re-consumed
    // and the blocking retries in the deserializer start again.
    DefaultErrorHandler retryForeverHandler =
        new DefaultErrorHandler(
            new FixedBackOff(2000L, FixedBackOff.UNLIMITED_ATTEMPTS));
    // Remove DeserializationException from the not-retryable classifications so this
    // handler actually retries it.
    retryForeverHandler.removeClassification(DeserializationException.class);

    parentErrorHandler.setCauseChainTraversing(true);
    parentErrorHandler.addDelegate(
        BlockingRetriesExhaustedRecoverableException.class, retryForeverHandler);

    return parentErrorHandler;
  }
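
The BlockingRetriesExhaustedRecoverableException and BlockingRetriesExhaustedNonRecoverableException used above are application-specific marker exceptions, not spring-kafka classes; a minimal sketch:

// Blocking retries were exhausted on a recoverable error (e.g. Schema Registry HTTP 5xx);
// routed by the delegating error handler to the retry-forever handler.
public class BlockingRetriesExhaustedRecoverableException extends RuntimeException {
}

// Non-recoverable error (e.g. Schema Registry HTTP 4xx); falls through to the default
// delegate, which performs no retries, so the record is logged and skipped.
public class BlockingRetriesExhaustedNonRecoverableException extends RuntimeException {
}

With setCauseChainTraversing(true), the delegating handler matches the recoverable exception even when it is nested as the cause of another exception.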
