spring-kafka
spring-kafka copied to clipboard
Provide a tondem RecordFilterStrategy with MessageConverter.
Expected Behavior
Provide a tondem RecordFilterStrategy with MessageConverter.
Current Behavior
MessageConverter invokes after RecordFilterStrategy logic.
Context
If we configure MessageConverter (Json for example) -> then we expect to filter the converted Message
, not just the deserialized ConsumerRecord
(that is deserialized with String/Bytes/ByteArray SerDe). So for JsonDeserializer we must conver message inside our filter
with our own logic, with ObjectMapper
for example. It's dirty.
Filtering feature seems like incomplete due-to missing conjunction with Spring Messaging.
As you can see, here we try to filter original Consumer record and then invoke delegate logic. https://github.com/spring-projects/spring-kafka/blob/3.2.x/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java#L71
Possible Solution
- We must check if MessageConverter is configured for current
MessageListenerContainer
and build the right adapters chain - Refactor chain to allow new
DelegatingFilterStrategy
to decide what kind of data to filter,ConsumerRecord
orMessage
.