spring-kafka
spring-kafka copied to clipboard
Spring Managed Consumer Interceptors
Following https://github.com/spring-projects/spring-kafka/issues/2049, Spring Kafka may also support Spring managed interceptors for standard consumers and streams.
In pure Kafka, interceptors are specified through a configuration entry. This entry contains the list of classes related to the interceptors. This list of class is then converted into a list of interceptor instances (see AbstractConfig#getConfiguredInstances). As the interceptor is instantiated by Kafka itself, there is no way to inject dependencies into the interceptor expect by using the trick with the configure method (see https://docs.spring.io/spring-kafka/docs/current/reference/html/#interceptors). An update in Kafka to support instances instead of classes (technical limitation?) could help a lot.
So, the proposal would be to extend ProducerConfig, ConsumerConfig and StreamsConfig as follow to complete the list of interceptors with the Spring managed interceptors.
// example for streams
public class SpringAwareStreamConfig extends StreamsConfig {
private final List<ProducerInterceptor<?, ?>> producerInterceptors;
private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;
public SpringAwareStreamConfig(Map<?, ?> props,
boolean doLog,
List<ProducerInterceptor<?, ?>> producerInterceptors,
List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
super(props, doLog);
this.producerInterceptors = producerInterceptors;
this.consumerInterceptors = consumerInterceptors;
}
@Override
public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
if (ConsumerInterceptor.class.equals(t)) {
configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
}
if (ProducerInterceptor.class.equals(t)) {
configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
}
return configuredInstances;
}
}
These new Kafka config classes would then be respectively instantiated in
- DefaultKafkaProducerFactory#createRawProducer
- DefaultKafkaConsumerFactory#createRawConsumer
- StreamsBuilderFactoryBean#start
This approach would let Kafka handle the calls to the interceptor methods instead of having to call these methods in Spring Kafka itself.
Any comment, feedback, other proposal is more than welcome.
Interesting; I was not aware of those methods; much better than the hack in the docs you referenced.
Sounds like a plan. Looking forward to seeing a PR.
Unfortunately, the proposal I did is not applicable for producers and consumers as the constructor accepting a producer/consumer config is not public (default modifier). Issue has been created on the Kafka project to increase the visibility of this constructor. See https://issues.apache.org/jira/browse/KAFKA-13864
I could still drop a PR for streams but this would be incomplete.