pulsar
PIP 105: Support pluggable entry filter in Dispatcher
Motivation
Many users need to use tag messages. Implementing this has also been discussed before:
https://lists.apache.org/list.html?*@pulsar.apache.org:lte=2y:Proposal%20for%20Consumer%20Filtering%20in%20Pulsar%20brokers
https://lists.apache.org/thread.html/ra4639e75659b2adf384b77bca85ab2ca9b4969dea2bd8896741baae2%40%3Cdev.pulsar.apache.org%3E
We need to provide a pluggable mechanism that lets developers decide which messages are delivered to consumers and which are not. This supports tag messages, and users can also use this extension point to implement other features.
Goal
In this PIP, we only implement the entry filter framework at the broker level. Support for the namespace and topic levels will follow in other PRs.
API Changes And Implementation
Broker
Add an option entryFilterClassName to broker.conf and ServiceConfiguration. The default value is null.
Add a factory class: if entryFilterClassName is set, the broker creates the factory, which then initializes the filter.
```java
// Use `EntriesFilterProvider` to create the `EntryFilter` through reflection
public interface EntriesFilterProvider {
    EntryFilter createEntriesFilter(Subscription subscription);
}

// The default implementation class of EntriesFilterProvider
public class DefaultEntriesFilterProviderImpl implements EntriesFilterProvider {
    public DefaultEntriesFilterProviderImpl(ServiceConfiguration serviceConfiguration) {
        // ...
    }

    @Override
    public EntryFilter createEntriesFilter(Subscription subscription) {
        // ...
    }
}
```
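The provider is described as creating the filter through reflection from entryFilterClassName. Below is a minimal sketch of that loading step. It assumes the configured class has a public no-argument constructor. SimpleEntryFilter, TagAFilter and ReflectiveFilterLoader are hypothetical names used only for illustration; they are not Pulsar classes.

```java
// Hypothetical, simplified filter contract used only for this sketch.
interface SimpleEntryFilter {
    boolean accept(String tag);
}

// Example plugin class; reflective loading needs a public no-argument constructor.
class TagAFilter implements SimpleEntryFilter {
    public TagAFilter() {
    }

    @Override
    public boolean accept(String tag) {
        return "A".equals(tag);
    }
}

// Instantiates the class named by entryFilterClassName, or returns null if none is configured.
final class ReflectiveFilterLoader {
    private ReflectiveFilterLoader() {
    }

    static SimpleEntryFilter load(String entryFilterClassName) {
        if (entryFilterClassName == null || entryFilterClassName.isEmpty()) {
            return null; // mirrors the default value of entryFilterClassName (null): no filtering
        }
        try {
            Class<?> clazz = Class.forName(entryFilterClassName);
            if (!SimpleEntryFilter.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        entryFilterClassName + " does not implement SimpleEntryFilter");
            }
            return (SimpleEntryFilter) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + entryFilterClassName, e);
        }
    }
}
```

Failing fast with a clear exception keeps a misconfigured class name from silently disabling filtering.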
Add some new interfaces so that plugins do not depend on Pulsar's existing internal interfaces. This way, plugins keep working even if those internal interfaces change in the future.
```java
// The interface that needs to be implemented in the plugin
public interface EntryFilter {

    FilterResult filterEntries(Entry entry, FilterContext context);

    class FilterContext {
        EntryBatchSizes batchSizes;
        SendMessageInfo sendMessageInfo;
        EntryBatchIndexesAcks indexesAcks;
        ManagedCursor cursor;
        boolean isReplayRead;
        MessageMetadata msgMetadata;
        // ...
    }

    enum FilterResult {
        ACCEPT, // deliver to the consumer
        REJECT, // skip the message
        // ...
    }
}
```
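To show how a plugin might use the context, here is a sketch of a tag filter that decides from message properties alone and never reads the payload. The types are simplified stand-ins, which is an assumption of this sketch: the proposal's FilterContext carries a parsed MessageMetadata rather than a plain map. The "tag" property key is also hypothetical.

```java
import java.util.Map;

// Simplified stand-ins for the proposal's types (assumption: the real FilterContext
// exposes a parsed MessageMetadata, not a plain property map).
enum SketchFilterResult {
    ACCEPT, // deliver to the consumer
    REJECT  // skip the message
}

final class SketchFilterContext {
    final Map<String, String> messageProperties;

    SketchFilterContext(Map<String, String> messageProperties) {
        this.messageProperties = messageProperties;
    }
}

interface SketchEntryFilter {
    // entry is an opaque stand-in for Entry; a metadata-only filter never reads it.
    SketchFilterResult filterEntry(Object entry, SketchFilterContext context);
}

// Hypothetical tag filter: accepts an entry only if its "tag" property matches.
final class TagEntryFilter implements SketchEntryFilter {
    private final String requiredTag;

    TagEntryFilter(String requiredTag) {
        this.requiredTag = requiredTag;
    }

    @Override
    public SketchFilterResult filterEntry(Object entry, SketchFilterContext context) {
        String tag = context.messageProperties.get("tag");
        return requiredTag.equals(tag) ? SketchFilterResult.ACCEPT : SketchFilterResult.REJECT;
    }
}
```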
Why filter a single entry instead of a List<Entry>? The existing dispatchers already iterate over the entry list and parse the metadata of each entry. Filtering per entry lets us reuse that data and avoid traversing the entry list and parsing the metadata a second time.
Call the plugin in AbstractBaseDispatcher#filterEntriesForConsumer
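```java
if (entryFilter != null) {
    FilterResult result = entryFilter.filterEntries(entry, filterContext);
    if (result == FilterResult.REJECT) {
        // Skip the entry for this consumer and acknowledge it so it is not redelivered.
        // Acknowledge before release(): the entry must not be used after it is released.
        subscription.acknowledgeMessage(entry);
        entries.set(i, null);
        entry.release();
        continue;
    }
}
```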
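The dispatcher-side step above can be simulated without broker dependencies. This sketch makes two substitutions: a hypothetical SimEntry type stands in for Entry, and a Predicate stands in for the plugin. It walks the batch once, as the single-entry design intends, and nulls out rejected entries. It records their positions for acknowledgment before releasing them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a managed-ledger entry: a position, an already-parsed tag, a release flag.
final class SimEntry {
    final long position;
    final String tag;
    boolean released;

    SimEntry(long position, String tag) {
        this.position = position;
        this.tag = tag;
    }

    void release() {
        released = true;
    }
}

final class FilterLoopSketch {
    private FilterLoopSketch() {
    }

    // One pass over the batch: rejected entries are replaced by null, their positions are
    // collected for acknowledgment, and only then are they released.
    // Returns the positions that should be acknowledged.
    static List<Long> filterEntriesForConsumer(List<SimEntry> entries, Predicate<SimEntry> accept) {
        List<Long> toAck = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            SimEntry entry = entries.get(i);
            if (entry == null) {
                continue; // already removed by an earlier step
            }
            if (!accept.test(entry)) {
                toAck.add(entry.position); // read the position before releasing the entry
                entries.set(i, null);
                entry.release();
            }
        }
        return toAck;
    }
}
```

Collecting positions and acknowledging them once per batch is one possible variation. The proposal's snippet acknowledges inside the loop.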
Client
Add an optional subscriptionProperties attribute to the subscription, which can be set when subscribing to a topic. These properties cannot be modified; to change them, the subscription must be deleted and created again.
a. This attribute needs to be added to the subscribe command.
b. For persistent subscriptions, these properties should be stored in ZooKeeper along with the subscription.
c. The REST API (create-subscription) should also support setting subscriptionProperties.
Example:
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("sub")
        .subscriptionProperties(new HashMap<>())
        .subscribe();
```
Rejected Alternatives
- Plug-in Dispatcher: If we make the dispatcher pluggable, we must define a limited, private but "stable" API. Enrico's suggestion is to define particular needs and then add features that make single, specific parts of the dispatcher pluggable.
- Message-level filter: At this stage, we can only filter at the entry level. If individual messages inside an entry were filtered on the broker side, subsequent consumer acknowledgments would run into problems. Therefore, to use this filter, we must set enableBatching=false, the same requirement as for delayed messages.
It is not clear to me if the Consumer can attach some configuration value (like a JMS message selector) that can be used in the Filter. Otherwise, we can only create static filters.
We can use it like this:
- Set properties when creating the consumer.
- Define entryFilterClass as an adapter that internally decides which filter logic to use based on those properties.
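The adapter idea can be sketched as a factory that reads a subscription's properties and returns the matching filter logic. Three names are assumptions made for this example: the property keys filter.type and filter.tag, and the message property tag.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical adapter: a single class configured as the entry filter that chooses
// the actual filter logic per subscription, based on that subscription's properties.
final class PropertyDrivenFilterAdapter {
    private PropertyDrivenFilterAdapter() {
    }

    // Builds a predicate over message properties from the subscription's properties.
    static Predicate<Map<String, String>> forSubscription(Map<String, String> subscriptionProperties) {
        String type = subscriptionProperties.getOrDefault("filter.type", "none");
        switch (type) {
            case "tag": {
                String tag = subscriptionProperties.get("filter.tag");
                return msgProps -> tag != null && tag.equals(msgProps.get("tag"));
            }
            case "none":
                return msgProps -> true; // no filter configured: deliver everything
            default:
                throw new IllegalArgumentException("Unknown filter.type: " + type);
        }
    }
}
```

Rejecting unknown filter types at subscribe time, instead of delivering everything, is a design choice of this sketch.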
@eolivelli Do you think this is okay? If there are no objections, I will start a vote.
@315157973
I am okay with this as long as there is an API for the client to set such properties and the broker is able to use them.
Can you please give me an example of how to set custom properties on the subscription in a way that the broker can leverage those configurations?
@eolivelli I added a description of this part. PTAL, Thanks
@eolivelli PTAL, thanks
I would change 'subscriptProperties' to 'subscriptionProperties'
Apart from this, LGTM.
What happens if I subscribe with two different consumers that use different subscriptProperties?
subscriptionProperties is immutable: an attempt to modify it returns an exception. If the properties are empty or the same as the existing ones, the subscribe succeeds.
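The re-subscribe rule can be expressed as a small check. This is a hypothetical helper that mirrors the behavior described in the comment. It is not the broker's actual code.

```java
import java.util.Map;

// Hypothetical helper: properties are fixed when the subscription is created.
final class SubscriptionPropertiesCheck {
    private SubscriptionPropertiesCheck() {
    }

    // Returns the properties the subscription keeps after this subscribe request.
    // existing == null means the subscription does not exist yet.
    static Map<String, String> onSubscribe(Map<String, String> existing, Map<String, String> requested) {
        if (existing == null) {
            // New subscription: the requested properties become permanent (empty if none given).
            return requested == null ? Map.of() : Map.copyOf(requested);
        }
        if (requested == null || requested.isEmpty() || requested.equals(existing)) {
            return existing; // empty or identical properties: the subscribe succeeds
        }
        throw new IllegalStateException(
                "subscriptionProperties are immutable; delete and recreate the subscription to change them");
    }
}
```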
Could users implement their own features and decide which messages are sent to consumers based on the deserialized entry data?
For the interface public FilterResult filterEntries(Entry entry, FilterContext context);, the Entry parameter has no meaning for users unless they deserialize Entry.getData() in their implementation, which may add CPU and GC load on the broker.
So should we allow users to deserialize each entry's data in this interface?
@315157973 Does this support topics with end-to-end encryption enabled?
> Could users implement their own features and decide which messages are sent to consumers based on the deserialized entry data? For the interface public FilterResult filterEntries(Entry entry, FilterContext context);, the Entry parameter has no meaning for users unless they deserialize Entry.getData() in their implementation, which may add CPU and GC load on the broker. So should we allow users to deserialize each entry's data in this interface?
- This interface is open to developers.
- We only need to parse the header of the entry, not deserialize the entire Entry onto the heap.
- The parsed header information is already in the FilterContext, so you don't even need to re-parse the metadata. The parameter passed in is an Entry, so you are free to do whatever you need with it. You can even modify the data in the Entry.
> @315157973 Does this support topics with end-to-end encryption enabled?
It depends on your implementation class. End-to-end encrypted messages also go through this logic.
> - The parsed header information is already in the FilterContext, so you don't even need to re-parse the metadata. The parameter passed in is an Entry, so you are free to do whatever you need with it. You can even modify the data in the Entry.
If users want to do anything with the Entry parameter, they need to deserialize the entire Entry onto the heap in their implementation before they can modify its data.
As @codelipenghui suggested in https://github.com/apache/pulsar/issues/11962#issuecomment-915716242, we should avoid deserializing entries onto the heap, because that adds GC load on the broker.
If we do not want users to do this, we could remove the Entry parameter.
The framework does not deserialize the entry; whether to deserialize it is up to the developer. The Entry parameter is necessary because it can also be used in many other scenarios, for example, to modify the data of a specific entry.
I agree. Currently, the only way to modify the data of a specific entry is to use Functions. Modifying entry data directly on the broker side would save a lot of network traffic. But letting users deserialize entry data onto the heap is not a good approach. We should consider how to modify entry data on the broker side efficiently while keeping GC load on the broker low. Maybe we could create another PIP for modifying entry data, since this PIP is about entry filtering.
If I just need to filter based on the content of the message, and the developer can tolerate the CPU and memory usage, why not? It all depends on the developer's implementation.
The Entry data here is in direct memory. If users call Entry.getData() in their implementation, the entry data is copied to the heap. If message traffic is high, the broker heap may fill up quickly and GC may run repeatedly, every few seconds.
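To make the copying cost concrete, here is an illustration. It assumes java.nio.ByteBuffer as a stand-in for the Netty buffer that backs a broker Entry. Reading a prefix in place allocates nothing on the heap. Copying the payload out allocates a heap array the size of the whole payload for every entry, which is what the comment says Entry.getData() does. DirectMemoryPeek is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: ByteBuffer stands in for the buffer that backs a broker Entry.
final class DirectMemoryPeek {
    private DirectMemoryPeek() {
    }

    // Checks whether the payload starts with the given prefix by reading bytes in place.
    // Absolute get(int) does not move the buffer's position and allocates nothing.
    static boolean startsWith(ByteBuffer payload, byte[] prefix) {
        if (payload.remaining() < prefix.length) {
            return false;
        }
        int start = payload.position();
        for (int i = 0; i < prefix.length; i++) {
            if (payload.get(start + i) != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // What a getData()-style call amounts to: the whole payload copied into a new heap array.
    static byte[] copyToHeap(ByteBuffer payload) {
        byte[] copy = new byte[payload.remaining()];
        payload.duplicate().get(copy); // duplicate() leaves the original position untouched
        return copy;
    }
}
```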
We need to evaluate the impact carefully.
I recommend:
- Filtering should not involve the message body. This avoids the problem that end-to-end encrypted payloads cannot be decrypted on the broker, and it also avoids the potential JVM GC problem.
- Reserve a dedicated field in the message header, independent of encryption, specifically for filtering. This reduces the resources (memory and CPU) that filtering consumes on the broker side.
Hi @315157973, regarding the docs: I've discussed this with @codelipenghui. He suggests creating an independent "Message filter" chapter under Development and considering adding plugin and protocol handler sections under that chapter.

For adding some more context, there was a proposal in the past from @andrekramer1 , that was discussed in this dev mailing list thread: https://lists.apache.org/thread/k0hb33nz42f3603hvs03p7lmltjgj40z . I would prefer a standard feature approach in Pulsar instead of making it an optional plugin. The reason for this is to prevent fragmentation of Pulsar into a set of optional plugins which might or might not be available or installed.
The issue had no activity for 30 days, mark with Stale label.
I can't find the vote on the mailing list when searching for PIP-105.
@asafm good call! I can't find it either
> These properties cannot be modified.
Can you elaborate on the reason for this please? For long-lived subscriptions on persistent topics being able to modify the properties could definitely be beneficial, as recreating the subscription could be quite cumbersome if done regularly.