spring-kafka

Add a way to pass objects as values for @KafkaListener properties

Open • geertvb opened this issue 1 year ago • 3 comments

Expected Behavior

Have a way to pass objects or bean references as values for @KafkaListener properties (KafkaConsumer properties).

For example, it should be possible to use the bean reference #{__listener.filterPredicate} as a configuration property for a kafka-clients ConsumerInterceptor. See also "Wiring Spring Beans into Producer/Consumer Interceptors": https://docs.spring.io/spring-kafka/reference/kafka/interceptors.html

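import java.util.function.Predicate;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageListener {

    // Exposed through a getter so that SpEL can resolve __listener.filterPredicate
    @Getter
    private final Predicate<ConsumerRecord<?, ?>> filterPredicate;

    // Desired usage: the second property resolves to an object, not a String
    @KafkaListener(
        topics = "events",
        properties = {
            "interceptor.classes=org.example.FilterInterceptor",
            "filter.predicate=#{__listener.filterPredicate}"})
    public void handleEvent(ConsumerRecord<String, String> record) {
        log.info("Handling record: {}", record.value());
    }

}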

Current Behavior

The @KafkaListener properties attribute only accepts string values. The javadoc states: "SpEL expressions must resolve to a String, a String[] or a Collection<String>".

Attempting to use a bean reference results in an exception with the message: No converter found capable of converting from type [eu.europa.ec.test.kafka.FilterRecordPredicate] to type [java.lang.String]
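
To illustrate why an object cannot pass through this attribute: per the javadoc, each entry uses the same key=value syntax as a Java properties file, so whatever sits on the right-hand side ends up as text. The sketch below uses only the JDK; the class name PropertyStringDemo and the helper parse are hypothetical, and it demonstrates the properties-file syntax rather than reproducing spring-kafka's internal code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropertyStringDemo {

    // Hypothetical helper: parses one "key=value" entry using Java properties-file syntax.
    static Properties parse(String entry) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(entry));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("interceptor.classes=org.example.FilterInterceptor");
        Object value = props.get("interceptor.classes");
        // The parsed value is always a String, never a bean instance.
        System.out.println(value.getClass().getName() + ": " + value);
        // prints "java.lang.String: org.example.FilterInterceptor"
    }
}
```

With this syntax there is no place for an object reference, which is why the SpEL result has to be convertible to a String.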

Context

Currently we use either a consumer factory customizer (when the setting applies to all consumers) or custom consumer factory beans (when it is consumer-specific). Especially in the latter case, we would prefer to use the default Spring Boot consumer factory and annotations on the Kafka listener.

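    // Presumably an implementation of Spring Boot's DefaultKafkaConsumerFactoryCustomizer;
    // INTERCEPTOR_CLASSES_CONFIG is statically imported from ConsumerConfig.
    @Override
    public void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {
        // The config map accepts arbitrary objects, so the predicate bean is passed as-is.
        consumerFactory.updateConfigs(Map.of(
                INTERCEPTOR_CLASSES_CONFIG, FilterRecordInterceptor.class.getName(),
                "filter.predicate", filterPredicate));
    }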
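
For context, the receiving side of this workaround can be sketched as follows. This is a simplified, JDK-only stand-in and not the actual interceptor: a real one would implement org.apache.kafka.clients.consumer.ConsumerInterceptor and filter ConsumerRecords, whereas the hypothetical FilterInterceptorSketch below filters plain strings. Only the configure(Map<String, ?>) shape and the "filter.predicate" key are taken from the snippets above.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterInterceptorSketch {

    // Accept everything until a predicate has been configured.
    private Predicate<String> predicate = value -> true;

    // Same shape as Kafka's Configurable.configure(Map<String, ?>):
    // the map may hold arbitrary objects, not only strings.
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs) {
        Object candidate = configs.get("filter.predicate");
        if (candidate instanceof Predicate) {
            this.predicate = (Predicate<String>) candidate;
        }
    }

    // Stand-in for onConsume: keeps only the values the predicate accepts.
    public List<String> onConsume(List<String> values) {
        return values.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<String> keepOrders = value -> value.startsWith("order");
        FilterInterceptorSketch interceptor = new FilterInterceptorSketch();
        interceptor.configure(Map.of(
                "interceptor.classes", FilterInterceptorSketch.class.getName(),
                "filter.predicate", keepOrders));
        System.out.println(interceptor.onConsume(List.of("order-1", "payment-7", "order-2")));
        // prints "[order-1, order-2]"
    }
}
```

The point of the request is to keep this object-valued entry while declaring it on the listener annotation instead of on a dedicated consumer factory.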

geertvb • Oct 21 '24 07:10