spring-kafka icon indicating copy to clipboard operation
spring-kafka copied to clipboard

Spring Managed Consumer Interceptors

Open frosiere opened this issue 2 years ago • 2 comments

Following https://github.com/spring-projects/spring-kafka/issues/2049, Spring Kafka may also support Spring managed interceptors for standard consumers and streams.

In pure Kafka, interceptors are specified through a configuration entry. This entry contains the list of classes related to the interceptors. This list of class is then converted into a list of interceptor instances (see AbstractConfig#getConfiguredInstances). As the interceptor is instantiated by Kafka itself, there is no way to inject dependencies into the interceptor expect by using the trick with the configure method (see https://docs.spring.io/spring-kafka/docs/current/reference/html/#interceptors). An update in Kafka to support instances instead of classes (technical limitation?) could help a lot.

So, the proposal would be to extend ProducerConfig, ConsumerConfig and StreamsConfig as follow to complete the list of interceptors with the Spring managed interceptors.

// example for streams
public class SpringAwareStreamConfig extends StreamsConfig {

    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamConfig(Map<?, ?> props,
                                   boolean doLog,
                                   List<ProducerInterceptor<?, ?>> producerInterceptors,
                                   List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}

These new Kafka config classes would then be respectively instantiated in

  • DefaultKafkaProducerFactory#createRawProducer
  • DefaultKafkaConsumerFactory#createRawConsumer
  • StreamsBuilderFactoryBean#start

This approach would let Kafka handle the calls to the interceptor methods instead of having to call these methods in Spring Kafka itself.

Any comment, feedback, other proposal is more than welcome.

frosiere avatar Apr 27 '22 11:04 frosiere

Interesting; I was not aware of those methods; much better than the hack in the docs you referenced.

Sounds like a plan. Looking forward to seeing a PR.

garyrussell avatar Apr 27 '22 14:04 garyrussell

Unfortunately, the proposal I did is not applicable for producers and consumers as the constructor accepting a producer/consumer config is not public (default modifier). Issue has been created on the Kafka project to increase the visibility of this constructor. See https://issues.apache.org/jira/browse/KAFKA-13864

I could still drop a PR for streams but this would be incomplete.

frosiere avatar Apr 30 '22 20:04 frosiere