[improve][broker] Optimize replicated subscription snapshot cache misses that block cursor synchronization
### Motivation

When message acknowledgment is slower than the message consumption rate, subscription cursor synchronization fails to complete.

Current behavior:

- With a large receiver queue (e.g., `receiverQueueSize=1000`), the cursor never synchronizes:

```java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(topic)
        .subscriptionName("sub")
        .receiverQueueSize(1000)
        .subscribe();
while (true) {
    Message<String> msg = consumer.receive();
    consumer.acknowledge(msg);
    Thread.sleep(100);
}
```

- With a small receiver queue (e.g., `receiverQueueSize=1`), synchronization works properly:

```java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(topic)
        .subscriptionName("sub")
        .receiverQueueSize(1)
        .subscribe();
while (true) {
    Message<String> msg = consumer.receive();
    consumer.acknowledge(msg);
    Thread.sleep(100);
}
```
Root cause:

- The `ReplicatedSubscriptionSnapshotCache` updates too aggressively.
- When `advancedMarkDeletePosition` executes, valid cache entries are frequently unavailable.
- The current eviction strategy doesn't account for periodic synchronization needs: https://github.com/apache/pulsar/blob/7be22eb2b23057bd5e09c361a43d6ccdcc0c8afd/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java#L44C4-L59C6
### Modifications

- Cache update strategy:
  - Modified the cache to maintain mapping relationships for remote clusters.
- Eviction policy enhancement, when the cache reaches capacity (`maxSnapshotToCache`):
  - Allow subsequent snapshots to be added through periodic dynamic adjustment.
  - The latest snapshot replaces an intermediate snapshot in the cache, and updates become less frequent as the gap between the latest snapshot time and the mark-delete position time grows.
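As a rough sketch of the eviction idea above (the class name, the `String` snapshot ids, and the `lag / maxSnapshotToCache` interval formula are all illustrative assumptions, not the actual patch), admission could slow down as the mark-delete position falls behind:

```java
import java.util.TreeMap;

// Illustrative sketch only: slows cache updates as the gap between the
// newest snapshot time and the mark-delete position time grows.
public class TimeAwareSnapshotCache {
    private final int maxSnapshotToCache;
    private final TreeMap<Long, String> snapshotsByTime = new TreeMap<>(); // time -> snapshot id

    public TimeAwareSnapshotCache(int maxSnapshotToCache) {
        this.maxSnapshotToCache = maxSnapshotToCache;
    }

    // markDeleteTime: time associated with the current mark-delete position.
    public void addNewSnapshot(long snapshotTime, String snapshotId, long markDeleteTime) {
        if (snapshotsByTime.size() < maxSnapshotToCache) {
            snapshotsByTime.put(snapshotTime, snapshotId);
            return;
        }
        // The larger the lag, the longer the minimum interval between accepted
        // updates ("the update becomes slower" as the difference increases).
        long lag = snapshotTime - markDeleteTime;
        long minInterval = Math.max(1, lag / maxSnapshotToCache);
        long newestCached = snapshotsByTime.lastKey();
        if (snapshotTime - newestCached < minInterval) {
            return; // too soon; keep existing entries for later synchronization
        }
        // Replace an intermediate entry rather than the oldest, so older
        // snapshots stay available when advancedMarkDeletePosition runs.
        Long middle = null;
        int idx = 0, target = snapshotsByTime.size() / 2;
        for (Long key : snapshotsByTime.keySet()) {
            if (idx++ == target) {
                middle = key;
                break;
            }
        }
        snapshotsByTime.remove(middle);
        snapshotsByTime.put(snapshotTime, snapshotId);
    }

    public int size() {
        return snapshotsByTime.size();
    }

    public Long oldest() {
        return snapshotsByTime.firstKey();
    }
}
```

The key property is that the oldest entry survives intermediate replacements, so a slowly advancing mark-delete position still finds a usable snapshot in the cache.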
### Verifying this change
- [x] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
### Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
### Matching PR in forked repository
PR in forked repository:
> When message acknowledgment confirmation is slower than message consumption rate, subscription cursor synchronization fails to complete.

@liudezhi2098 Regarding this scenario, is there a way to find out that this happens?
Does the metric `pulsar_replicated_subscriptions_timedout_snapshots` added in #22381 help in detecting problems?
> Current Behavior
> - With large receiver queues (e.g., `receiverQueueSize=1000`), the cursor never synchronizes

@liangyepianzhou Do you have a separate repro app where this could be observed with real brokers (say, 2 Pulsar brokers within docker-compose plus some test app)? Creating a separate Git repo for such a repro would be one approach to sharing it. Having a runnable repro makes things easier for reviewers too.
> - The SnapshotCache updates too aggressively
> - When advancedMarkDeletePosition executes, valid cache entries are frequently unavailable

@liudezhi2098 One thought here is that perhaps there could be interaction between the `ReplicatedSubscriptionsController` and all `ReplicatedSubscriptionSnapshotCache` instances? When updates are "too aggressive", could there be a mechanism that ensures a snapshot is completed every `replicatedSubscriptionsSnapshotFrequencyMillis`?
Since `ReplicatedSubscriptionSnapshotCache` is an internal interface, we don't need to keep it as a "cache". It's possible that it doesn't make sense in the revisited solution.
Do you have a chance to try something in this area instead? I don't think that using `publishTime` in the solution makes sense.
Since the `publishTime` information comes from the Pulsar client, the logic could be brittle. In Pulsar, there's also an optional `brokerPublishTime`, which is not enabled by default (it requires specific configuration in all brokers). Have you considered what could happen when `publishTime` values aren't in sync?
I've understood that the "PIP-33: Replicated subscriptions" algorithm relies on vector clocks so that clock sync doesn't become a problem (earlier discussion). The snapshots are how the vector clocks are synchronized, at least that's how I interpret it from one viewpoint.
The changes in this PR don't currently make sense to me, mainly due to the use of `publishTime`. I'd assume that in your problem scenario, the correct approach would be to tune the `replicatedSubscriptionsSnapshotFrequencyMillis`, `replicatedSubscriptionsSnapshotTimeoutSeconds` and `replicatedSubscriptionsSnapshotMaxCachedPerSubscription` values: https://github.com/apache/pulsar/blob/825b862d8674f32ffbc13f10c17611ba394bfcc9/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1444-L1457
Have you already done this?
Currently it is a problem that it's necessary to tune the values to fix issues and it's also hard to notice that the problem is occurring.
It looks like future improvements are needed too.
@lhotari The generation of snapshots is completed through the exchange of snapshotRequest and snapshotResponse between two clusters. Ultimately, the ReplicatedSubscriptionsController writes Marker messages, and using publishTime is reliable because this behavior occurs within the same broker.
Of course, the topic may be transferred to another broker, but this is a low-frequency scenario, and its publishTime will not exhibit continuous jumps.
However, we can adopt a simpler approach that doesn't require using publishTime: record the current system time each time the snapshotCache is updated. But this has a flaw: it cannot truly reflect the time difference between two messages, and in some scenarios it will cause the cache update frequency to decrease.
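That wall-clock variant could be sketched like this (the class and parameter names are hypothetical, not code from the PR); it throttles purely on broker time, which is exactly why it cannot reflect the spacing between the underlying messages:

```java
// Hypothetical sketch of throttling cache updates by broker wall-clock time.
// Names (WallClockThrottledCache, minUpdateIntervalMillis) are illustrative.
public class WallClockThrottledCache {
    private final long minUpdateIntervalMillis;
    // Start far in the past (but avoid Long.MIN_VALUE subtraction overflow).
    private long lastUpdateMillis = Long.MIN_VALUE / 2;

    public WallClockThrottledCache(long minUpdateIntervalMillis) {
        this.minUpdateIntervalMillis = minUpdateIntervalMillis;
    }

    // Returns true if a new snapshot should be admitted at nowMillis.
    // Because this uses the broker clock, two messages published far apart
    // can still land in one admission window (the flaw noted above).
    public boolean tryAdmit(long nowMillis) {
        if (nowMillis - lastUpdateMillis < minUpdateIntervalMillis) {
            return false;
        }
        lastUpdateMillis = nowMillis;
        return true;
    }
}
```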
When looking at the current master branch code in ReplicatedSubscriptionSnapshotCache.addNewSnapshot, I'd assume that a potential solution to the problem could be that the current mark delete position is taken into account before purging entries.
It looks like the problem arises when there isn't at least one entry in the cache that is older than the current mark delete position.
I'd suggest to revisit the purging logic in this way:
- Modify `addNewSnapshot` and add a 2nd parameter which is the current mark delete position.
- Always keep the newest entry that is before the current mark delete position when purging entries; all older entries can be purged.
- If the cache remains full after doing this, remove a single entry in the cache so that the new entry can be added:
  - Keep the position of the last removed entry so that it's possible to continue the purging algorithm in subsequent calls.
  - If there's no previous last removed entry, purge the 2nd entry (assuming that the first entry is the newest entry before the current mark delete position).
  - If there's a previous last removed entry, continue purging from the next entry after the last removed position, by first skipping one entry and then removing the 2nd entry.
  - If there are no more entries to remove, start removing from the beginning.
This purging logic should always result in making it possible to add a new entry. Since every 2nd entry is removed, it will result in "sampling" so that when the mark delete position finally advances, it advances to the most recent position.
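A minimal sketch of the purging scheme described above, assuming positions can be modeled as plain `long` values and using a `TreeMap` in place of the real cache structure (the class and method shapes are illustrative, not the proposed patch):

```java
import java.util.TreeMap;

// Illustrative sketch of the suggested purging scheme; positions are plain
// longs and SamplingSnapshotCache is a made-up name, not Pulsar code.
public class SamplingSnapshotCache {
    private final int maxSnapshotToCache;
    private final TreeMap<Long, String> snapshots = new TreeMap<>(); // position -> snapshot id
    private Long lastRemoved = null; // lets the every-2nd-entry sweep continue across calls

    public SamplingSnapshotCache(int maxSnapshotToCache) {
        this.maxSnapshotToCache = maxSnapshotToCache;
    }

    // 2nd parameter is the current mark delete position, as suggested above.
    public void addNewSnapshot(long position, String snapshotId, long markDeletePosition) {
        // Keep the newest entry at or before the mark delete position;
        // everything older than it can be purged.
        Long keep = snapshots.floorKey(markDeletePosition);
        if (keep != null) {
            snapshots.headMap(keep, false).clear();
        }
        // If still full, remove a single entry: skip one, remove the next,
        // so the cache "samples" the stream instead of losing one end of it.
        if (snapshots.size() >= maxSnapshotToCache) {
            Long cursor = (lastRemoved != null) ? snapshots.higherKey(lastRemoved)
                                                : snapshots.firstKey();
            if (cursor == null) {
                cursor = snapshots.firstKey(); // wrap around to the beginning
            }
            Long victim = snapshots.higherKey(cursor);
            if (victim == null) {
                victim = snapshots.firstKey();
            }
            snapshots.remove(victim);
            lastRemoved = victim;
        }
        snapshots.put(position, snapshotId);
    }

    public int size() {
        return snapshots.size();
    }
}
```

With this scheme, every 2nd entry is dropped once the cache fills up, so when the mark delete position finally advances it can jump close to the most recent snapshot.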
There's a possibility to increase the `replicatedSubscriptionsSnapshotMaxCachedPerSubscription` parameter to improve the resolution, if that's desirable. Perhaps the intention of your timestamp-based approach is already to achieve something similar?
@lhotari The intention of the timestamp-based approach is to achieve this purpose. The key is how to update the cache once it is full; there is no perfect algorithm to solve this problem.
I recommend using median-based eviction for simplicity, trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.
> The intention of the timestamp-based approach is to achieve this purpose. The key is how to update the cache once it is full; there is no perfect algorithm to solve this problem.
> I recommend using median-based eviction for simplicity, trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.
You are right about this. The added comments in the code make it easier to understand the intention of the logic. My previous comment about taking the mark deletion position into account in adding snapshots didn't make much sense after rethinking. I'll review again.
> I recommend using median-based eviction for simplicity, trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.
I don't see how the median based eviction could make sense. After the cache is filled up, when the median entry is removed and a new entry is added, and this repeats, the result will be that only entries after the median entry will be evicted (assuming no other events happen in between). Eventually there will be a large gap between the 2 entries in the middle.
Since time is already considered in the algorithm, it seems that an alternative approach would be to evict the entry with the shortest time distance to its adjacent entries. Makes sense?
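That density-based idea could be sketched as follows; the helper below is hypothetical and just shows how to pick the victim whose neighbours span the shortest interval:

```java
import java.util.TreeMap;

// Hypothetical helper illustrating "evict the entry with the shortest time
// distance to its adjacent entries"; removing that entry loses the least
// temporal resolution from the cache.
public class DensityBasedEviction {
    // Assumes the cache holds at least three entries; the first and last
    // entries are never evicted so the covered time range is preserved.
    public static long pickVictim(TreeMap<Long, String> snapshotsByTime) {
        Long[] times = snapshotsByTime.keySet().toArray(new Long[0]);
        long victim = times[1];
        long best = Long.MAX_VALUE;
        for (int i = 1; i < times.length - 1; i++) {
            long span = times[i + 1] - times[i - 1]; // gap bridged if entry i is removed
            if (span < best) {
                best = span;
                victim = times[i];
            }
        }
        return victim;
    }
}
```

Unlike median eviction, the victim moves around as the gaps change, so the surviving entries tend to stay roughly evenly spaced in time.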
@liudezhi2098 Just wondering if you are fine with the provided feedback on this PR? It would be great to address this issue in replicated subscriptions and get this PR to completion.
@liudezhi2098 Are you planning to continue working on this? I think that this is a really great improvement to address a long time issue with replicated subscriptions.
@liudezhi2098 There's also a long-standing issue #10054 which is addressed by #16651. I have updated PR 16651 description, rebased it and revisited it slightly. Please review
> Are you planning to continue working on this? I think that this is a really great improvement to address a long time issue with replicated subscriptions.

Yes, I will continue working on this.
@liudezhi2098 Any updates on this PR?