
[Bug] Inconsistent markDeletePosition replication for geo-replicated shared subscriptions with delayed messages

Open tarmacmonsterg opened this issue 6 months ago • 6 comments

Search before reporting

  • [x] I searched in the issues and found nothing similar.

Read release policy

  • [x] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

Pulsar: 4.0.4, official Docker image, deployed on K8s

Issue Description

We have several topics in our Pulsar deployment. For some topics (cache-related), we have geo-replication disabled. Others work as expected: the subscription cursor is replicated to the backup cluster.

However, we are seeing inconsistent behavior with topics used for delayed messages and shared subscriptions. These topics have geo-replication enabled and use individual acknowledgments. According to the documentation, individual acknowledgments themselves are not replicated across clusters, but the markDeletePosition should be replicated.

In our tests, we noticed that the markDeletePosition in the backup cluster does not move predictably. In some cases, it remains unchanged for a long time. The only time it eventually advances is after the primary cluster stops receiving new messages to that topic; then, after a delay, the markDeletePosition is finally updated in the backup cluster.
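To make the comparison concrete, here is a minimal sketch (assuming the Java admin client; the admin URLs, tenant/namespace, topic and class names are hypothetical) of reading the markDeletePosition of the same subscription from both clusters. It reads the same value that appears in the stats-internal excerpts below.

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class CompareMarkDeletePosition {
    public static void main(String[] args) throws Exception {
        // Hypothetical admin endpoints, topic and subscription names, for illustration only.
        String topic = "persistent://my-tenant/my-ns/delayed-topic";
        String subscription = "delayed_message_10_min";

        try (PulsarAdmin primary = PulsarAdmin.builder()
                     .serviceHttpUrl("http://primary-broker:8080").build();
             PulsarAdmin backup = PulsarAdmin.builder()
                     .serviceHttpUrl("http://backup-broker:8080").build()) {

            PersistentTopicInternalStats primaryStats = primary.topics().getInternalStats(topic);
            PersistentTopicInternalStats backupStats = backup.topics().getInternalStats(topic);

            // cursors is keyed by subscription name; markDeletePosition is "ledgerId:entryId".
            System.out.println("primary markDeletePosition: "
                    + primaryStats.cursors.get(subscription).markDeletePosition);
            System.out.println("backup  markDeletePosition: "
                    + backupStats.cursors.get(subscription).markDeletePosition);
        }
    }
}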

First check of stats-internal, main cluster

"delayed_message_10_min" : {
      "markDeletePosition" : "2382448:33718",

backup cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "50797:509",

Second check, main cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "2382448:40268",

backup cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "50797:509",

Third check, main cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "2382722:21942",

backup cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "50797:509",

Check after stopping the load tests (the backlog in the main cluster is empty), main cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "2382761:11155",

backup cluster

    "delayed_message_10_min" : {
      "markDeletePosition" : "54807:11139",

And I see one difference: on the main cluster, the individuallyDeletedMessages disappear after stopping the load test.

Error messages


Reproducing the issue

1.	Deploy two Pulsar clusters.
2.	Create the relevant topics.
3.	Configure geo-replication between the clusters.
4.	Enable subscription replication on the client.
5.	Start continuously producing delayed messages to the topic, with delivery delays of up to 10 minutes.
6. On the primary cluster, consume messages selectively (based on delivery time); a client sketch covering steps 4-6 follows this list.
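As a rough illustration of steps 4 to 6, a minimal Java client sketch could look as follows; the service URL, topic name, message counts and class name are hypothetical.

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class DelayedReproSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical service URL and topic name, for illustration only.
        String topic = "persistent://my-tenant/my-ns/delayed_message_10_min";

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://primary-broker:6650")
                .build()) {

            // Step 4: shared subscription with subscription replication enabled on the client.
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("delayed_message_10_min")
                    .subscriptionType(SubscriptionType.Shared)
                    .replicateSubscriptionState(true)
                    .subscribe();

            // Step 5: continuously produce messages with delivery delays of up to 10 minutes.
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic(topic)
                    .create();
            for (int i = 0; i < 1000; i++) {
                producer.newMessage()
                        .value("payload-" + i)
                        .deliverAfter(i % 10 + 1, TimeUnit.MINUTES)
                        .sendAsync();
            }
            producer.flush();

            // Step 6: consume and individually acknowledge messages as they become deliverable.
            while (true) {
                Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                consumer.acknowledge(msg);
            }
        }
    }
}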

Expected behavior: The markDeletePosition should advance on both the primary and the backup clusters.

Actual behavior: The markDeletePosition advances only on the primary cluster. On the backup cluster, a backlog accumulates and markDeletePosition remains stuck for a long time.

Additional information

Discussion started here: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1748598297819549

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

tarmacmonsterg avatar Jun 04 '25 21:06 tarmacmonsterg

The only time it eventually advances is after the primary cluster stops receiving new messages to that topic — and then, after a delay, the markDeletePosition is finally updated in the backup cluster.

It seems that all suitable snapshots have been pushed out of the ReplicatedSubscriptionSnapshotCache (max size configurable with replicatedSubscriptionsSnapshotMaxCachedPerSubscription, default 10) before the mark delete position advances. That's what PR https://github.com/apache/pulsar/pull/24300 attempts to address.

These are the broker level configuration values for replicated subscriptions: https://github.com/apache/pulsar/blob/a1a2b363cfaa1bbc38933a742484a70a0a56e761/conf/broker.conf#L672-L679

The implementation level details of replicated subscription snapshots are described in PIP-33: Replicated subscriptions and there's a sequence diagram here: https://gist.github.com/lhotari/96fda511a70d7de93744d868b4472b92.

What most likely happens here is that, with snapshots created every 1 second by default, the snapshot cache only keeps the latest 10 snapshots. When the mark delete position advances on the primary cluster, no suitable snapshot is found in the cache, so the mark delete position isn't propagated to the backup cluster. Eventually, when consumers catch up on the primary cluster, there is a suitable snapshot again, which allows the mark delete position to be updated on the backup cluster.
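For illustration, here is a simplified model of that behavior, not the actual broker code: a bounded per-subscription snapshot cache that evicts the oldest entries, using the defaults mentioned above (10 cached snapshots, one snapshot per second) as assumptions, to show why a lagging mark delete position finds no suitable snapshot.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model, not the actual broker implementation.
class SnapshotCacheModel {
    // Snapshots keyed by the position they cover (a plain long here instead of ledgerId:entryId).
    private final NavigableMap<Long, String> snapshots = new TreeMap<>();
    private final int maxCached; // replicatedSubscriptionsSnapshotMaxCachedPerSubscription, default 10

    SnapshotCacheModel(int maxCached) {
        this.maxCached = maxCached;
    }

    // Called roughly once per second (replicatedSubscriptionsSnapshotFrequencyMillis default).
    void addNewSnapshot(long position, String snapshotId) {
        snapshots.put(position, snapshotId);
        if (snapshots.size() > maxCached) {
            snapshots.pollFirstEntry(); // current behavior: drop the oldest snapshot
        }
    }

    // Called when the local mark delete position advances; the newest cached snapshot at or
    // before that position is needed to propagate the update to the other cluster.
    String snapshotForMarkDelete(long markDeletePosition) {
        Map.Entry<Long, String> entry = snapshots.floorEntry(markDeletePosition);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        SnapshotCacheModel cache = new SnapshotCacheModel(10);
        // The producer is fast: snapshots keep being taken at ever-growing positions.
        for (long position = 100; position <= 200; position += 10) {
            cache.addNewSnapshot(position, "snapshot@" + position);
        }
        // Consumers lag behind: the mark delete position (50) is older than every cached
        // snapshot, so no suitable snapshot is found and the backup cluster is not updated.
        System.out.println(cache.snapshotForMarkDelete(50));   // null
        // Once consumers catch up to a position covered by a cached snapshot, it works again.
        System.out.println(cache.snapshotForMarkDelete(150));  // snapshot@150
    }
}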

PR #24300 by @liudezhi2098 attempts to address the above issue with a better strategy for evicting snapshots from the cache. Instead of pushing out the oldest snapshots, it tries to keep snapshots in the cache so that when consumers make progress, a suitable snapshot is still available.

There have also been reports of problems with replicated subscriptions where the mark delete position doesn't get updated when producing has paused, and only gets updated after new messages have been published and consumed. That's a different issue, most likely caused by the current logic where a new snapshot is omitted if the most recent snapshot was completed after the most recent message was published. That condition is not correct, since it should be based on the start time of the most recent successfully completed snapshot. (In addition, the fix is more complicated, since it would need to ignore marker messages when determining when the most recent message was published; without that, it would end up in a loop that doesn't stop.)

lhotari avatar Jun 05 '25 08:06 lhotari

Not directly related to this issue, but related to the use of a large amount of delayed messages: https://github.com/apache/pulsar/discussions/23990#discussioncomment-12233561 It's worth being aware that acknowledgement state could be lost when a topic is moved from one broker to another due to load balancing or broker restart if the configuration isn't tuned.

lhotari avatar Jun 06 '25 08:06 lhotari

Not directly related to this issue, but related to the use of a large amount of delayed messages: #23990 (reply in thread) It's worth being aware that acknowledgement state could be lost when a topic is moved from one broker to another due to load balancing or broker restart if the configuration isn't tuned.

We encountered this issue in production — and it hit us hard. As a result, we reworked the architecture for handling delayed messages and improved how the application deals with duplicates in such cases. Initially, a single topic could grow to hundreds of gigabytes. After refactoring the architecture, it’s now reduced to just a few gigabytes, thanks to proper and timely cleanup finally working.

tarmacmonsterg avatar Jun 06 '25 09:06 tarmacmonsterg

Not directly related to this issue, but related to the use of a large amount of delayed messages: #23990 (reply in thread) It's worth being aware that acknowledgement state could be lost when a topic is moved from one broker to another due to load balancing or broker restart if the configuration isn't tuned.

We encountered this issue in production — and it hit us hard. As a result, we reworked the architecture for handling delayed messages and improved how the application deals with duplicates in such cases. Initially, a single topic could grow to hundreds of gigabytes. After refactoring the architecture, it’s now reduced to just a few gigabytes, thanks to proper and timely cleanup finally working.

Thanks for sharing, @tarmacmonsterg. Just curious, how did you address the problem?

One solution that I've heard about is having multiple topics for delayed messages, with one topic per "time bucket", so that messages that get delivered around the same time are grouped together. On the consuming side, a topic regex pattern would be used to consume from the topics, which might be dynamic (one topic for each day, etc.) and get deleted automatically once they are empty. Another approach is simply separating messages into different topics based on the duration of the delay; that works when there is a limited set of fixed delay durations in use. A higher-level client library could handle this internally.
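As a sketch of what the pattern-based consuming side could look like (service URL, namespace, topic naming scheme and subscription name are made up for illustration):

import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class BucketedDelayedConsumer {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://broker:6650") // hypothetical service URL
                .build()) {

            // One consumer over all per-bucket delayed topics, e.g. delayed-2025-06-04,
            // delayed-2025-06-05, ... New topics matching the pattern are discovered automatically.
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topicsPattern(Pattern.compile("persistent://my-tenant/my-ns/delayed-.*"))
                    .subscriptionName("delayed-processor")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();

            Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
            if (msg != null) {
                consumer.acknowledge(msg);
            }
            consumer.close();
        }
    }
}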

Acknowledgement state could also get lost or grow very large. I've recently added PRs https://github.com/apache/pulsar/pull/24392 and https://github.com/apache/pulsar/pull/24391 to address the problem that many of the related settings are missing from broker.conf and haven't been documented properly. There was also a setting that wasn't configurable at all.

lhotari avatar Jun 06 '25 12:06 lhotari

Thanks for sharing, @tarmacmonsterg. Just curious, how did you address the problem?

Initially, we had delayed topics split by domain. In some of these topics, there were messages scheduled for delivery in 2–3 months, which blocked ledger cleanup and triggered a full re-read of all messages stored in the ledgers after broker restart.

To prevent this behavior, we reorganized the messages into topics based on their delivery time:

  • 10 minutes
  • 1 hour
  • 1 day
  • 1 month
  • infinity

Since most of our messages fall within the 10-minute range, cleanup now occurs in a timely manner, and restarts no longer lead to re-reading large volumes of messages.
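For illustration, a producer-side helper for this kind of bucketing could look like the following sketch; the topic names and the 30-day cutoff for the one-month bucket are assumptions, not our actual implementation.

import java.time.Duration;

// Illustrative only: picks one of the per-delay topics described above based on
// how far in the future the message should be delivered.
final class DelayedTopicRouter {
    static String topicForDelay(Duration delay) {
        String ns = "persistent://my-tenant/my-ns/"; // hypothetical namespace
        if (delay.compareTo(Duration.ofMinutes(10)) <= 0) {
            return ns + "delayed-10min";
        } else if (delay.compareTo(Duration.ofHours(1)) <= 0) {
            return ns + "delayed-1h";
        } else if (delay.compareTo(Duration.ofDays(1)) <= 0) {
            return ns + "delayed-1d";
        } else if (delay.compareTo(Duration.ofDays(30)) <= 0) {
            return ns + "delayed-1mo";
        } else {
            return ns + "delayed-infinity";
        }
    }

    public static void main(String[] args) {
        System.out.println(topicForDelay(Duration.ofMinutes(5)));  // .../delayed-10min
        System.out.println(topicForDelay(Duration.ofDays(200)));   // .../delayed-infinity
    }
}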

However, a restart still resets the cursor state, requiring already-read messages to be read again. To handle this, the application skips duplicates.

tarmacmonsterg avatar Jun 06 '25 13:06 tarmacmonsterg

To prevent this behavior, we reorganized the messages into topics based on their delivery time: • 10 minutes • 1 hour • 1 day • 1 month • infinity

Since most of our messages fall within the 10-minute range, cleanup now occurs in a timely manner, and restarts no longer lead to re-reading large volumes of messages.

@tarmacmonsterg Makes sense, that's a solution I've heard has been used elsewhere as well.

However, a restart still resets the cursor state, requiring already-read messages to be read again. To handle this, the application skips duplicates.

You might want to take a look at #24392 and #24391 to address this problem, as well as the discussion https://github.com/apache/pulsar/discussions/23990.

lhotari avatar Jun 06 '25 13:06 lhotari