spring-kafka icon indicating copy to clipboard operation
spring-kafka copied to clipboard

Provide a tondem RecordFilterStrategy with MessageConverter.

Open ioanngolovko opened this issue 5 months ago • 0 comments

Expected Behavior

Provide a tondem RecordFilterStrategy with MessageConverter.

Current Behavior

MessageConverter invokes after RecordFilterStrategy logic.

Context

If we configure MessageConverter (Json for example) -> then we expect to filter the converted Message, not just the deserialized ConsumerRecord (that is deserialized with String/Bytes/ByteArray SerDe). So for JsonDeserializer we must conver message inside our filter with our own logic, with ObjectMapper for example. It's dirty.

Filtering feature seems like incomplete due-to missing conjunction with Spring Messaging.

As you can see, here we try to filter original Consumer record and then invoke delegate logic. https://github.com/spring-projects/spring-kafka/blob/3.2.x/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java#L71

Possible Solution

  • We must check if MessageConverter is configured for current MessageListenerContainer and build the right adapters chain
  • Refactor chain to allow new DelegatingFilterStrategy to decide what kind of data to filter, ConsumerRecord or Message.

ioanngolovko avatar Sep 13 '24 06:09 ioanngolovko