[improve][client] PIP-359: Support custom message listener executor for specific subscription

Open AuroraTwinkle opened this issue 1 year ago • 11 comments

PIP-359: Support a custom message listener thread pool for a specific subscription, so that an individual subscription's listener taking too much time does not cause higher consumption delay in other subscriptions.

Motivation

In our scenario, there is a centralized message proxy service. This service uses the same PulsarClient instance to create a large number of subscription groups that consume many topics and cache messages locally; the business then pulls messages from the proxy service's cache. That seemed fine, but in practice we found that when the message processing time of a few consumer groups (in listener mode) was very high, it affected almost all consumer groups handled by the proxy service, causing a large number of message delays.

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient share a thread pool to process message listeners, and multiple consumers' message listeners are sometimes bound to the same thread. Obviously, when one consumer's message processing blocks for a long time, the messages of the other consumers bound to that thread cannot be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specifying a message listener thread pool per consumer, to avoid mutual interference between different consumers.

Modifications

Support custom message listener thread pool for specific subscription.

Verifying this change

  • [x] Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [x] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: https://github.com/AuroraTwinkle/pulsar/pull/1

AuroraTwinkle avatar Jun 06 '24 17:06 AuroraTwinkle

A simpler solution to your problem could be to add

ConsumerBuilder<T> messageListenerExecutor(java.util.concurrent.Executor executor);

The caller would be responsible for the lifecycle of the Executor, and it would be used only for this specific consumer.
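
For illustration only, a minimal sketch of how an application might use such a per-consumer, application-owned executor, assuming the builder method has exactly the signature proposed above (later comments in this thread discuss a richer MessageListenerExecutor interface instead of a plain Executor, so the shipped API may differ):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class DedicatedListenerExecutorExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Application-owned executor: the caller creates it, passes it to the
        // builder, and is responsible for shutting it down when done.
        ExecutorService slowListenerExecutor = Executors.newSingleThreadExecutor();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/slow-topic")
                .subscriptionName("slow-subscription")
                // Assumed builder method from the proposal above; the final
                // PIP-359 API may differ in name and parameter type.
                .messageListenerExecutor(slowListenerExecutor)
                .messageListener((c, msg) -> {
                    try {
                        // Long-running processing now runs on the dedicated
                        // executor instead of the client's shared listener pool.
                        c.acknowledge(msg);
                    } catch (Exception e) {
                        c.negativeAcknowledge(msg);
                    }
                })
                .subscribe();

        // ... later, during application shutdown:
        consumer.close();
        slowListenerExecutor.shutdown();
        client.close();
    }
}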

lhotari avatar Jun 06 '24 19:06 lhotari

This isn't directly related to this feature request, but I thought that it might be useful to share.

There's "PIP-234: Support using shared thread pool across multiple Pulsar client instance" https://lists.apache.org/thread/5jw06hqlmwnrgvbn9lfom1vkwhwqwwd4 . This hasn't been implemented yet. This would be useful when there's an application that creates multiple Pulsar clients and would like to share the thread pools. This PIP could also be make to support sharing all other thread pools, but overriding the thread pool used for message listeners. However I think that the messageListenerExecutor for ConsumerBuilder would be a better match for your use case.

There's also a completely unrelated challenge when a single client is shared. It's about the rate limiting and backpressure handling. That is mentioned in the out-of-scope part of PIP-322. Pulsar will share the same set of connections across all producers and consumers. When a producer or consumer is rate limited, it will impact other producers and consumers using the same connection. One of the ways to mitigate this is to introduce a way to ensure that a producer or consumer gets isolated when it's explicitly specified. There could be isolatedConnection(boolean enabled) etc. in ConsumerBuilder and ProducerBuilder to cover this.

lhotari avatar Jun 06 '24 19:06 lhotari

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient share a thread pool to process message listeners, and multiple consumers' message listeners are sometimes bound to the same thread. Obviously, when one consumer's message processing blocks for a long time, the messages of the other consumers bound to that thread cannot be processed in time, resulting in message delays.

Yes, this is a problem. Messages get queued in the executor thread pool as they arrive. In current Pulsar client versions, one way to deal with the problem is to configure smaller receiver queue sizes for the lower-priority consumers and larger receiver queue sizes for the higher-priority ones. Have you already tried this type of solution?
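
For reference, a minimal sketch of this workaround using the existing ConsumerBuilder.receiverQueueSize setting; the topic and subscription names and queue sizes are placeholders:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class ReceiverQueueSizeWorkaround {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Slow, lower-priority consumer: a small receiver queue limits how many
        // of its messages can pile up in the shared listener thread pool.
        Consumer<byte[]> slowConsumer = client.newConsumer()
                .topic("persistent://public/default/slow-topic")
                .subscriptionName("slow-sub")
                .receiverQueueSize(10)
                .messageListener((c, msg) -> {
                    try {
                        c.acknowledge(msg); // slow processing would happen here
                    } catch (Exception e) {
                        c.negativeAcknowledge(msg);
                    }
                })
                .subscribe();

        // Fast, higher-priority consumer: a larger receiver queue keeps its
        // throughput up while the slow consumer is effectively throttled.
        Consumer<byte[]> fastConsumer = client.newConsumer()
                .topic("persistent://public/default/fast-topic")
                .subscriptionName("fast-sub")
                .receiverQueueSize(1000)
                .messageListener((c, msg) -> {
                    try {
                        c.acknowledge(msg);
                    } catch (Exception e) {
                        c.negativeAcknowledge(msg);
                    }
                })
                .subscribe();
    }
}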

lhotari avatar Jun 06 '24 19:06 lhotari

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient share a thread pool to process message listeners, and multiple consumers' message listeners are sometimes bound to the same thread. Obviously, when one consumer's message processing blocks for a long time, the messages of the other consumers bound to that thread cannot be processed in time, resulting in message delays.

Yes, this is a problem. Messages get queued in the executor thread pool as they arrive. In current Pulsar client versions, one way to deal with the problem is to configure smaller receiver queue sizes for the lower-priority consumers and larger receiver queue sizes for the higher-priority ones. Have you already tried this type of solution?

Yes, configuring receiver queue sizes seems to work, but in real scenarios it is difficult to accurately determine the priority of each consumer and set a reasonable queue size. In addition, those priorities may change at any time as the business changes. Therefore, this solution is difficult to operate and maintain and is not the best one.

AuroraTwinkle avatar Jun 11 '24 08:06 AuroraTwinkle

This isn't directly related to this feature request, but I thought that it might be useful to share.

There's "PIP-234: Support using shared thread pool across multiple Pulsar client instance" https://lists.apache.org/thread/5jw06hqlmwnrgvbn9lfom1vkwhwqwwd4 . This hasn't been implemented yet. This would be useful when there's an application that creates multiple Pulsar clients and would like to share the thread pools. This PIP could also be make to support sharing all other thread pools, but overriding the thread pool used for message listeners. However I think that the messageListenerExecutor for ConsumerBuilder would be a better match for your use case.

There's also a completely unrelated challenge when a single client is shared. It's about the rate limiting and backpressure handling. That is mentioned in the out-of-scope part of PIP-322. Pulsar will share the same set of connections across all producers and consumers. When a producer or consumer is rate limited, it will impact other producers and consumers using the same connection. One of the ways to mitigate this is to introduce a way to ensure that a producer or consumer gets isolated when it's explicitly specified. There could be isolatedConnection(boolean enabled) etc. in ConsumerBuilder and ProducerBuilder to cover this.

Looks great, hope it can be implemented soon!

AuroraTwinkle avatar Jun 11 '24 09:06 AuroraTwinkle

Thanks for this proposal @AuroraTwinkle . I believe that this will be a very useful addition to the Pulsar client.

There are certain details of the message listener support in the Pulsar client that aren't handled currently and could lead to unnecessary duplicate messages. That isn't directly related to the MessageListenerExecutor. The detail concerns how ack timeouts and nacks are handled: those result in the client requesting the broker to redeliver the messages, and in the case of an exclusive or failover subscription, all unacknowledged messages get redelivered. When a message listener isn't used, there's a solution to filter out the previous in-flight messages so that the later-arriving redeliveries get processed and duplicates are avoided. However, when a message listener is used, that solution isn't applied or effective. The solution is PIP-84, added in PR https://github.com/apache/pulsar/pull/10478 . In the future, it would also have to cover the message listener. The way this relates slightly to MessageListenerExecutor is that a Runnable queued into the MessageListenerExecutor might skip message processing when it gets executed if it has been cancelled. There isn't a need to actually cancel the task in the executor since the client can handle that internally; it's just that the MessageListenerExecutor cannot assume that a message gets processed when the Runnable executes.

lhotari avatar Jun 11 '24 14:06 lhotari

I added some further comments to simplify MessageListenerExecutor. The lifecycle is managed by the application that provides the instance, which is why the interface shouldn't expose any details of the executor. Behind the MessageListenerExecutor there might be multiple different queues / thread pools. Passing the message instance allows using the key or any other properties of the message to make decisions when needed, for example in an executor that uses a priority queue implementation.
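
For illustration only, an application-provided implementation along those lines might look like the sketch below, which routes listener tasks by message key so that messages with the same key stay ordered on one thread. The execute(Message, Runnable) shape is assumed here based on this discussion; the exact interface is defined by PIP-359:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.client.api.Message;

public class KeyHashedListenerExecutor {

    private final ExecutorService[] executors;

    public KeyHashedListenerExecutor(int numThreads) {
        executors = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Mirrors the execute(Message, Runnable) method discussed above: the client
    // hands over the message plus a Runnable, and this executor only decides
    // where to run it. It must not assume the Runnable will actually process
    // the message, since the client may internally skip cancelled deliveries.
    public void execute(Message<?> message, Runnable runnable) {
        String key = message.hasKey() ? message.getKey() : "";
        int slot = Math.floorMod(key.hashCode(), executors.length);
        executors[slot].execute(runnable);
    }

    // Lifecycle stays with the application that provides the instance.
    public void shutdown() {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
    }
}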

This change will also need a PIP. Please check for other PIPs on the mailing list for an example https://lists.apache.org/[email protected] . You can find mailing list joining instructions at https://pulsar.apache.org/contact/ . The PIP template is at https://github.com/apache/pulsar/blob/master/pip/TEMPLATE.md . Before the PIP is accepted, we cannot merge implementation PRs.

Okay, thanks for your prompt reply, I will try my best!

AuroraTwinkle avatar Jun 11 '24 14:06 AuroraTwinkle

Thanks for this proposal @AuroraTwinkle . I believe that this will be a very useful addition to the Pulsar client.

There are certain details of the message listener support in the Pulsar client that aren't handled currently and could lead to unnecessary duplicate messages. That isn't directly related to the MessageListenerExecutor. The detail concerns how ack timeouts and nacks are handled: those result in the client requesting the broker to redeliver the messages, and in the case of an exclusive or failover subscription, all unacknowledged messages get redelivered. When a message listener isn't used, there's a solution to filter out the previous in-flight messages so that the later-arriving redeliveries get processed and duplicates are avoided. However, when a message listener is used, that solution isn't applied or effective. The solution is PIP-84, added in PR #10478 . In the future, it would also have to cover the message listener. The way this relates slightly to MessageListenerExecutor is that a Runnable queued into the MessageListenerExecutor might skip message processing when it gets executed if it has been cancelled. There isn't a need to actually cancel the task in the executor since the client can handle that internally; it's just that the MessageListenerExecutor cannot assume that a message gets processed when the Runnable executes.

Okay, I need some time to fully understand it.

AuroraTwinkle avatar Jun 11 '24 15:06 AuroraTwinkle

@liangyepianzhou Please don't close and reopen PRs when that isn't needed. Primarily re-running in GitHub UI (for committers) and commenting "/pulsarbot rerun-failure-checks" on the PR should be used. Closing and reopening is fine when the previous PR build is over 3 days old and the cached artifacts have expired or when there's a specific need to pick up latest changes from master branch. If closing and reopening is used as the general approach for retrying, it will waste a lot of CI resources since all builds are re-run.

It's possible to continue iterating on the solution in builds in the forked repository without any limits. It looks like there's a PR at https://github.com/AuroraTwinkle/pulsar/pull/1, but GitHub Actions haven't been enabled for the fork. That step is currently missing from our guide since when the guide was written, GitHub Actions were enabled by default, also for forks.

In this case, this PR requires a PIP. It doesn't make sense to keep on running the jobs in apache/pulsar until the request is resolved and this PR is really ready for merging.

lhotari avatar Jun 12 '24 05:06 lhotari

@liangyepianzhou Please don't close and reopen PRs when that isn't needed. Primarily re-running in GitHub UI (for committers) and commenting "/pulsarbot rerun-failure-checks" on the PR should be used. Closing and reopening is fine when the previous PR build is over 3 days old and the cached artifacts have expired or when there's a specific need to pick up latest changes from master branch. If closing and reopening is used as the general approach for retrying, it will waste a lot of CI resources since all builds are re-run.

It's possible to continue iterating on the solution in builds in the forked repository without any limits. It looks like there's a PR at AuroraTwinkle#1, but GitHub Actions haven't been enabled for the fork. That step is currently missing from our guide since when the guide was written, GitHub Actions were enabled by default, also for forks.

In this case, this PR requires a PIP. It doesn't make sense to keep on running the jobs in apache/pulsar until the request is resolved and this PR is really ready for merging.

Okay, I have enabled GitHub Actions in my forked repository. From now on, I will run the jobs there until the request is resolved and this PR is really ready for merging.

AuroraTwinkle avatar Jun 12 '24 08:06 AuroraTwinkle

I added some further comments to simplify MessageListenerExecutor. The lifecycle is managed by the application that provides the instance, which is why the interface shouldn't expose any details of the executor. Behind the MessageListenerExecutor there might be multiple different queues / thread pools. Passing the message instance allows using the key or any other properties of the message to make decisions when needed, for example in an executor that uses a priority queue implementation.

This change will also need a PIP. Please check for other PIPs on the mailing list for an example https://lists.apache.org/[email protected] . You can find mailing list joining instructions at https://pulsar.apache.org/contact/ . The PIP template is at https://github.com/apache/pulsar/blob/master/pip/TEMPLATE.md . Before the PIP is accepted, we cannot merge implementation PRs.

Hi @lhotari, I have written PIP-359 for this. Looking forward to your suggestions!

AuroraTwinkle avatar Jun 13 '24 12:06 AuroraTwinkle

/pulsarbot rerun-failure-checks

liangyepianzhou avatar Aug 05 '24 07:08 liangyepianzhou

Before merging, I'll close and reopen this PR to ensure that CI passes with latest changes in master branch.

lhotari avatar Aug 05 '24 09:08 lhotari

Codecov Report

Attention: Patch coverage is 92.85714% with 1 line in your changes missing coverage. Please review.

Project coverage is 74.10%. Comparing base (bbc6224) to head (178bda0). Report is 493 commits behind head on master.

Files                                                    Patch %   Lines
...apache/pulsar/client/impl/ConsumerBuilderImpl.java    66.66%    0 Missing and 1 partial ⚠
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #22861      +/-   ##
============================================
+ Coverage     73.57%   74.10%   +0.52%     
- Complexity    32624    33939    +1315     
============================================
  Files          1877     1919      +42     
  Lines        139502   150946   +11444     
  Branches      15299    17279    +1980     
============================================
+ Hits         102638   111852    +9214     
- Misses        28908    30521    +1613     
- Partials       7956     8573     +617     
Flag        Coverage Δ
inttests    29.33% <35.71%> (+4.75%) ↑
systests    25.69% <42.85%> (+1.36%) ↑
unittests   73.24% <92.85%> (+0.39%) ↑

Flags with carried forward coverage won't be shown. Click here to find out more.

Files                                                    Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerBase.java    74.56% <100.00%> (+0.44%) ↑
...ar/client/impl/conf/ConsumerConfigurationData.java    92.63% <100.00%> (+0.07%) ↑
...apache/pulsar/client/impl/ConsumerBuilderImpl.java    85.77% <66.66%> (-1.17%) ↓

... and 492 files with indirect coverage changes

codecov-commenter avatar Aug 05 '24 09:08 codecov-commenter

@liangyepianzhou Did we make a decision to cherry-pick this to branch-3.0 ? PIPs don't get cherry-picked by default to maintenance branches.

lhotari avatar Sep 02 '24 12:09 lhotari

@liangyepianzhou Did we make a decision to cherry-pick this to branch-3.0 ? PIPs don't get cherry-picked by default to maintenance branches.

@lhotari This has been discussed on the mailing list as usual. https://lists.apache.org/thread/23mf4fs33075bn86h840g8x2c8vnzky4

liangyepianzhou avatar Sep 02 '24 12:09 liangyepianzhou

@liangyepianzhou Did we make a decision to cherry-pick this to branch-3.0 ? PIPs don't get cherry-picked by default to maintenance branches.

@lhotari This has been discussed on the mailing list as usual. https://lists.apache.org/thread/23mf4fs33075bn86h840g8x2c8vnzky4

Thanks, it's good to have the URL of that discussion recorded here. This is addressed.

lhotari avatar Sep 02 '24 12:09 lhotari