
[fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue

Open equanz opened this issue 2 years ago • 6 comments

Motivation

The Key_Shared subscription type has the following issues.

  1. A Key_Shared subscription can deliver messages out of order because of a race condition in the recently joined consumers feature. Consider the following flow.

    1. Assume that the current read position is 1:6 and recentlyJoinedConsumers is empty.
    2. Thread-1 calls OpReadEntry#internalReadEntriesComplete, which advances the current read position to 1:11. Messages 1:6 to 1:10 have not been dispatched to consumers yet.
    3. Thread-2 calls PersistentStickyKeyDispatcherMultipleConsumers#addConsumer, which stores the new consumer in recentlyJoinedConsumers with read position 1:11.
    4. Thread-5 calls PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers, which dispatches messages 1:6 to 1:10. Because the new consumer's recorded position is 1:11, the recently joined consumers feature allows it to receive messages 1:6 to 1:10, which is not expected. For example, if existing consumers hold unacked messages and then disconnect, those messages can be redelivered to the new consumer after it has already received later messages with the same key, causing out-of-order delivery.
  2. The Key_Shared subscription has a redundant mechanism. The stuckConsumers feature was introduced in https://github.com/apache/pulsar/pull/7553, but it cannot fully fix the issue because it does not take range changes into account. https://github.com/apache/pulsar/pull/10762 was introduced after that commit and fixes the issue.
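The race in item 1 can be reproduced with a small model. This is a hypothetical sketch, not Pulsar's dispatcher code: positions are reduced to entry IDs within ledger 1 (so 1:6 is 6), the class and method names are illustrative, and it assumes that 1:5 was the last entry actually dispatched before the new consumer joined. It compares which pending entries a recently joined consumer may receive when its recorded position is the read position versus the last sent position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the race in Motivation item 1; not Pulsar's real classes.
// Simplification: positions are entry IDs within ledger 1 (so 1:6 is 6).
class RecentlyJoinedRaceSketch {

    // A recently joined consumer may only receive entries strictly below the position
    // recorded for it when it joined. Pending entries are in position order.
    static List<Long> allowedForNewConsumer(List<Long> pending, long recordedPosition) {
        List<Long> allowed = new ArrayList<>();
        for (long entryId : pending) {
            if (entryId >= recordedPosition) {
                break; // every later entry is also past the limit
            }
            allowed.add(entryId);
        }
        return allowed;
    }

    // Entries 1:6 .. 1:10: already read (step 2) but not yet dispatched.
    static List<Long> pendingEntries() {
        List<Long> pending = new ArrayList<>();
        for (long e = 6; e <= 10; e++) {
            pending.add(e);
        }
        return pending;
    }

    public static void main(String[] args) {
        long readPosition = 11;    // advanced by the read callback before dispatch
        long lastSentPosition = 5; // assumption: 1:5 was the last entry actually dispatched
        System.out.println("recorded = read position:      "
                + allowedForNewConsumer(pendingEntries(), readPosition));      // [6, 7, 8, 9, 10]
        System.out.println("recorded = last sent position: "
                + allowedForNewConsumer(pendingEntries(), lastSentPosition));  // []
    }
}
```

With the read position as the limit, the new consumer may take all of 1:6 to 1:10 even though none of them had been dispatched when it joined; with the last sent position, it receives none of them while it is still treated as recently joined.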

Modifications

  1. Store the last sent position instead of the read position in recently joined consumers. Updating the read position, dispatching messages, and adding a new consumer are not mutually exclusive, so the read position can run ahead of what has actually been dispatched. Using the last sent position yields the correct position without adding any new locking.

  2. Keep the last sent position per key. When I introduced the last sent position, I noticed a new concern. Consider the following flow.

    1. Assume that the entries contain the following messages:
      • msg-1, key: key-a, position: 1:1
      • msg-2, key: key-a, position: 1:2
      • msg-3, key: key-a, position: 1:3
      • msg-4, key: key-b, position: 1:4
      • msg-5, key: key-b, position: 1:5
      • msg-6, key: key-b, position: 1:6
       The dispatcher has two consumers (c1's messagesForC is 1, c2's messagesForC is 1000), and the selector returns c1 for key-a and c2 for key-b.
    2. Send msg-1 to c1 and msg-4 through msg-6 to c2.
      • So the current last sent position is 1:6.
      • c1 never acknowledges msg-1.
    3. Add a new consumer c3; the selector now returns c3 for key-a.
    4. Send msg-2 and msg-3 to c3, because 1:2 and 1:3 are less than the last sent position, 1:6.
    5. Disconnect c1.
    6. Send msg-1 to c3. c3 has now received key-a messages out of order (msg-2 and msg-3 before msg-1).

    To avoid this issue, I introduce the last sent positions feature.

  3. Remove features made redundant by the last sent positions feature.

    1. Remove this condition, because the last sent positions feature already restricts the relevant messages: new consumers can receive any new message whose key has no entry in the last sent positions. https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L323-L336

    2. Remove this behavior, because the last sent positions are kept per key. If a key is stuck at some point in time, every consumer that joins after that point stores the same information about that key in recentlyJoinedConsumers. https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L365-L399

    3. Remove this behavior, because this calculation moves to LastSentPositions#compareToLastSentPosition. https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L400-L408

  4. Remove the stuckConsumers feature.

  5. Restructure some Key_Shared-related consumer stats fields. This is a breaking change; however, once this PR is merged, the existing field that this PR removes is no longer needed.
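Modification 2 can be illustrated with a small model of per-key last sent positions. This is a hypothetical sketch, not the LastSentPositions class from this PR: the names are illustrative, positions are entry IDs within ledger 1, and it assumes a recently joined consumer may receive a message if its key had nothing sent when the consumer joined, or if the message is at or below the last sent position recorded for that key (for example, a redelivery). It replays the msg-1 to msg-6 flow from item 2.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-key last sent positions; names are illustrative, not Pulsar's API.
// Simplification: positions are entry IDs within ledger 1 (so 1:3 is 3).
class PerKeyLastSentSketch {

    private final Map<String, Long> lastSentByKey = new HashMap<>();

    // Record that an entry with this key has been sent; keep the highest position per key.
    void onSent(String key, long position) {
        lastSentByKey.merge(key, position, Long::max);
    }

    // Copy stored for a consumer when it joins, instead of a single read position.
    Map<String, Long> snapshot() {
        return new HashMap<>(lastSentByKey);
    }

    // Assumed rule: allowed if the key had nothing sent when the consumer joined,
    // or if the entry is at or below the last position already sent for that key.
    static boolean canSendToRecentlyJoined(Map<String, Long> joinSnapshot, String key, long position) {
        Long lastSent = joinSnapshot.get(key);
        return lastSent == null || position <= lastSent;
    }

    // For comparison: one subscription-wide last sent position, as in modification 1 alone.
    static boolean canSendWithSingleLastSent(long lastSent, long position) {
        return position <= lastSent;
    }

    public static void main(String[] args) {
        PerKeyLastSentSketch positions = new PerKeyLastSentSketch();
        positions.onSent("key-a", 1); // msg-1 -> c1, never acknowledged
        positions.onSent("key-b", 4); // msg-4 .. msg-6 -> c2
        positions.onSent("key-b", 5);
        positions.onSent("key-b", 6);

        Map<String, Long> c3 = positions.snapshot(); // c3 joins and now owns key-a

        System.out.println("single position, msg-2 to c3: " + canSendWithSingleLastSent(6, 2));        // true
        System.out.println("per key, msg-2 to c3: " + canSendToRecentlyJoined(c3, "key-a", 2));           // false
        System.out.println("per key, msg-1 redelivery to c3: " + canSendToRecentlyJoined(c3, "key-a", 1)); // true
    }
}
```

A single subscription-wide last sent position (1:6) admits msg-2 for c3, while the per-key snapshot records only 1:1 for key-a and holds msg-2 back. The trade-off, raised later in the discussion, is that the map grows with the number of distinct keys.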

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added unit tests to verify that the last sent positions feature works properly.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [x] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [x] doc-required
  • [ ] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: https://github.com/equanz/pulsar/pull/1

equanz • Apr 25 '23

@poorbarcode Thank you for your clarification.

  • [1] Yes.

  • [2] Partially yes. However, I think the main point of this PR is changing when the position is captured. https://github.com/apache/pulsar/blob/27880e42c03eabd02d3d5f93f97988f6d03a5d90/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L450-L458

    Currently, the position is the readPosition. The readPosition is updated when the read is completed, even if messages are not dispatched. In this PR, I will change the position to the last sent position.

equanz • Apr 26 '23

@equanz

> Currently, the position is the readPosition. The readPosition is updated when the read is completed, even if messages are not dispatched. In this PR, I will change the position to the last sent position.

Scenario-2

Time-ordered steps of two concurrent processes, "add consumer" and "deliver messages to the client":

1. (initial state) The read position is 1:6.
2. [add consumer] Add the new consumer to the selector.
3. [deliver messages] Reading entries 1:6 ~ 1:10 completes.
4. [deliver messages] Set the read position to 1:11.
5. [add consumer] Add the new consumer to recently joined consumers.
6. (Highlight) The max read position of the new consumer is 1:11, but the correct value is 1:6.
7. [deliver messages] Choose a consumer with the selector.
8. [deliver messages] Deliver entries 1:6 ~ 1:10 to all consumers (including the old consumers and the new consumer).

Is this the scenario you are describing? [Q-1]

If so [Q-1], there are two ways to fix it:

  • Use the last sent position instead of the last read position (what this PR does).
  • Find a way to make the lock work for Scenario-1 [scenario-1], and add a lock around setting the read position for Scenario-2.

I also think solution 1 is better, but declaring lastSentPosition as a Map is an improvement rather than a fix, right? [Q-2]

If so [Q-2], could you split this PR into two PRs, one of type "fix" and one of type "improve"? I also think the "improve" PR should be submitted with a PIP, because the Key_Shared dispatcher mechanism is now so complicated that we need PIPs to track changes to it.

[scenario-1]: https://github.com/apache/pulsar/pull/20179#pullrequestreview-1400545109

poorbarcode • May 04 '23

@poorbarcode

  • [Q-1] Yes.

  • [Q-2] When I created this PR, I introduced the Map structure because it was necessary; for details, see the description ("2. Keep the last sent position per key."). I have just noticed that we could avoid the Map structure by implementing the last sent position feature the way the mark delete position and the individually deleted messages feature work. In other words:

    • The last sent position has already been scheduled to be sent.
    • All positions less than or equal to it have also been scheduled to be sent.
    • Individually sent positions are tracked so that the last sent position can be advanced as expected.

    Consider the following flow.

    1. Assume that the entries contain the following messages:
      • msg-1, key: key-a, position: 1:1
      • msg-2, key: key-a, position: 1:2
      • msg-3, key: key-a, position: 1:3
      • msg-4, key: key-b, position: 1:4
      • msg-5, key: key-b, position: 1:5
      • msg-6, key: key-b, position: 1:6
       The dispatcher has two consumers (c1's messagesForC is 1, c2's messagesForC is 1000), and the selector returns c1 for key-a and c2 for key-b.
    2. Send msg-1 to c1 and msg-4 through msg-6 to c2.
      • So the current last sent position is 1:1, and the individually sent positions are [[1:4, 1:6]] (a list of closed intervals, not a list of lists).
      • c1 never acknowledges msg-1.

    scenario A

    1. Add a new consumer c3; the selector now returns c3 for key-a.
    2. msg-2 and msg-3 cannot be sent to c3, because 1:2 and 1:3 are greater than the last sent position, 1:1.
    3. Disconnect c1.
    4. Send msg-1 through msg-3 to c3.
      Now c3 receives key-a messages in the expected order.

    scenario B

    1. c1's messagesForC goes back up to 999.
    2. Send msg-2 and msg-3 to c1.
      • So the current last sent position is 1:6, and the individually sent positions are [].

    My primary motivation is to fix Key_Shared, not to improve it. Therefore, if the current approach is not accepted as a fix, I will implement this approach instead and remove the per-key Map.
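The alternative in the [Q-2] reply, a single last sent position plus individually sent positions, mirrors how the mark delete position relates to individually deleted messages. The following is a hypothetical sketch with illustrative names: it stores individually sent entries in a TreeSet (a real implementation would more likely store ranges), assumes one ledger with consecutive entry IDs, and starts from an assumed last sent position of 1:0. It replays the flow above, including scenarios A and B.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of the [Q-2] alternative: one contiguous last sent position (like the
// mark delete position) plus the entries sent beyond it (like individually deleted messages).
// Names are illustrative, not Pulsar's API. Simplification: one ledger with consecutive entry IDs.
class ContiguousLastSentSketch {

    private long lastSent; // every entry <= lastSent has been scheduled for sending
    private final TreeSet<Long> individuallySent = new TreeSet<>(); // sent entries above lastSent

    ContiguousLastSentSketch(long initialLastSent) {
        this.lastSent = initialLastSent;
    }

    void onSent(long position) {
        if (position <= lastSent) {
            return; // already covered, e.g. a redelivery
        }
        individuallySent.add(position);
        // Advance the contiguous position while the next entry has already been sent.
        while (individuallySent.remove(lastSent + 1)) {
            lastSent++;
        }
    }

    long lastSent() {
        return lastSent;
    }

    // A recently joined consumer only receives entries at or below the position recorded when it joined.
    static boolean canSendToRecentlyJoined(long recordedLastSent, long position) {
        return position <= recordedLastSent;
    }

    // Entries sent beyond lastSent, grouped into closed intervals, e.g. "[[4, 6]]".
    String individuallySentRanges() {
        List<String> ranges = new ArrayList<>();
        long start = -1;
        long end = -1;
        for (long p : individuallySent) {
            if (start >= 0 && p == end + 1) {
                end = p; // extends the current interval
                continue;
            }
            if (start >= 0) {
                ranges.add("[" + start + ", " + end + "]");
            }
            start = p;
            end = p;
        }
        if (start >= 0) {
            ranges.add("[" + start + ", " + end + "]");
        }
        return ranges.toString();
    }

    public static void main(String[] args) {
        ContiguousLastSentSketch tracker = new ContiguousLastSentSketch(0); // assumed starting point 1:0
        tracker.onSent(1); // msg-1 -> c1
        tracker.onSent(4); // msg-4 .. msg-6 -> c2
        tracker.onSent(5);
        tracker.onSent(6);
        System.out.println(tracker.lastSent() + " " + tracker.individuallySentRanges()); // 1 [[4, 6]]

        // Scenario A: c3 joins now and records 1, so msg-2 (entry 2) is held back.
        long c3Recorded = tracker.lastSent();
        System.out.println(canSendToRecentlyJoined(c3Recorded, 2)); // false

        // Scenario B: msg-2 and msg-3 are sent to c1 instead, which closes the gap.
        tracker.onSent(2);
        tracker.onSent(3);
        System.out.println(tracker.lastSent() + " " + tracker.individuallySentRanges()); // 6 []
    }
}
```

In this sketch, memory grows with the number of entries sent beyond the contiguous position (a range-based version would grow with the number of gaps), not with the number of distinct keys.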

equanz • May 09 '23

@eolivelli

Thank you for your comments; I'll check them. By the way, do you have any thoughts on the following concern from @poorbarcode?

> By the way, if there are a huge number of keys, will the constructor LastSentPositions cost too much memory?

If the current approach is not approved, I will create a new PR based on https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184 instead of addressing your comments in this PR.


@poorbarcode

What do you think about the following comment? https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184

  • I don't think the current approach can easily be split into a fix and an improvement.
    • For more details, please see the description ("2. Keep the last sent position per key.").
  • I think we need to implement a new feature like https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184 to fix the issue without introducing a Map structure.
  • My primary motivation is to fix Key_Shared, not to improve it.
    Therefore, if the current approach is not approved as a fix, I will implement that feature and remove the per-key Map.

equanz • May 11 '23

The PR had no activity for 30 days; marking it with the Stale label.

github-actions[bot] • Jul 01 '23

I opened a new PR https://github.com/apache/pulsar/pull/21953.

equanz • Jan 26 '24

https://github.com/apache/pulsar/pull/21953 was merged.

equanz • Aug 01 '24