pulsar
[improve][broker] Add limits for Key_Shared Subscription look ahead in dispatching
Fixes #23200
Motivation
This PR introduces enhancements to the Key_Shared subscription dispatching logic in the Pulsar broker. The changes aim to optimize message dispatching, improve the handling of slow consumers, and address several identified issues related to message replay and message "look ahead" when consumers are blocked in delivery due to key ordering restrictions.
This PR adds limits to the Key_Shared Subscription look-ahead feature, which was introduced in PR #7105. Additionally, there are several improvements to the previous logic.
Since there wasn't any named concept for the feature added in PR #7105, it has been named "key shared look ahead" in this PR. This references the feature added in PR #7105, where the dispatcher will skip replaying messages and read more messages from the backlog. The changes in PR #7105 didn't add a limit for reading more messages. The comments in the PR mention that it will be limited by the unacked message limits. However, this isn't the case since the read messages aren't considered unacked messages until they have been dispatched to consumers.
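As a rough illustration of the kind of bound that was missing, the sketch below caps look-ahead by the number of messages already waiting in replay. The class name, the method names, and the two threshold parameters are hypothetical and chosen only for illustration. The rule for combining a per-consumer and a per-subscription threshold is an assumption, not the broker's actual implementation.

```java
/**
 * Hypothetical sketch: decide whether a dispatcher may keep reading ahead
 * from the backlog, based on how many messages are already queued for replay.
 * All names and the combination rule are assumptions for illustration.
 */
final class LookAheadLimitSketch {

    /**
     * Computes the effective limit for messages in replay.
     * A threshold of 0 or less is treated as "disabled" (assumption).
     */
    static int effectiveLimit(int perConsumerThreshold, int perSubscriptionThreshold, int consumerCount) {
        if (perConsumerThreshold <= 0) {
            // Only the per-subscription threshold applies, if it is enabled.
            return perSubscriptionThreshold > 0 ? perSubscriptionThreshold : Integer.MAX_VALUE;
        }
        // Scale the per-consumer threshold by the number of consumers (long avoids overflow).
        long scaled = (long) perConsumerThreshold * consumerCount;
        long limit = perSubscriptionThreshold > 0
                ? Math.min(scaled, (long) perSubscriptionThreshold)
                : scaled;
        return (int) Math.min(limit, (long) Integer.MAX_VALUE);
    }

    /** Look-ahead is allowed only while the replay queue is below the effective limit. */
    static boolean canLookAhead(int messagesInReplay, int perConsumerThreshold,
                                int perSubscriptionThreshold, int consumerCount) {
        return messagesInReplay
                < effectiveLimit(perConsumerThreshold, perSubscriptionThreshold, consumerCount);
    }

    public static void main(String[] args) {
        // 3 consumers, 100 per consumer, 1000 per subscription: effective limit is 300.
        System.out.println(canLookAhead(250, 100, 1000, 3));
        System.out.println(canLookAhead(300, 100, 1000, 3));
    }
}
```

The point of the sketch is that the check depends on the replay queue size rather than on unacked message counts, since messages that have been read but not yet dispatched are not counted as unacked.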
Modifications
- Configuration Properties:
  - Added new configuration properties to control the limits of Key_Shared subscriptions look ahead in dispatching:
    - `keySharedLookAheadMsgInReplayThresholdPerConsumer`
    - `keySharedLookAheadMsgInReplayThresholdPerSubscription`
    - `keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist`
- Broker Configuration:
  - Updated `broker.conf` to include descriptions and default values for the new configuration properties.
- Key_Shared Subscription Logic:
  - Implement the key shared look-ahead limits that are configured with the above configuration properties.
  - Enhance the dispatching to ensure efficient dispatching:
    - Instead of filtering the list of entries pulled from the message redelivery controller, refactor the solution to take a function that can filter the entries until there's a sufficient amount of messages for dispatching.
      - This refactoring eliminates the previous method `filterOutEntriesWillBeDiscarded` while preserving the previous logic of filtering.
    - Remove the unnecessary thread-local fields `localGroupedEntries` and the unused `localGroupedPositions`.
      - It is unnecessary to eliminate the creation of short-lived objects in modern JVMs. It just clutters the code without any performance benefit. "No garbage" style is relevant mainly for long-lived objects stored in caches or large array allocations.
  - Replace `getMessagesToReplayNow(1)` with `getFirstPositionInReplay()`.
    - This check will no longer have the side effect of pulling messages from delayed delivery when it's called.
  - Add a new method `canReplayMessages` to make the look-ahead logic more explicit. The previous solution of returning an empty set from the `getMessagesToReplayNow` method made it hard to understand how the look-ahead behavior is implemented.
  - Rename the `isDispatcherStuckOnReplays` field to `skipNextReplayToTriggerLookAhead` to make the meaning of the field explicit and self-descriptive.
  - Rename the `keyNumbers` variable to `remainingConsumersToFinishSending` so that the meaning of the variable is explicit and self-descriptive.
  - Rename the `groupedEntries` variable to `entriesByConsumerForDispatching` so that the meaning of the variable is explicit and self-descriptive.
  - Rename the `entriesWithSameKey` variable to `entriesForConsumer` so that the variable is self-descriptive.
  - Calculate permits for batch messages with the average number of messages and take the permits into account for initial filtering in replay.
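The idea of passing a filter function to the replay lookup, instead of filtering an already-truncated list, can be sketched roughly as follows. This is a minimal illustration under stated assumptions: `ReplayFilterSketch` and `takeFiltered` are hypothetical names, and positions are modeled as plain `Long` values rather than the broker's position type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of "filter while collecting": keep walking the replay
 * candidates until enough acceptable items have been found.
 * Not the broker's actual API.
 */
final class ReplayFilterSketch {

    /**
     * Walks the candidates in order and collects those accepted by the filter,
     * stopping as soon as maxItems have been collected.
     */
    static <T> List<T> takeFiltered(Iterable<T> candidates, Predicate<? super T> filter, int maxItems) {
        List<T> selected = new ArrayList<>();
        if (maxItems <= 0) {
            return selected;
        }
        for (T candidate : candidates) {
            if (filter.test(candidate)) {
                selected.add(candidate);
                if (selected.size() >= maxItems) {
                    break; // enough items for this dispatch round
                }
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Long> replayPositions = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
        // Hypothetical rule: only even positions can currently be dispatched.
        List<Long> picked = takeFiltered(replayPositions, p -> p % 2 == 0, 3);
        System.out.println(picked); // prints [2, 4, 6]
    }
}
```

With the same inputs, taking the first three positions and filtering afterwards would leave only position 2, which is why filtering during collection makes better use of each dispatch round.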
Documentation
- [ ] `doc`
- [x] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
This PR is ready for initial review feedback. I'm looking into ways of adding tests for the key_shared look ahead limits.
Tomorrow I'll continue and add tests and fix broken tests. I'd appreciate any reviews along the way.
There are some test failures. I'm trying to understand what `org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers` is testing: https://github.com/apache/pulsar/blob/dccc06bf50bb5ca510b39167908c02d2b4602ca5/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java#L1235-L1333
"Test for guaranteed message ordering in corner cases" says the PR. @equanz Do you remember which corner case this test method is for?
UPDATE: I got the test passing after fixing the permit issue.
Found the cause of SimpleProducerConsumerTest#testMaxUnAckMessagesLowerThanPermits failure. It's about available permits handling. The previous implementation before this PR ignored the batch index count so I skipped the logic. There's a lot of duplication required at the moment, so it becomes a broader refactoring to address this properly.
The remaining part is to add tests that reproduce the issues that this PR is addressing.
> "Test for guaranteed message ordering in corner cases" says the PR. @equanz Do you remember which corner case this test method is for?
https://github.com/apache/pulsar/pull/23231#issuecomment-2321233767
This test verifies that the issue referred to in PIP-282 as [issue-2] is not reproduced.
One of the details that I'll have to address is the fact that the Pulsar client consumer sends permits for entries/batches and not individual messages. This detail isn't accurately described in the Pulsar binary protocol specification.
Codecov Report
Attention: Patch coverage is 91.53226% with 21 lines in your changes missing coverage. Please review.
Project coverage is 74.67%. Comparing base (bbc6224) to head (c9e72c4). Report is 1090 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #23231 +/- ##
============================================
+ Coverage 73.57% 74.67% +1.10%
- Complexity 32624 33933 +1309
============================================
Files 1877 1929 +52
Lines 139502 145237 +5735
Branches 15299 15894 +595
============================================
+ Hits 102638 108459 +5821
+ Misses 28908 28529 -379
- Partials 7956 8249 +293
| Flag | Coverage Δ | |
|---|---|---|
| inttests | 27.83% <44.75%> (+3.25%) | :arrow_up: |
| systests | 24.72% <44.75%> (+0.39%) | :arrow_up: |
| unittests | 74.02% <91.53%> (+1.18%) | :arrow_up: |
| Files with missing lines | Coverage Δ | |
|---|---|---|
| ...org/apache/pulsar/broker/ServiceConfiguration.java | 98.95% <100.00%> (-0.44%) | :arrow_down: |
| ...ervice/persistent/MessageRedeliveryController.java | 95.83% <100.00%> (+0.83%) | :arrow_up: |
| ...ulsar/utils/ConcurrentBitmapSortedLongPairSet.java | 95.45% <100.00%> (+1.01%) | :arrow_up: |
| ...sistent/PersistentDispatcherMultipleConsumers.java | 75.10% <87.50%> (+0.78%) | :arrow_up: |
| ...ersistentStickyKeyDispatcherMultipleConsumers.java | 86.18% <91.37%> (+0.54%) | :arrow_up: |
> One of the details that I'll have to address is the fact that the Pulsar client consumer sends permits for entries/batches and not individual messages. This detail isn't accurately described in the Pulsar binary protocol specification.
I noticed that the calculations based on available permits and available unacknowledged messages were wrong for both Shared and Key_Shared subscriptions. The fixes for both subscription types are included in this PR. Tests break unless the fixes are made for Key_Shared subscriptions, and since the fix is made for Key_Shared, it should also be applied to Shared subscriptions. This shows a lack of test coverage for catching regressions in this area, so rolling these changes into this PR is not the ideal solution.
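For readers unfamiliar with this kind of calculation, a rough sketch of estimating how many entries to dispatch from a message budget is shown below. Everything here is an assumption for illustration: `PermitEstimateSketch` and `estimateEntries` are hypothetical names, the budget is assumed to be expressed in individual messages, and the rounding rule is just one plausible choice. Whether permits are counted per message or per entry/batch is exactly the detail that makes this easy to get wrong, so treat the units as an assumption too.

```java
/**
 * Hypothetical sketch: estimate how many entries (batches) to dispatch,
 * given a budget expressed in individual messages and the average number
 * of messages per entry. Not the broker's actual calculation.
 */
final class PermitEstimateSketch {

    static int estimateEntries(int availablePermits, int remainingUnackedMessages, int avgMessagesPerEntry) {
        // The tighter of the two limits determines the message budget.
        int budget = Math.min(availablePermits, remainingUnackedMessages);
        if (budget <= 0) {
            return 0; // nothing can be dispatched right now
        }
        // Guard against a zero or negative average (e.g. no statistics yet).
        int avg = Math.max(avgMessagesPerEntry, 1);
        // Round down, but allow at least one entry while any budget remains.
        return Math.max(budget / avg, 1);
    }

    public static void main(String[] args) {
        System.out.println(estimateEntries(100, 1000, 10));
        System.out.println(estimateEntries(100, 5, 10));
        System.out.println(estimateEntries(0, 10, 10));
    }
}
```

Ignoring the batch size entirely corresponds to calling this with `avgMessagesPerEntry` of 1, which overestimates the number of entries whenever batches contain more than one message.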
> The remaining part is to add tests that reproduce the issues that this PR is addressing.
My goal was to add some system-level tests, but that seems hard. Instead, I'll try to verify the behavior by adding tests to the existing Key_Shared test suite, which uses partial mocks.
Please do not merge this PR fast, I will finish my review tomorrow
Thanks @poorbarcode. The tests are also pending. I'm planning to finish them tomorrow and hopefully your feedback will be available for addressing at that time too.
@lhotari
And an important comment here: https://github.com/apache/pulsar/pull/23231#discussion_r1746465968
Noticing more challenges to cover while getting into the tests. Setting this to draft mode until the concerns are addressed and tests have been added. Some of the concerns are related to #23264.
@lhotari
- Three weeks ago: I submitted a Request Change
- Two weeks ago, you left a comment: "Noticing more challenges to cover while getting into the tests. Setting this to draft mode until the concerns are addressed and tests have been added. Some of the concerns are related to https://github.com/apache/pulsar/issues/23264."
- 5 days ago: you dismissed the Request Change, and left a comment: "Outdated comment"
Between step-2 and step-3, you never marked it as available to review and never ask to review again, then you merged the PR :joy:
I'm sorry @poorbarcode. Please let me explain the misunderstanding.
> 5 days ago: you dismissed the Request Change, and left a comment: "Outdated comment"
For me, the "request for changes" comment was "Please do not merge this PR fast, I will finish my review tomorrow" (https://github.com/apache/pulsar/pull/23231#pullrequestreview-2283712399). That's why I dismissed it with "Outdated comment". I had also already addressed the other review comments you had made.
> Between `step-2` and `step-3`, you never marked it as available to review and never ask to review again, then you merged the PR 😂
I'm sorry for that. I did request a review, but you didn't respond. You can check the timeline. The PR was in draft mode for some time, but the reason for that was that the PR was already approved and I didn't want anyone to merge it. I didn't realize that you wouldn't be reviewing it because of the draft status.
The changes in this PR aren't final. If you have review feedback, it's also possible to address it after the PR has been merged. Please add comments to this PR about any concerns you may have.
@lhotari
Sure, thanks for replying to this comment. ❤️