[fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue
Motivation
The Key_Shared subscription type has the following issues.
1. Key_Shared subscription has out-of-order cases because of a race condition in the recently joined consumers feature. Consider the following flow.
   - Assume that the current read position is `1:6` and the recently joined consumers is empty.
   - Called OpReadEntry#internalReadEntriesComplete from thread-1.
     Then, the current read position is updated to `1:11` (messages from `1:6` to `1:10` have yet to be dispatched to consumers).
   - Called PersistentStickyKeyDispatcherMultipleConsumers#addConsumer from thread-2.
     Then, the new consumer is stored to recently joined consumers with read position `1:11`.
   - Called PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers from thread-5.
     Then, messages from `1:6` to `1:10` are dispatched to consumers. Under the recently joined consumers feature, the new consumer can receive messages from `1:6` to `1:10`. However, this is not expected. For example, if existing consumers have some unacked messages and disconnect, it causes out-of-order delivery in some cases.
2. Key_Shared subscription has a redundant process. The stuckConsumers feature was introduced in https://github.com/apache/pulsar/pull/7553 . However, it can't fix the issue entirely because it doesn't consider the range changes. Later, https://github.com/apache/pulsar/pull/10762 was introduced, and it fixes the issue.
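For illustration only, the interleaving in item 1 can be modelled in a few lines of Java. This is a minimal sketch, not the code in this PR: `JoinPositionRaceSketch` and all of its methods are hypothetical names, positions are simplified to entry ids within ledger 1 (so `1:6` becomes `6`), the starting last sent position of `1:5` is assumed, and the two eligibility rules are an assumption about how a recorded position is compared against an entry.

```java
/**
 * Hypothetical model of the race. Positions are reduced to entry ids within
 * ledger 1, so 1:6 becomes 6. None of these names exist in Pulsar.
 */
final class JoinPositionRaceSketch {
    private long readPosition = 6;      // next entry to read, i.e. 1:6
    private long lastSentPosition = 5;  // assumed: 1:5 was the last entry dispatched

    /** thread-1: a read of count entries completes, so the read position moves ahead. */
    void readEntriesComplete(int count) {
        readPosition += count;
    }

    /** thread-5: entries up to and including entryId are dispatched. */
    void dispatchUpTo(long entryId) {
        lastSentPosition = Math.max(lastSentPosition, entryId);
    }

    long readPosition() {
        return readPosition;
    }

    long lastSentPosition() {
        return lastSentPosition;
    }

    /** Old rule (assumed): the joiner may receive entries below the recorded read position. */
    static boolean allowedByReadPosition(long entryId, long recordedReadPosition) {
        return entryId < recordedReadPosition;
    }

    /** New rule (assumed): the joiner may receive entries at or below the recorded last sent position. */
    static boolean allowedByLastSentPosition(long entryId, long recordedLastSent) {
        return entryId <= recordedLastSent;
    }

    public static void main(String[] args) {
        JoinPositionRaceSketch dispatcher = new JoinPositionRaceSketch();
        dispatcher.readEntriesComplete(5);                   // read position: 6 -> 11
        long recordedRead = dispatcher.readPosition();       // thread-2 joins at this point
        long recordedSent = dispatcher.lastSentPosition();   // what the fix would record instead
        dispatcher.dispatchUpTo(10);                         // 1:6..1:10 are dispatched afterwards
        System.out.println(allowedByReadPosition(6, recordedRead));     // true: the unexpected case
        System.out.println(allowedByLastSentPosition(6, recordedSent)); // false
    }
}
```

The point of the sketch is only that the read position can run ahead of what has actually been dispatched, while the last sent position cannot.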
Modifications
1. Store the last sent position instead of the read position in recently joined consumers. Updating the read position, dispatching messages, and adding a new consumer are not mutually exclusive. I used the last sent position to get a position without adding any new mutual exclusion.
2. Keep the last sent position per key. When I introduced the last sent position, I noticed a new concern. Consider the following flow.
   - Assume that the entries have the following messages,
     - `msg-1, key:key-a, position:1:1`
     - `msg-2, key:key-a, position:1:2`
     - `msg-3, key:key-a, position:1:3`
     - `msg-4, key:key-b, position:1:4`
     - `msg-5, key:key-b, position:1:5`
     - `msg-6, key:key-b, position:1:6`

     the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messagesForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`.
   - Send `msg-1` to `c1` and `msg-4` - `msg-6` to `c2`.
     - So, the current last sent position is `1:6`.
     - `c1` never acknowledges `msg-1`.
   - Add new consumer `c3`, and the selector will return `c3` if `key-a`.
   - Send `msg-2` - `msg-3` to `c3` because `1:2` and `1:3` are less than the last sent position, `1:6`.
   - Disconnect `c1`.
   - Send `msg-1` to `c3`. Now, `c3` receives the `key-a` messages out of the expected order.

   To avoid this issue, I introduce the last sent positions feature.
3. Remove redundant features due to the addition of the last sent positions feature.
   - Remove this condition because the last sent positions feature restricts only part of the messages. New consumers can receive any new message whose key is not in the last sent positions. https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L323-L336
   - Remove this behavior because the last sent positions feature keeps positions per key. If some key is stuck at a certain point in time, from that point, new consumers store the same information about the key in the `recentlyJoinedConsumers`. https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L365-L399
   - Remove this behavior because this calculation is moved to LastSentPositions#compareToLastSentPosition. https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L400-L408
4. Remove the stuckConsumers feature.
5. Reconstruct some consumer stats fields related to Key_Shared. This is a breaking change. However, if this PR is merged, the existing field that this PR removes is no longer needed.
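The per-key flow in item 2 can be sketched as follows. This is a hedged illustration under stated assumptions, not the `LastSentPositions` class from this PR, whose actual fields and signatures are not reproduced here: `PerKeyLastSentSketch` and its methods are hypothetical, positions are simplified to entry ids within ledger 1, and a key that has no recorded position is treated as unrestricted, following the statement above that new consumers can receive messages whose key is not in the last sent positions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: compares a single last sent position with one kept per key. */
final class PerKeyLastSentSketch {
    private final Map<String, Long> lastSentByKey = new HashMap<>();
    private long globalLastSent = 0;

    /** Record that the entry with this key was dispatched to some consumer. */
    void recordSent(String key, long entryId) {
        lastSentByKey.merge(key, entryId, Long::max);
        globalLastSent = Math.max(globalLastSent, entryId);
    }

    long globalLastSent() {
        return globalLastSent;
    }

    /** Copy taken when a new consumer joins, so later sends do not change it. */
    Map<String, Long> snapshotForNewConsumer() {
        return new HashMap<>(lastSentByKey);
    }

    /** Single-position rule: anything at or below the recorded position may be sent. */
    static boolean allowedGlobal(long entryId, long recordedGlobal) {
        return entryId <= recordedGlobal;
    }

    /** Per-key rule: only entries at or below the recorded position of the same key may be sent. */
    static boolean allowedPerKey(String key, long entryId, Map<String, Long> recorded) {
        Long last = recorded.get(key);
        return last == null || entryId <= last;
    }

    public static void main(String[] args) {
        PerKeyLastSentSketch dispatcher = new PerKeyLastSentSketch();
        dispatcher.recordSent("key-a", 1);               // msg-1 -> c1
        for (long entry = 4; entry <= 6; entry++) {
            dispatcher.recordSent("key-b", entry);       // msg-4..msg-6 -> c2
        }
        long recordedGlobal = dispatcher.globalLastSent();                      // 6
        Map<String, Long> recordedPerKey = dispatcher.snapshotForNewConsumer(); // c3 joins
        System.out.println(allowedGlobal(2, recordedGlobal));          // true: msg-2 overtakes msg-1
        System.out.println(allowedPerKey("key-a", 2, recordedPerKey)); // false: msg-2 is held back
    }
}
```

With a single position, `msg-2` passes the check even though `msg-1` is still unacknowledged on `c1`; with the per-key map it does not, which is the ordering problem the per-key structure is meant to avoid.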
Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added unit test to ensure the behavior of last sent positions feature works properly
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [x] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] `doc`
- [x] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
Matching PR in forked repository
PR in forked repository: https://github.com/equanz/pulsar/pull/1
@poorbarcode Thank you for your clarification.
- [1] Yes.
- [2] Partially yes. However, I think the main point of this PR is the change in the timing of getting the position. https://github.com/apache/pulsar/blob/27880e42c03eabd02d3d5f93f97988f6d03a5d90/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L450-L458
  Currently, the position is the `readPosition`. The `readPosition` is updated when the read is completed, even if messages are not dispatched. In this PR, I will change the position to the last sent position.
@equanz
> Currently, the position is the `readPosition`. The `readPosition` is updated when the read is completed, even if messages are not dispatched. In this PR, I will change the position to the last sent position.
Scenario-2
| time | `process: add consumer` | `process: delivery messages to client` |
|---|---|---|
| 1 | read position is `1:6` | |
| 2 | Add new consumer into the selector | |
| 3 | Read entries `1:6` ~ `1:10` complete | |
| 4 | Set read position `1:11` | |
| 5 | Add the new consumer into recently joined consumers | |
| 6 | (Highlight) The max read-position of the new consumer is `1:11`, but the exact correct value is `1:6` | |
| 7 | Choose consumer by the selector | |
| 8 | Delivery entries `1:6` ~ `1:10` to all consumers (includes old consumers and the new consumer) | |
This scenario is what you are saying, right?[Q-1]
If yes[Q-1], we have two solutions to fix it:
- Use `last send position` instead of `last read position` (what this PR does)
- Find a way to make the lock work for Scenario-1[scenario-1], and add a lock for `set read position` for Scenario-2.
I also think solution-1 is better, but declaring the data structure of lastSentPosition as a Map is an improvement, right[Q-2]?
If yes[Q-2], could you split this PR into two PRs: one typed "fix" and another one typed "improve"? And I think the PR typed "improve" should be submitted with a PIP, because the mechanism of the Key_Shared dispatcher is now so complicated that we need PIPs to track changes to it.
[scenario-1]: https://github.com/apache/pulsar/pull/20179#pullrequestreview-1400545109
@poorbarcode
- [Q-1] Yes.
- [Q-2] When I created this PR, I introduced the Map structure because it was necessary. For more details, please see the description ("2. Keep the last sent position per key."). I just noticed we could avoid introducing a Map structure if we implemented the last sent position feature like the mark delete position and the individually deleted messages feature. In other words,
  - The position is already scheduled to be sent.
  - All positions less than or equal to it are already scheduled to be sent.
  - Manage individually sent positions to update the position as expected.

Consider the following flow.
- Assume that the entries have the following messages,
  - `msg-1, key:key-a, position:1:1`
  - `msg-2, key:key-a, position:1:2`
  - `msg-3, key:key-a, position:1:3`
  - `msg-4, key:key-b, position:1:4`
  - `msg-5, key:key-b, position:1:5`
  - `msg-6, key:key-b, position:1:6`

  the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messagesForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`.
- Send `msg-1` to `c1` and `msg-4` - `msg-6` to `c2`.
  - So, the current last sent position is `1:1` and the individually sent positions is `[[1:3, 1:6]]` (a list of closed intervals, not a list of lists).
  - `c1` never acknowledges `msg-1`.

scenario A
- Add new consumer `c3`, and the selector will return `c3` if `key-a`.
- Can't send `msg-2` - `msg-3` to `c3` because `1:2` and `1:3` are greater than the last sent position, `1:1`.
- Disconnect `c1`.
- Send `msg-1` - `msg-3` to `c3`.
  Now, `c3` receives the `key-a` messages in the expected order.

scenario B
- `c1` `messagesForC` is back to 999.
- Send `msg-2` - `msg-3` to `c1`.
  - So, the current last sent position is `1:6`, and the individually sent positions is `[]`.
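The bookkeeping in this flow can be sketched in Java as follows. This is a minimal sketch under stated assumptions rather than a proposed implementation: `SentPositionTrackerSketch` and its methods are hypothetical names, positions are simplified to contiguous entry ids within ledger 1, and individually sent positions are kept as a sorted set of single ids instead of the interval list mentioned above.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch of a last sent position that advances like a mark delete position. */
final class SentPositionTrackerSketch {
    private long lastSentPosition;                                   // every entry <= this has been sent
    private final TreeSet<Long> individuallySent = new TreeSet<>();  // sent entries beyond a gap

    SentPositionTrackerSketch(long initialLastSentPosition) {
        this.lastSentPosition = initialLastSentPosition;
    }

    /** Mark one entry as sent and advance the last sent position over any closed gap. */
    void markSent(long entryId) {
        if (entryId <= lastSentPosition) {
            return; // already covered by the contiguous prefix
        }
        individuallySent.add(entryId);
        // Consume entries that are now contiguous with the last sent position.
        while (individuallySent.remove(lastSentPosition + 1)) {
            lastSentPosition++;
        }
    }

    long lastSentPosition() {
        return lastSentPosition;
    }

    SortedSet<Long> individuallySent() {
        return Collections.unmodifiableSortedSet(individuallySent);
    }

    /** Assumed rule: a recently joined consumer may only get entries at or below the recorded position. */
    static boolean allowedForNewConsumer(long entryId, long recordedLastSent) {
        return entryId <= recordedLastSent;
    }

    public static void main(String[] args) {
        SentPositionTrackerSketch tracker = new SentPositionTrackerSketch(0);
        tracker.markSent(1);                                   // msg-1 -> c1
        for (long entry = 4; entry <= 6; entry++) {
            tracker.markSent(entry);                           // msg-4..msg-6 -> c2
        }
        System.out.println(tracker.lastSentPosition());        // 1
        System.out.println(tracker.individuallySent());        // [4, 5, 6]
        // scenario A: c3 joins now, so msg-2 is held back for it.
        System.out.println(allowedForNewConsumer(2, tracker.lastSentPosition())); // false
        // scenario B: c1 receives msg-2 and msg-3, which closes the gap.
        tracker.markSent(2);
        tracker.markSent(3);
        System.out.println(tracker.lastSentPosition());        // 6
        System.out.println(tracker.individuallySent());        // []
    }
}
```

A production version would more likely store ranges than single ids, which keeps memory bounded by the number of gaps rather than the number of sent entries.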
My primary motivation is not to improve Key_Shared but to fix Key_Shared. Therefore, if the current approach is not accepted as a fix, I will introduce the approach above and remove the per-key Map feature.
@eolivelli
Thank you for your comments. I'll check them. By the way, do you have any comments about the following concern from @poorbarcode ?
> By the way, if there are a huge number of keys, will the constructor LastSentPositions cost too much memory?

If the current approach is not approved, I will create a new PR as described in https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184 instead of addressing your comments in this PR.
@poorbarcode
What do you think about the following comment? https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184
- I think we can't split easily between fix and improvement in the current approach.
- For more details, please see the description("2. Keep the last sent position per key.").
- I think we need to implement a new feature like https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184 to fix without introducing Map structure.
- My primary motivation is not to improve Key_Shared but fix Key_Shared.
Therefore, if the current approach is not approved as a fix, I will introduce that approach and remove the per-key Map feature.
The pr had no activity for 30 days, mark with Stale label.
I opened a new PR https://github.com/apache/pulsar/pull/21953.
https://github.com/apache/pulsar/pull/21953 was merged.