spring-kafka

Add the ability to add record interceptors instead of overriding them

Open Nyamiou opened this issue 1 year ago • 5 comments

Expected Behavior

I'm using a Spring Cloud Stream ListenerContainerCustomizer to customize an AbstractMessageListenerContainer in order to add a record interceptor. I would expect to be able to add a record interceptor in the last position, or to be able to get the current record interceptor, wrap it together with mine in a CompositeRecordInterceptor, and then set that (either is fine).

Current Behavior

The issue I have is that only setRecordInterceptor(...) is public; getRecordInterceptor() is not, and there is no addRecordInterceptor(...) either. I would like to keep the existing record interceptors and just add mine in the last position, but there seems to be no easy way to do this.

Context

I have code similar to this:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> listenerContainerCustomizer() {
	return (container, destinationName, group) -> container.setRecordInterceptor(new MyRecordInterceptor());
}

I want to use this in a small company-internal library, but I don't want to take away other projects' ability to use their own record interceptors.

I tried to use a Spring Cloud Stream ChannelInterceptor (with GlobalChannelInterceptor) to achieve what I wanted, but its afterSendCompletion(...) is called after each attempt, while the record interceptor's afterRecord(...) is called after all attempts. Also, channel interceptors do not allow modifying headers.

Nyamiou avatar Oct 08 '24 14:10 Nyamiou

Thanks for the report! We don't see this as an easy fix, since there would be too many moving parts with add/remove([index]) operations in CompositeRecordInterceptor and AbstractMessageListenerContainer. So, unless you see a quick way to fix it, we are going to look into it for the next version.

artembilan avatar Oct 08 '24 16:10 artembilan

No problem, this is not urgent. I just wanted this to hopefully be fixed in the future. Thank you for answering so quickly.

Nyamiou avatar Oct 08 '24 18:10 Nyamiou

@Nyamiou I have an idea for you. Maybe it could be a short-term solution. How about using this class for your case?

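import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;

// Delegates to each child interceptor in the order they were added.
public class TestRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

    private final List<RecordInterceptor<K, V>> children = new ArrayList<>();

    public void addRecordInterceptor(RecordInterceptor<K, V> interceptor) {
        this.children.add(interceptor);
    }

    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
        ConsumerRecord<K, V> invokedRecord = record;
        for (RecordInterceptor<K, V> child : this.children) {
            invokedRecord = child.intercept(invokedRecord, consumer);
            if (invokedRecord == null) {
                // A null result means "skip this record", so stop the chain here.
                break;
            }
        }
        return invokedRecord;
    }

}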
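
For illustration, here is a minimal sketch of how such a chaining interceptor behaves when a project-level interceptor and a library-level interceptor are registered in that order. To keep it runnable without Kafka on the classpath, it uses a hypothetical stand-in interface `Interceptor<R>` in place of spring-kafka's `RecordInterceptor<K, V>`; the names `CompositeSketch`, `Interceptor`, `Composite`, and `demo` are assumptions made up for this example. The stand-in also stops the chain on a null result, which in spring-kafka signals that the record should be skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeSketch {

    // Hypothetical stand-in for spring-kafka's RecordInterceptor (not the real API).
    // Returning null means "skip this record".
    interface Interceptor<R> {
        R intercept(R record);
    }

    // Same chaining idea as TestRecordInterceptor, reduced to the stand-in type.
    static class Composite<R> implements Interceptor<R> {

        private final List<Interceptor<R>> children = new ArrayList<>();

        void add(Interceptor<R> child) {
            this.children.add(child);
        }

        @Override
        public R intercept(R record) {
            R current = record;
            for (Interceptor<R> child : this.children) {
                current = child.intercept(current);
                if (current == null) {
                    // A child asked to skip the record, so later children are not called.
                    return null;
                }
            }
            return current;
        }
    }

    // Registers a project interceptor first and a library interceptor last,
    // then runs one record through the chain.
    static String demo(String input) {
        Composite<String> composite = new Composite<>();
        composite.add(r -> r + ":project");
        composite.add(r -> r + ":library");
        return composite.intercept(input);
    }

    public static void main(String[] args) {
        System.out.println(demo("record")); // prints "record:project:library"
    }
}
```

The order of `add` calls is the order of invocation, so an interceptor added last sees whatever the earlier ones returned.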

chickenchickenlove avatar Oct 09 '24 13:10 chickenchickenlove

Right, @chickenchickenlove, that will work for some use cases. But I believe @Nyamiou's request is more about AbstractMessageListenerContainer internals, where an addRecordInterceptor() API would be exposed and would manage a CompositeRecordInterceptor behind the scenes. That is why this is a feature request for the framework rather than a workaround in the target project.
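
To make the distinction concrete, the following is a rough sketch of the kind of container-side behavior being requested. It is not spring-kafka code: `Container`, `Interceptor`, and `dispatch` are hypothetical stand-ins invented for this example, and `addRecordInterceptor` is the proposed method, which does not exist on AbstractMessageListenerContainer at the time of this thread. The sketch only shows the intended semantics: the setter replaces, while the add method appends to an internally managed chain.

```java
import java.util.ArrayList;
import java.util.List;

public class AddInterceptorSketch {

    // Hypothetical stand-in for a record interceptor; null means "skip this record".
    interface Interceptor<R> {
        R intercept(R record);
    }

    // Hypothetical stand-in for a listener container that owns the interceptor chain.
    static class Container<R> {

        private final List<Interceptor<R>> interceptors = new ArrayList<>();

        // Mirrors the existing setter semantics: whatever was configured is replaced.
        void setRecordInterceptor(Interceptor<R> interceptor) {
            this.interceptors.clear();
            if (interceptor != null) {
                this.interceptors.add(interceptor);
            }
        }

        // The requested behavior (an assumption, not an existing API):
        // keep what is already configured and append the new interceptor last.
        void addRecordInterceptor(Interceptor<R> interceptor) {
            this.interceptors.add(interceptor);
        }

        // Runs a record through the chain in registration order.
        R dispatch(R record) {
            R current = record;
            for (Interceptor<R> interceptor : this.interceptors) {
                current = interceptor.intercept(current);
                if (current == null) {
                    return null;
                }
            }
            return current;
        }
    }

    static String demo(String input) {
        Container<String> container = new Container<>();
        container.setRecordInterceptor(r -> r.toUpperCase()); // configured by the application
        container.addRecordInterceptor(r -> r + "!");         // appended by a shared library
        return container.dispatch(input);
    }

    public static void main(String[] args) {
        System.out.println(demo("hello")); // prints "HELLO!"
    }
}
```

With this shape, a customizer in a shared library would call the add method and leave the application's own interceptor in place, which is what a setter-only API cannot offer.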

artembilan avatar Oct 09 '24 13:10 artembilan

I see! Thank you for the explanation 🙇‍♂️

chickenchickenlove avatar Oct 10 '24 12:10 chickenchickenlove
