[improve][pip] PIP-359: Support custom message listener executor for specific subscription
Motivation
PIP-359 implementation PR: https://github.com/apache/pulsar/pull/22861
Modifications
Verifying this change
- [x] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository:
Why can't this logic be written by the person writing the listener? I think it adds extra complexity to an already complicated client
Yeah, the solution you mentioned can also solve this problem. But on the contrary, I think providing a unified builder configuration would be easier to use.
IMO the surface area the client already exposes is way too big. It feels like anyone who wanted to scratch their own itch just added their feature without holding back, and all of it needs to be maintained. In OTel, they have a contrib repo where people can contribute their plugins. In your case, that would mean contributing a listener implementation.
You make a very valid point. We do indeed need a plugin library that allows contributors to customize Pulsar according to their own needs without adding more complexity to Pulsar itself. Would you be able to drive this effort? If so, I think we can keep pushing it forward and take this proposal as the first test case.
Otherwise, it might be better to first promote this optimization, so that those who need it can start using it.
I am very interested in the plugin library you mentioned, and perhaps later we can work together to promote its implementation in Pulsar. However, it would be unwise to block other optimizations from entering before that.
Great idea! Are you interested in building and promoting a similar contrib repo for Pulsar? This PIP may be a good opportunity to do this, and we can build it together. @asafm @liangyepianzhou
I'm currently working on Pulsar in my free time, since my new company doesn't even use Pulsar. I can help review a PIP to open such a repo. You can read the OTel policy on how they handle it and use it as the basis for the contrib repo PIP. If you are also doing this in your free time, we can try to work on it together, but it'll be at a slower pace.
We also participate in this work in our spare time, and progress is indeed slower. However, if we can achieve something significant together, it would be a very cool thing.
@asafm Since we all agree that the plugin library is still a long way from being realized, it's best not to block other optimizations until then. It would be best to let this PIP continue through its original process, so that those who need this feature can use it first. We can migrate some functionality once the plugin library is completed. What do you think?
With the help of @liangyepianzhou, I was able to figure out all the context needed to understand what this PIP is about. To summarize: when using a Consumer, you have two options for consuming messages:
- Synchronously, by calling consumer.receive().
- Asynchronously, by registering a MessageListener when building the Consumer. When this method is used, you can't also use consumer.receive().
In the asynchronous mode, the consumer calls the MessageListener instance using a thread taken from the client's internal ExecutorService (i.e., thread pool).
The problem arises when you build and use two consumers from the same PulsarClient: both consumers share the same thread pool to call their message listeners, so if one listener is slower than the other, it can delay message processing for the other consumer.
I suggest a different solution, if I may:
In the ConsumerBuilder, there is the following method to register a MessageListener:
.messageListener(myMessageListener)
I suggest we expand it to:
.messageListener(myMessageListener, myExecutorService)
This will allow the user to provide a separate executor service (thread pool) to be used when executing myMessageListener for that consumer.
So summarizing my request for changes:
- Rewrite the motivation and background knowledge to include all relevant information (see my example above).
- Switch to a different API for providing that executor service.
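The scenario described above can be simulated with plain java.util.concurrent, without the Pulsar client. This is a hypothetical sketch: the class and method names are made up, and a single-thread executor stands in for a listener thread that two consumers happen to share. Consumer A's listener blocks until released; the method reports whether consumer B's listener can still run in the meantime, with a shared executor versus a dedicated executor per consumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation, not Pulsar code: each "consumer" hands its listener
// calls to an executor, either a shared one or its own dedicated one.
final class ListenerIsolationDemo {

    // Returns true if consumer B's listener completes while consumer A's listener is still blocked.
    static boolean fastListenerRunsWhileSlowOneBlocks(boolean dedicatedExecutors) {
        ExecutorService executorA = Executors.newSingleThreadExecutor();
        ExecutorService executorB = dedicatedExecutors ? Executors.newSingleThreadExecutor() : executorA;
        CountDownLatch releaseSlowListener = new CountDownLatch(1);
        CountDownLatch fastListenerDone = new CountDownLatch(1);
        try {
            // Consumer A's listener is "slow": it blocks until it is released.
            executorA.execute(() -> {
                try {
                    releaseSlowListener.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Consumer B's listener is fast, but it still needs a free thread in its executor.
            executorB.execute(fastListenerDone::countDown);
            return fastListenerDone.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unblock A so both executors can drain and shut down.
            releaseSlowListener.countDown();
            executorA.shutdown();
            executorB.shutdown();
        }
    }
}
```

With a shared executor the call returns false once the one-second timeout expires, because B's task is queued behind A's; with dedicated executors it returns true.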
Okay, good suggestions! I will update it as suggested. Thank you all, @asafm @liangyepianzhou
I have made the changes to the PIP as suggested, PTAL @asafm @liangyepianzhou Thanks!
The use case looks a little strange. Why not something like this:
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription2")
// the lambda parameter cannot reuse the name `consumer`, which this statement already declares
.messageListener((c, message) -> {
// process message
c.acknowledgeAsync(message);
})
// Introduce a new `processExecutor` method
.processExecutor(Executors.newCachedThreadPool())
.subscribe();
or
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription2")
.messageListener((c, message) -> {
// process message
c.acknowledgeAsync(message);
// Add a new method that accepts `MessageListener, ExecutorService`
}, Executors.newCachedThreadPool())
.subscribe();
Agree! Using the Function to implement this feature does indeed reduce development costs while ensuring that partition messages are sent in an orderly manner. However, it increases the cost of use for the user.
@dao-jun @liangyepianzhou Thanks for your suggestions! I have updated the usage example. The following points explain why we cannot simply pass an ExecutorService:
- For Key_Shared or Failover subscriptions, we need to preserve message order. If we simply pass an ExecutorService, we cannot guarantee that messages with the same key or from the same partition are processed by the same thread (unless the passed ExecutorService is a SingleThreadExecutor).
- Also see: https://github.com/apache/pulsar/pull/22861#discussion_r1630111105
private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException {
    // Example 1: Key_Shared, messages with the same key are processed by the same thread, in order
    MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName);
    Consumer<Long> keySharedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    // separate subscriptions, since consumers on one subscription must share a subscription type
                    .subscriptionName(subscriptionName + "-key-shared")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    // the message listener will be executed by keySharedExecutor
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(keySharedExecutor)
                    .subscribe();
    // Example 2: partition-ordered, messages from the same partition are processed by the same thread, in order
    MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderedMessageListenerExecutor(subscriptionName);
    Consumer<Long> partitionOrderedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName + "-failover")
                    .subscriptionType(SubscriptionType.Failover)
                    // the message listener will be executed by partitionOrderedExecutor
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(partitionOrderedExecutor)
                    .subscribe();
    // Example 3: out-of-order, listener tasks may run on any thread of the pool, so no ordering is guaranteed
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Consumer<Long> outOfOrderConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName + "-shared")
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor((message, runnable) -> executorService.execute(runnable))
                    .subscribe();
}
private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "-listener-executor-");
    return (message, runnable) -> {
        byte[] key = "".getBytes(StandardCharsets.UTF_8);
        if (message.hasKey()) {
            key = message.getKeyBytes();
        } else if (message.hasOrderingKey()) {
            key = message.getOrderingKey();
        }
        // select a thread by message key to execute the runnable,
        // i.e. listener tasks for messages with the same key
        // are executed by the same thread
        ExecutorService executorService = executorProvider.getExecutor(key);
        // executorService is a SingleThreadExecutor
        executorService.execute(runnable);
    };
}
private static MessageListenerExecutor getPartitionOrderedMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "-listener-executor-");
    return (message, runnable) -> {
        // select a thread by partition topic name to execute the runnable,
        // i.e. listener tasks for messages from the same partition topic
        // are executed by the same thread
        ExecutorService executorService =
                executorProvider.getExecutor(message.getTopicName().getBytes(StandardCharsets.UTF_8));
        // executorService is a SingleThreadExecutor
        executorService.execute(runnable);
    };
}
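The examples above depend on routing every message with the same key (or from the same partition topic) to one single-thread executor. As a rough, stdlib-only approximation of that idea, here is a hypothetical KeyOrderedExecutors helper; it is not Pulsar's ExecutorProvider, and the hashing scheme is an assumption. Tasks for one key always land on the same single-thread executor, so they run in submission order, while different keys can still run in parallel.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: N single-thread executors, selected by a hash of the key.
final class KeyOrderedExecutors implements AutoCloseable {
    private final List<ExecutorService> executors = new ArrayList<>();

    KeyOrderedExecutors(int threads) {
        for (int i = 0; i < threads; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    // The same key always maps to the same single-thread executor,
    // so tasks for that key run one at a time, in submission order.
    void execute(byte[] key, Runnable task) {
        int index = Math.floorMod(Arrays.hashCode(key), executors.size());
        executors.get(index).execute(task);
    }

    // Stops accepting tasks and waits for queued tasks to finish.
    @Override
    public void close() {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
        for (ExecutorService executor : executors) {
            try {
                executor.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Submits sequence numbers 0..perKey-1 for each key and returns what each key observed.
    static Map<String, List<Integer>> runDemo(int keys, int perKey) {
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        try (KeyOrderedExecutors pool = new KeyOrderedExecutors(4)) {
            for (int seq = 0; seq < perKey; seq++) {
                for (int k = 0; k < keys; k++) {
                    String key = "key-" + k;
                    int value = seq;
                    pool.execute(key.getBytes(StandardCharsets.UTF_8),
                            () -> seen.computeIfAbsent(key, x -> new CopyOnWriteArrayList<>()).add(value));
                }
            }
        }
        return seen;
    }
}
```

Passing the partition topic name bytes as the key instead of the message key gives the partition-ordered variant.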
After further discussion with @lhotari (see more discussions at https://github.com/apache/pulsar/pull/22861), for the second point, we both think that Builders usually configure each property separately, so I will stick to the previous interface! Thanks! @asafm @liangyepianzhou (Of course, what you said is also a good suggestion. Thanks again!)
I support the motivation; isolating the MessageListener executor is indeed a good idea. But I'm more conservative about the implementation.
I have explained why we cannot simply use java.util.concurrent.ExecutorService at https://github.com/apache/pulsar/pull/22902#issuecomment-2176107246. Do you agree with this reasoning? If not, please provide more detailed suggestions and I will do my best to improve it.
I understand that you introduced MessageListenerExecutor to keep messages consumed in order, but my concern is the API design.
I have read the implementation PR, and some points seem strange to me:
- The MessageListenerExecutor interface:
public interface MessageListenerExecutor {
void execute(Message<?> message, Runnable runnable);
}
The purpose of the interface is to select an executor based on the message and execute the runnable on it, right?
Why not use the following pattern instead? Exposing the message and returning an executor is enough.
public interface MessageListenerExecutorSelector {
ExecutorService select(Message<?> message);
}
- There are no default implementations of MessageListenerExecutor.
Default implementations serve as examples: users can follow the pattern to develop their own extensions, and they can be used out of the box.
The rest looks OK.
Or
public interface MessageListenerExecutorSelector {
ExecutorService select(Message<?> message);
ExecutorService selectOrdering(Message<?> message);
}
If the subscription type is Key_Shared, call selectOrdering; otherwise, call select.
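To make the execute(message, runnable) shape concrete: the runnable is simply the listener invocation for one message, and the consumer hands it, together with the message, to the configured executor. The sketch below is hypothetical and heavily simplified; ToyMessage, ToyConsumer, and the fallback to a default executor when nothing is configured are assumptions, not the actual Pulsar implementation.

```java
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-ins for Message, MessageListener and the
// proposed MessageListenerExecutor; this is not the Pulsar client code.
final class ToyMessage {
    private final String topic;
    private final String value;

    ToyMessage(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }

    String topic() { return topic; }

    String value() { return value; }
}

@FunctionalInterface
interface ToyMessageListener {
    void received(ToyMessage message);
}

@FunctionalInterface
interface ToyMessageListenerExecutor {
    // Decides, based on the message, where the listener task should run.
    void execute(ToyMessage message, Runnable runnable);
}

final class ToyConsumer {
    private final ToyMessageListener listener;
    private final ToyMessageListenerExecutor listenerExecutor; // null if not configured
    private final Executor defaultExecutor;

    ToyConsumer(ToyMessageListener listener, ToyMessageListenerExecutor listenerExecutor, Executor defaultExecutor) {
        this.listener = listener;
        this.listenerExecutor = listenerExecutor;
        this.defaultExecutor = defaultExecutor;
    }

    // Called when a message arrives: the listener call is wrapped in a Runnable and dispatched.
    void dispatch(ToyMessage message) {
        Runnable task = () -> listener.received(message);
        if (listenerExecutor != null) {
            listenerExecutor.execute(message, task);
        } else {
            defaultExecutor.execute(task);
        }
    }
}
```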
For the first point, I don't think the alternative is better than the current design; on the contrary, the interface you suggested has a shortcoming:
The select method returns an ExecutorService instance, which may contain any number of threads, so ordering is still not guaranteed.
For the second point, refer to the previous discussion: https://github.com/apache/pulsar/pull/22861#discussion_r1635850544
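The ordering argument can be checked in a small sketch. A selector-style interface can always be wrapped into the execute(key, runnable) form, but in the selector form ordering holds only if the returned executor is single-threaded, which the interface itself cannot enforce. The interfaces below are hypothetical and keyed on a String instead of Message<?>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical execute-style interface (simplified from MessageListenerExecutor).
@FunctionalInterface
interface KeyedListenerExecutor {
    void execute(String key, Runnable runnable);
}

// Hypothetical selector-style interface (simplified from the suggested selector).
@FunctionalInterface
interface KeyedExecutorSelector {
    ExecutorService select(String key);
}

final class SelectorAdapter {
    // Any selector can be wrapped into the execute(key, runnable) form.
    static KeyedListenerExecutor adapt(KeyedExecutorSelector selector) {
        return (key, runnable) -> selector.select(key).execute(runnable);
    }

    // Runs n tasks for one key through the adapter and returns the observed order.
    // Order is preserved only because this selector returns a single-thread executor.
    static List<Integer> runThroughSingleThreadSelector(int n) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        KeyedListenerExecutor executor = adapt(key -> single);
        for (int i = 0; i < n; i++) {
            int value = i;
            executor.execute("same-key", () -> order.add(value));
        }
        single.shutdown();
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

If the selector returned a multi-threaded pool instead, the same adapter would compile and run, but tasks for one key could complete in any order.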
Also see: https://github.com/apache/pulsar/pull/22902#issuecomment-2176610042
Hi, Everyone! @lhotari @codelipenghui @asafm @liangyepianzhou @dao-jun @poorbarcode @Technoboy-
I have sent a voting email for PIP-359: https://lists.apache.org/thread/oo3jdvq3b6bv6p4n7x7sdvypw4gp6hpk
Thanks!
@AuroraTwinkle Please select only one documentation label in your PR description.
I have reviewed the comments made by @lhotari, but I'm sorry, I still don't understand.
ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
This basically spills the guts/internals of how the consumer is implemented. It is super hard to understand for a normal user. I haven't seen anything like it in Kafka. Have you looked at how other messaging systems do it?
The reason I'm asking is that I think it's too complicated. I asked some clarifying questions in the PR comments, since I lack some of the background needed to come up with another idea.
You moved to the vote phase way too soon: you posted your comments only 13 hours ago. I don't want to hold anyone back, of course, but you're changing quite a substantial public API here. @lhotari I think we need at least one more set of eyes on this solution before we can proceed.
@asafm I believe this concern could be handled by improving the explanation of the concept and the abstraction that this exposes. I agree that it's not good practice to add abstractions that are implementation-specific. This abstraction isn't implementation-specific, but the problem is that 95% of users don't need it. In a multi-topic consumer that uses a MessageListener, there might be a requirement to process messages from specific topics with different priorities. It is up to the implementer how to implement this logic.
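As a rough illustration of the per-topic priority case, a MessageListenerExecutor-style callback could send listener tasks for one designated topic to a dedicated executor and everything else to a shared one. Everything here is hypothetical: the class name, the topic names, and the simplified interface that receives a topic name instead of a Message<?>.

```java
import java.util.concurrent.Executor;

// Hypothetical simplified callback: only the topic name is passed, not a Message<?>.
@FunctionalInterface
interface TopicListenerExecutor {
    void execute(String topicName, Runnable runnable);
}

final class TopicRoutingListenerExecutor implements TopicListenerExecutor {
    private final String priorityTopic;
    private final Executor priorityExecutor;
    private final Executor sharedExecutor;

    TopicRoutingListenerExecutor(String priorityTopic, Executor priorityExecutor, Executor sharedExecutor) {
        this.priorityTopic = priorityTopic;
        this.priorityExecutor = priorityExecutor;
        this.sharedExecutor = sharedExecutor;
    }

    @Override
    public void execute(String topicName, Runnable runnable) {
        // Tasks for the priority topic go to their own executor, so (as long as that
        // executor is not shared) they do not queue behind other topics' tasks.
        Executor target = priorityTopic.equals(topicName) ? priorityExecutor : sharedExecutor;
        target.execute(runnable);
    }
}
```

For a partitioned topic, each partition has its own name (with a -partition-N suffix), so a real implementation would need to match on the base topic name.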
That's a valid point. I think we need to come up with a way to add extensions to the Pulsar client without cluttering the basic use cases. 95% of Pulsar users will never need this messageListenerExecutor, but we'd like to make it possible for the remaining 5%. Perhaps we should prioritize finding a way to add such extensions to the Pulsar client.
It's great to see that this PIP has attracted interest. Even if something is accepted by a vote, that doesn't mean we cannot make changes later, before the feature is released. In this case, there's a clear need to solve the multi-topic consumer queuing problem when using a MessageListener, and we have to find a way to address it.
One possible idea for avoiding cluttering the builder interfaces with more and more configuration options would be to support some type of extension pattern.
In this case, there could be this type of method on the ConsumerBuilder interface:
<E extends ConsumerBuilderExtension> E extensionBuilder(Class<E> consumerBuilderExtensionClass);
then these interfaces:
interface MessageListenerExecutorConsumerBuilderExtension<T> extends ConsumerBuilderExtension<T> {
MessageListenerExecutorConsumerBuilderExtension<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
}
interface ConsumerBuilderExtension<T> {
ConsumerBuilder<T> parent();
}
then it would be possible to call
.extensionBuilder(MessageListenerExecutorConsumerBuilderExtension.class).messageListenerExecutor(....).parent().subscribe()
Obviously this adds complexity, but it would address the problem of cluttering the main interfaces with configuration options that are only needed for very specific requirements.
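A toy, self-contained model of the proposed extension pattern may help when weighing its complexity. None of these types exist in the Pulsar client: ToyConsumerBuilder, ListenerExecutorExtension, and the BiConsumer stand-in for MessageListenerExecutor are all hypothetical, and the builder is non-generic to keep the sketch short.

```java
import java.util.function.BiConsumer;

// Hypothetical toy model of the extension pattern; none of these types exist in the Pulsar client.
interface ToyBuilderExtension {
    // Returns to the main builder so the fluent chain can continue.
    ToyConsumerBuilder parent();
}

// Extension that holds a niche option, keeping it off the main builder.
final class ListenerExecutorExtension implements ToyBuilderExtension {
    private final ToyConsumerBuilder parent;

    ListenerExecutorExtension(ToyConsumerBuilder parent) {
        this.parent = parent;
    }

    // Stand-in for MessageListenerExecutor: (message value, task) -> run the task somewhere.
    ListenerExecutorExtension messageListenerExecutor(BiConsumer<String, Runnable> executor) {
        parent.listenerExecutor = executor;
        return this;
    }

    @Override
    public ToyConsumerBuilder parent() {
        return parent;
    }
}

final class ToyConsumerBuilder {
    private String topic;
    BiConsumer<String, Runnable> listenerExecutor; // set only through the extension

    ToyConsumerBuilder topic(String topic) {
        this.topic = topic;
        return this;
    }

    // Looks up an extension by its class; this toy knows only one extension type.
    <E extends ToyBuilderExtension> E extensionBuilder(Class<E> extensionClass) {
        if (ListenerExecutorExtension.class.equals(extensionClass)) {
            return extensionClass.cast(new ListenerExecutorExtension(this));
        }
        throw new IllegalArgumentException("Unknown extension: " + extensionClass.getName());
    }

    // Stand-in for subscribe(): reports the resulting configuration.
    String subscribe() {
        return "topic=" + topic + ", customListenerExecutor=" + (listenerExecutor != null);
    }
}
```

The niche option is reachable only through extensionBuilder(ListenerExecutorExtension.class), and parent() returns to the main builder so the chain can end with subscribe(); users who never need it never see it on the main builder.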
What do you think about this? @asafm @liangyepianzhou
Hi @lhotari, this is a truly innovative concept that showcases a profound level of technical proficiency and expertise. However, as you've mentioned, it does seem overly complex.
I suggest we revert to the initial design approach; I'm convinced that the first iteration of the API design was the most succinct and user-friendly. What are your thoughts? cc @asafm @AuroraTwinkle
pulsarClient.newConsumer().topic("topic").subscriptionName("subName")
.messageListener((consumer, msg) -> {}, new ExecutorProvider(10, "your_poolName"))
.subscribe();
The PIP has been approved with 5 binding votes and 1 non-binding vote, and the vote has been open for more than 72 hours, so I'm merging it.