PIP 105: Support pluggable entry filter in Dispatcher

Open 315157973 opened this issue 4 years ago • 25 comments
Motivation

There are many users who need to use tag messages. The implementation of this part has also been discussed before:

https://lists.apache.org/list.html?*@pulsar.apache.org:lte=2y:Proposal%20for%20Consumer%20Filtering%20in%20Pulsar%20brokers

https://lists.apache.org/thread.html/ra4639e75659b2adf384b77bca85ab2ca9b4969dea2bd8896741baae2%40%3Cdev.pulsar.apache.org%3E

We need to provide a pluggable mechanism so that developers can decide which messages are sent to consumers and which are not. This supports tag messages, and users can also use the same extension point to implement other features.

Goal

In this PIP, we only implement the entry filter framework at the broker level. Support for the namespace and topic levels will follow in other PRs.

API Changes And Implementation

Broker

Add the option entryFilterClassName to broker.conf and ServiceConfiguration. The default value is null.

Add a factory class. If entryFilterClassName is set, the factory is created and then initializes the filter.

public interface EntriesFilterProvider {

    // Use `EntriesFilterProvider` to create `EntriesFilter` through reflection
    EntriesFilter createEntriesFilter(Subscription subscription);

    // The default implementation class of EntriesFilterProvider
    class DefaultEntriesFilterProviderImpl implements EntriesFilterProvider {

        public DefaultEntriesFilterProviderImpl(ServiceConfiguration serviceConfiguration) {
            // ...
        }

        @Override
        public EntriesFilter createEntriesFilter(Subscription subscription) {
            // ...
        }
    }
}
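
As a rough illustration of the "create through reflection" step, the sketch below loads a filter class by name and returns nothing when no class name is configured. All names in it (SketchFilter, AcceptAllFilter, FilterLoaderSketch) are hypothetical stand-ins, not Pulsar types, and the real factory may well differ.

```java
import java.util.Optional;

// Hypothetical stand-in for the plugin interface; not Pulsar's real type.
interface SketchFilter {
    boolean accept(String payload);
}

// Example plugin with the no-arg constructor that reflection needs.
class AcceptAllFilter implements SketchFilter {
    public AcceptAllFilter() { }

    @Override
    public boolean accept(String payload) {
        return true;
    }
}

class FilterLoaderSketch {

    // Returns an empty Optional when no class name is configured (the default).
    static Optional<SketchFilter> load(String className) {
        if (className == null || className.isEmpty()) {
            return Optional.empty();
        }
        try {
            Class<?> clazz = Class.forName(className);
            if (!SketchFilter.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " does not implement SketchFilter");
            }
            return Optional.of((SketchFilter) clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            // Surface a misconfigured class name as an unchecked error.
            throw new IllegalStateException("Cannot create filter " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load(null).isPresent());
        System.out.println(load(AcceptAllFilter.class.getName()).isPresent());
    }
}
```

Returning an empty Optional for an unset option mirrors the null default described above: with no filter configured, the dispatcher path would stay unchanged.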

Add some new interfaces to avoid relying on existing interfaces inside Pulsar. In this way, even if the internal interfaces change in the future, the plugin can continue to run.

// The interface that needs to be implemented in the plugin
public interface EntryFilter {

    FilterResult filterEntries(Entry entry, FilterContext context);

    // Data the dispatcher has already parsed for this entry
    class FilterContext {
        EntryBatchSizes batchSizes;
        SendMessageInfo sendMessageInfo;
        EntryBatchIndexesAcks indexesAcks;
        ManagedCursor cursor;
        boolean isReplayRead;
        MessageMetadata msgMetadata;
        // ...
    }

    enum FilterResult {
        ACCEPT, // deliver to the Consumer
        REJECT, // skip the message
        // ...
    }
}
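
To make the plugin side concrete, here is a minimal sketch of a tag filter that decides from already-parsed message properties only. It uses simplified stand-ins: a plain Map replaces the message metadata, and the class name TagFilterSketch and the "tag" property key are assumptions for illustration, not part of the proposal.

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-ins for illustration only; these are not Pulsar's real types.
class TagFilterSketch {

    enum FilterResult { ACCEPT, REJECT }

    // Hypothetical property key under which producers would store the tag.
    static final String TAG_PROPERTY = "tag";

    private final String wantedTag;

    TagFilterSketch(String wantedTag) {
        this.wantedTag = wantedTag;
    }

    // Decides from already-parsed message properties; the payload is never read.
    FilterResult filterEntry(Map<String, String> messageProperties) {
        String tag = messageProperties.get(TAG_PROPERTY);
        return wantedTag.equals(tag) ? FilterResult.ACCEPT : FilterResult.REJECT;
    }

    public static void main(String[] args) {
        TagFilterSketch filter = new TagFilterSketch("orders");
        System.out.println(filter.filterEntry(Collections.singletonMap("tag", "orders")));
        System.out.println(filter.filterEntry(Collections.singletonMap("tag", "billing")));
    }
}
```

Messages without a tag are rejected in this sketch; a real plugin might prefer to accept them, which is a policy choice left to the implementer.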

Why filter a single entry instead of a List<Entry>? The existing Dispatchers already traverse the entry list and parse the metadata of each Entry. We can reuse this data to avoid traversing the entry list twice and parsing the metadata twice.

Call the plugin in AbstractBaseDispatcher#filterEntriesForConsumer

if (entryFilter != null) {
    FilterResult result = entryFilter.filterEntries(entry, filterContext);
    if (result == FilterResult.REJECT) {
        entries.set(i, null);
        // Acknowledge before releasing, so the entry is not used after release
        subscription.acknowledgeMessage(entry);
        entry.release();
        continue;
    }
}
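
The single-pass idea can be sketched in isolation as follows. This is a simplified model, assuming entries are plain strings and the filter is a Predicate; the class and method names are hypothetical and the real dispatcher does considerably more work per entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class DispatchLoopSketch {

    // Nulls out rejected entries in place and returns the ones that were
    // rejected, which the dispatcher would acknowledge on the consumer's behalf.
    static List<String> filterInPlace(List<String> entries, Predicate<String> accept) {
        List<String> acknowledged = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            String entry = entries.get(i);
            if (entry == null) {
                continue; // already skipped by an earlier step
            }
            if (!accept.test(entry)) {
                entries.set(i, null);
                acknowledged.add(entry);
            }
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        List<String> entries = new ArrayList<>(Arrays.asList("a:keep", "b:drop", "c:keep"));
        List<String> acknowledged = filterInPlace(entries, e -> e.endsWith("keep"));
        System.out.println(entries);
        System.out.println(acknowledged);
    }
}
```

Setting the slot to null instead of removing it keeps list indexes stable, so per-index bookkeeping (such as batch sizes) would still line up with the entries.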

Client

Add an optional attribute subscriptionProperties to the subscription, which can be set when subscribing to a topic. These properties cannot be modified; the only way to change them is to delete the subscription and create it again.

  a. This attribute needs to be added to the subscribe command.
  b. For persistent subscriptions, these properties should be stored in ZooKeeper with the subscription.
  c. The REST API (create-subscription) should also support setting subscriptionProperties.

Example:

Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .subscriptionProperties(new HashMap<String, String>())
                .subscribe();

Rejected Alternatives

  1. Plug-in Dispatcher: If we make the whole Dispatcher pluggable, we must define a limited, private but "stable" API. Enrico's suggestion is to identify particular needs and then make single, specific parts of the dispatcher pluggable.

  2. Create a message dimension filter: At this stage, we can only do Entry-level filtering. If individual Messages inside an Entry were filtered on the Broker side, the subsequent consumer acks would cause problems. Therefore, to use this filter we must set enableBatching=false, the same as for delayed messages.

315157973 avatar Oct 03 '21 11:10 315157973

It is not clear to me if the Consumer can attach some configuration value (like JMS message selector) that can be used in the Filter. Otherwise we can only create static filters

eolivelli avatar Oct 04 '21 06:10 eolivelli

It is not clear to me if the Consumer can attach some configuration value (like JMS message selector) that can be used in the Filter. Otherwise we can only create static filters

We can use it like this: set properties when creating the consumer. The entryFilterClass can be written as an adapter that decides internally which filter logic to use according to those properties.
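
A minimal sketch of such an adapter is shown below, assuming the subscription properties arrive as a plain Map. The property keys "filter.type" and "filter.tag" and the class name are hypothetical, chosen only to show how one configured class could select different filter logic per subscription.

```java
import java.util.Map;
import java.util.function.Predicate;

class PropertyDrivenFilterSketch {

    // Hypothetical property keys a consumer would set on the subscription.
    static final String TYPE_KEY = "filter.type";
    static final String TAG_KEY = "filter.tag";

    // Picks the filter logic from the subscription properties. The returned
    // predicate is evaluated against the properties of each message.
    static Predicate<Map<String, String>> forSubscription(Map<String, String> subscriptionProperties) {
        String type = subscriptionProperties.getOrDefault(TYPE_KEY, "none");
        switch (type) {
            case "tag": {
                String wanted = subscriptionProperties.get(TAG_KEY);
                return msgProps -> wanted != null && wanted.equals(msgProps.get("tag"));
            }
            case "none":
                return msgProps -> true; // no filtering requested
            default:
                throw new IllegalArgumentException("Unknown filter type: " + type);
        }
    }
}
```

With this shape, a JMS-style selector would be one more case in the switch, parsed once per subscription rather than once per message.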

315157973 avatar Oct 05 '21 09:10 315157973

@eolivelli Do you think this is okay? If there is no problem, I will initiate a vote

315157973 avatar Oct 08 '21 07:10 315157973

@315157973

I am okay as far as there is an API for the client to set such properties and to be able to use them in the broker.

can you please give me an example about how to set custom properties in the subscription in a way that the broker can leverage those configurations ?

eolivelli avatar Oct 08 '21 07:10 eolivelli

@315157973

I am okay as far as there is an API for the client to set such properties and to be able to use them in the broker.

can you please give me an example about how to set custom properties in the subscription in a way that the broker can leverage those configurations ?

@eolivelli I added a description of this part. PTAL, Thanks

315157973 avatar Oct 11 '21 07:10 315157973

@eolivelli PTAL, thanks

315157973 avatar Oct 12 '21 14:10 315157973

I would change 'subscriptProperties' to 'subscriptionProperties'

Apart from this LGTM

eolivelli avatar Oct 12 '21 16:10 eolivelli

What happens if I subscribe from two different consumers with different subscriptProperties ?

eolivelli avatar Oct 12 '21 16:10 eolivelli

What happens if I subscribe from two different consumers with different subscriptProperties ?

subscriptionProperties is immutable. Attempts to modify it will fail with an exception. If the properties are empty or the same as the existing ones, the subscription succeeds.
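
The rule described here can be sketched as a small check, assuming properties are plain maps. The class name, method name, and exception type are hypothetical; the proposal does not specify which exception the broker would return.

```java
import java.util.Map;

class SubscriptionPropertiesCheck {

    // Allows a subscribe request when it carries no properties or exactly the
    // stored ones; any other value counts as an attempt to modify them.
    static void checkCompatible(Map<String, String> existing, Map<String, String> requested) {
        if (requested == null || requested.isEmpty() || requested.equals(existing)) {
            return;
        }
        // Hypothetical exception type, chosen for this sketch only.
        throw new IllegalStateException("subscriptionProperties cannot be modified");
    }
}
```

Under this rule a second consumer can join an existing subscription without repeating its properties, but it cannot change the filter configuration that the first consumer established.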

315157973 avatar Oct 12 '21 16:10 315157973

We need to provide a plug-in way so that developers can decide which messages can be sent to consumers and which ones are not. In this way, not only tag messages can be supported, users can also use this extension point to implement other features.

Could users implement their feature and decide which messages can be sent to consumers according to deserialized entry data? For the interface public FilterResult filterEntries(Entry entry, FilterContext context); the Entry parameter has no meaning for users unless they deserialize Entry.getData() in their implementation, which may bring more CPU and GC workload on the broker. So should we allow users to deserialize each entry's data in this interface?

wangjialing218 avatar Oct 15 '21 02:10 wangjialing218

@315157973 Does this support topics with end-to-end encryption enabled?

yangl avatar Oct 26 '21 06:10 yangl

Could users implement their feature and decide which messages can be sent to consumers according to deserialized entry data? For the interface public FilterResult filterEntries(Entry entry, FilterContext context); the Entry parameter has no meaning for users unless they deserialize Entry.getData() in their implementation, which may bring more CPU and GC workload on the broker. So should we allow users to deserialize each entry's data in this interface?

  1. This interface is open to developers
  2. We only need to parse the header of the entry, instead of deserializing the entire Entry to the heap
  3. The parsed header information is already in the FilterContext, and you don't even need to re-parse the metadata. The incoming parameter is an Entry, you can play freely. You can even modify the data in Entry.

315157973 avatar Oct 26 '21 07:10 315157973

@315157973 Does this support topics with end-to-end encryption enabled?

It depends on your implementation class. End-to-End encryption will also go through this logic

315157973 avatar Oct 26 '21 07:10 315157973

  1. The parsed header information is already in the FilterContext, and you don't even need to re-parse the metadata. The incoming parameter is an Entry, you can play freely. You can even modify the data in Entry.

If users want to do anything with the Entry parameter, they need to deserialize the entire Entry to the heap in their implementation, and then they can modify the data in the Entry. As @codelipenghui suggested in https://github.com/apache/pulsar/issues/11962#issuecomment-915716242, we should avoid deserializing the Entry to the heap, which would bring more GC workload on the broker. If we do not want users to do this, we could remove the Entry parameter.

wangjialing218 avatar Oct 26 '21 08:10 wangjialing218

  1. The parsed header information is already in the FilterContext, and you don't even need to re-parse the metadata. The incoming parameter is an Entry, you can play freely. You can even modify the data in Entry.

If users want to do anything with the Entry parameter, they need to deserialize the entire Entry to the heap in their implementation, and then they can modify the data in the Entry. As @codelipenghui suggested in #11962 (comment), we should avoid deserializing the Entry to the heap, which would bring more GC workload on the broker. If we do not want users to do this, we could remove the Entry parameter.

The framework will not deserialize the entry. Whether it is deserialized depends on the developer. The Entry parameter is necessary; it can also be used in many other scenarios, for example to modify the data of a specific Entry.

315157973 avatar Oct 26 '21 08:10 315157973

The framework will not deserialize the entry. Whether it is deserialized depends on the developer. The Entry parameter is necessary; it can also be used in many other scenarios, for example to modify the data of a specific Entry.

I agree. If we want to modify the data of a specific Entry, currently we can only use functions. If we could modify Entry data on the broker side directly, we could save a lot of network workload. But letting users deserialize Entry data to the heap is not a good approach; we should consider how to modify Entry data on the broker side efficiently and reduce the GC workload on the broker. Maybe we could create another PIP for modifying entry data, since this PIP is about the entry filter.

wangjialing218 avatar Oct 26 '21 09:10 wangjialing218

The framework will not deserialize the entry. Whether it is deserialized depends on the developer. The Entry parameter is necessary; it can also be used in many other scenarios, for example to modify the data of a specific Entry.

I agree. If we want to modify the data of a specific Entry, currently we can only use functions. If we could modify Entry data on the broker side directly, we could save a lot of network workload. But letting users deserialize Entry data to the heap is not a good approach; we should consider how to modify Entry data on the broker side efficiently and reduce the GC workload on the broker. Maybe we could create another PIP for modifying entry data, since this PIP is about the entry filter.

If I just need to filter based on the content of the message, and the developer can tolerate the CPU and memory usage, why not? It all depends on the developer's implementation

315157973 avatar Oct 26 '21 09:10 315157973

If I just need to filter based on the content of the message, and the developer can tolerate the CPU and memory usage, why not? It all depends on the developer's implementation

The Entry data here is in direct memory. If users call Entry.getData() in their implementation, the Entry data is copied to the heap. If message traffic is high, the broker heap may fill up quickly and GC will repeat within seconds. We need to evaluate the impact carefully.
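
The difference between copying a whole entry to the heap and reading only a small header can be illustrated with java.nio.ByteBuffer as an analogy. The broker works with its own buffer types, so this is not its actual code, and the wire layout used here (a 4-byte header length, the header, then the payload) is a hypothetical one invented for the sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class DirectBufferPeekSketch {

    // Hypothetical layout: 4-byte header length, header bytes, then payload.
    static ByteBuffer encode(String header, byte[] payload) {
        byte[] headerBytes = header.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocateDirect(4 + headerBytes.length + payload.length);
        buf.putInt(headerBytes.length).put(headerBytes).put(payload);
        buf.flip();
        return buf;
    }

    // Copies only the header bytes to the heap; the payload stays in direct
    // memory. duplicate() keeps the caller's read position untouched.
    static String readHeader(ByteBuffer entry) {
        ByteBuffer view = entry.duplicate();
        int headerLength = view.getInt();
        byte[] headerBytes = new byte[headerLength];
        view.get(headerBytes);
        return new String(headerBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode("tag=red", new byte[1024 * 1024]);
        System.out.println(readHeader(entry));
        System.out.println(entry.isDirect());
    }
}
```

In this sketch a filter that needs only the header allocates a few heap bytes per entry, while one that copies the full payload allocates the whole megabyte each time, which is the GC pressure the comment above warns about.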

wangjialing218 avatar Oct 26 '21 10:10 wangjialing218

I recommend:

  1. Filtering should not involve the message body. This avoids the problem that end-to-end encrypted messages cannot be decrypted, and it avoids the possible JVM GC problem.
  2. A dedicated message header, unrelated to encryption, should be added for the actual filtering. This reduces the filtering resource consumption (memory and CPU) on the broker side.

kuangye098 avatar Nov 03 '21 06:11 kuangye098

Hi @315157973 for the doc side, I've discussed with @codelipenghui, he suggests creating an independent chapter message filter under Development and considering adding plugin and protocol handler sections under that chapter.

Anonymitaet avatar Nov 26 '21 03:11 Anonymitaet

For adding some more context, there was a proposal in the past from @andrekramer1 , that was discussed in this dev mailing list thread: https://lists.apache.org/thread/k0hb33nz42f3603hvs03p7lmltjgj40z . I would prefer a standard feature approach in Pulsar instead of making it an optional plugin. The reason for this is to prevent fragmentation of Pulsar into a set of optional plugins which might or might not be available or installed.

lhotari avatar Jan 20 '22 11:01 lhotari

The issue had no activity for 30 days, mark with Stale label.

github-actions[bot] avatar Feb 28 '22 01:02 github-actions[bot]

I can't find the vote in the mailing list searching PIP-105.

asafm avatar Jul 13 '22 08:07 asafm

@asafm good call! I can't find it either

eolivelli avatar Jul 13 '22 09:07 eolivelli

These properties cannot be modified.

Can you elaborate on the reason for this please? For long-lived subscriptions on persistent topics being able to modify the properties could definitely be beneficial, as recreating the subscription could be quite cumbersome if done regularly.

ullebe1 avatar Jul 22 '24 11:07 ullebe1