Skip to content

Spring Managed Consumer Interceptors #2244

Open
@frosiere

Description

@frosiere

Following #2049, Spring Kafka may also support Spring managed interceptors for standard consumers and streams.

In pure Kafka, interceptors are specified through a configuration entry. This entry contains the list of classes related to the interceptors. This list of class is then converted into a list of interceptor instances (see AbstractConfig#getConfiguredInstances). As the interceptor is instantiated by Kafka itself, there is no way to inject dependencies into the interceptor expect by using the trick with the configure method (see https://docs.spring.io/spring-kafka/docs/current/reference/html/#interceptors).
An update in Kafka to support instances instead of classes (technical limitation?) could help a lot.

So, the proposal would be to extend ProducerConfig, ConsumerConfig and StreamsConfig as follow to complete the list of interceptors with the Spring managed interceptors.

// example for streams
public class SpringAwareStreamConfig extends StreamsConfig {

    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamConfig(Map<?, ?> props,
                                   boolean doLog,
                                   List<ProducerInterceptor<?, ?>> producerInterceptors,
                                   List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}

These new Kafka config classes would then be respectively instantiated in

  • DefaultKafkaProducerFactory#createRawProducer
  • DefaultKafkaConsumerFactory#createRawConsumer
  • StreamsBuilderFactoryBean#start

This approach would let Kafka handle the calls to the interceptor methods instead of having to call these methods in Spring Kafka itself.

Any comment, feedback, other proposal is more than welcome.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions