pulsar
[refactor][ml] Replace cache eviction algorithm with centralized removal queue and job
Motivation
This PR fixes fundamental inefficiencies and correctness issues in the current Pulsar broker entry cache eviction algorithm. The current implementation has flawed size-based eviction that doesn't remove the oldest entries and incorrect timestamp-based eviction with high CPU overhead. These fixes ensure that size-based eviction properly removes the oldest entries and timestamp-based eviction works correctly. Additionally, this PR serves as a foundation for future improvements to efficiently handle catch-up reads and Key_Shared subscription scenarios.
Mailing list discussion about this PR: https://lists.apache.org/thread/ddzzc17b0c218ozq9tx0r3rx5sgljfb0
Problems with the Current Broker Entry Cache Implementation
- Size-based eviction doesn't remove the oldest entries: The existing `EntryCacheDefaultEvictionPolicy` uses an algorithm that keeps the cache size under the limit but cannot guarantee removal of the oldest entries from the cache. The algorithm:
  - Sorts caches by size in descending order
  - Selects caches representing a percentage of total size (`PercentOfSizeToConsiderForEviction`, default `0.5`)
  - Attempts to evict proportional amounts from each selected cache
  - This approach doesn't ensure that the oldest entries are removed, leading to inefficient cache utilization
- Inefficient and incorrect timestamp-based eviction: The current timestamp eviction has both performance and correctness issues:
  - Performance problems:
    - Iterates through ALL cache instances (there's an instance for each persistent topic / `ManagedLedgerImpl`)
    - Causes considerable CPU and memory pressure due to the constant iterations when a high number of topics run with high throughput
    - Runs every 10 milliseconds by default (`managedLedgerCacheEvictionIntervalMs=10`), which is 100 times per second
  - Correctness problems:
    - Assumes entries are ordered by both position and timestamp, which breaks when:
      - Cache is used for catch-up reads (backlogged cursors)
      - Individual entries are cached out of order
      - Multiple read patterns (tailing + catch-up) access the same cache simultaneously
- Limited cache scope: The original `RangeCache` was designed for tailing reads. Later changes added support for backlogged cursors, but the eviction algorithms weren't updated to handle mixed read patterns effectively.
- Unnecessary complexity: Generic type parameters in `RangeCache` add complexity without providing value, as the cache is only used for entry storage.
Modifications
1. Centralized Removal Queue (RangeCacheRemovalQueue)
- Single-threaded eviction: All cache evictions are handled by one thread to avoid contention
- Insertion-order tracking: Uses `MpscUnboundedArrayQueue` to maintain entry insertion order
- Accurate timestamp eviction: Entries are processed in insertion order, making timestamp-based eviction reliable
- Efficient size-based eviction: Oldest entries are removed first when freeing cache space
2. Simplified Cache Implementation
- Removed generics: Dropped unnecessary type parameters from `RangeCache` to reduce complexity
- Unified eviction handling: All cache instances use the same central removal queue
- Improved consistency: Race conditions in eviction are minimized through centralized processing
3. Foundation for Future Improvements
The existing broker cache has limitations:
- Unnecessary BookKeeper reads during catch-up read scenarios
- Causes increased network costs and resource usage on BookKeeper nodes
- Cascading performance issues under high fan-out catch-up reads
- Current backlogged cursors caching solution has multiple gaps
- Poor cache hit rates for Key_Shared subscriptions with slow consumers: entries are put into the replay queue, and once the consumer has sent permits, these entries are read from BookKeeper (unless `cacheEvictionByMarkDeletedPosition=true`)
This refactoring prepares the cache system for:
- Enhanced catch-up read optimization
- Efficient replay queue caching for Key_Shared subscriptions
Algorithm Comparison
Before (EntryCacheDefaultEvictionPolicy)
Size Based Eviction
- Sort all caches by size (largest first)
- Select caches until reaching PercentOfSizeToConsiderForEviction (0.5)
- For each selected cache:
- Calculate proportional eviction amount
- Remove entries (no guarantee of age-based removal)
- Problem: May remove newer entries while keeping older ones
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java#L41-L93
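The steps above can be illustrated with a simplified sketch. It is not a copy of the linked code: `SizeProportionalEvictionSketch`, `planEviction`, and the topic names are hypothetical, and the sketch only computes how many bytes each cache would be asked to evict. It assumes caches are selected, largest first, until their combined size reaches the configured share of the total.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of proportional, size-sorted eviction planning.
final class SizeProportionalEvictionSketch {
    // Mirrors the default of 0.5 mentioned in the description.
    static final double PERCENT_OF_SIZE_TO_CONSIDER = 0.5;

    // Returns the number of bytes each selected cache is asked to evict.
    static Map<String, Long> planEviction(Map<String, Long> cacheSizes, long sizeToFree) {
        // Step 1: sort caches by size, largest first.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(cacheSizes.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue().reversed());

        long total = 0;
        for (Map.Entry<String, Long> e : sorted) {
            total += e.getValue();
        }

        // Step 2: select caches until their combined size reaches the configured share.
        long sizeToConsider = (long) (total * PERCENT_OF_SIZE_TO_CONSIDER);
        List<Map.Entry<String, Long>> selected = new ArrayList<>();
        long selectedTotal = 0;
        for (Map.Entry<String, Long> e : sorted) {
            if (selectedTotal >= sizeToConsider) {
                break;
            }
            selected.add(e);
            selectedTotal += e.getValue();
        }

        // Step 3: each selected cache evicts an amount proportional to its size.
        Map<String, Long> plan = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : selected) {
            long share = (long) (sizeToFree * (e.getValue() / (double) selectedTotal));
            plan.put(e.getKey(), share);
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("topicA", 600L); // large cache, assumed to hold recent entries
        sizes.put("topicB", 300L);
        sizes.put("topicC", 100L); // small cache, assumed to hold the oldest entries
        System.out.println(planEviction(sizes, 200));
    }
}
```

In this example only `topicA` is selected, so the small cache that is assumed to hold the oldest entries is never asked to evict anything. Entry age plays no role in the selection.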
Timestamp eviction
- Iterate all caches
- For each cache:
- Start from the first position in the cache
- Remove entries from the cache until the cache is empty or there's a valid entry that hasn't yet expired
- Problem: Iterating all caches and entries causes a lot of unnecessary CPU and memory pressure. By default, this is performed every 10 milliseconds, which is 100 times per second (`managedLedgerCacheEvictionIntervalMs=10`).
https://github.com/apache/pulsar/blob/eccc6b647e9cb07a18901471de1b2f8fafa88417/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L306-L317
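The correctness issue with scanning in position order can be shown with a small sketch. It is an illustration under assumptions, not the linked code: `PositionOrderedExpirySketch` and `evictUntilFirstValid` are hypothetical names, and a `TreeMap` from position to insertion timestamp stands in for a single cache instance.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: expiry that scans one cache in position order.
final class PositionOrderedExpirySketch {
    // Removes entries in position order while they are older than the threshold.
    // Stops at the first entry that is still valid. Returns the number removed.
    static int evictUntilFirstValid(TreeMap<Long, Long> timestampByPosition, long threshold) {
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it = timestampByPosition.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() >= threshold) {
                break; // first valid entry found, the scan assumes the rest are newer
            }
            it.remove();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> cache = new TreeMap<>();
        cache.put(1L, 100L); // position 1 cached late, e.g. by a catch-up read
        cache.put(2L, 10L);  // positions 2 and 3 cached earlier by a tailing read
        cache.put(3L, 20L);
        int removed = evictUntilFirstValid(cache, 50L);
        System.out.println(removed + " removed, " + cache.size() + " remaining");
    }
}
```

Here the entry at position 1 is still valid, so the scan stops immediately and the expired entries at positions 2 and 3 stay in the cache. When timestamps do follow position order, the same scan removes every expired entry.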
After (RangeCacheRemovalQueue)
- All entries added to insertion-order queue when cached
- For timestamp eviction:
- Process queue from oldest to newest
- Remove entries older than threshold
- Stop when hitting newer entry (leverages insertion order)
- For size eviction:
- Process queue from oldest to newest
- Remove entries until target size freed
- Guarantees oldest entries are removed first
https://github.com/apache/pulsar/blob/b72bc4ff3aa5c9c45d9233d2d000429b3cf0ce1a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalQueue.java
Note: There's a single shared removal queue for all ManagedLedgerImpl instances instead of having to do the check in multiple instances.
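The queue-based approach can be sketched as follows. This is a minimal single-threaded illustration, not the code in `RangeCacheRemovalQueue`: `RemovalQueueSketch`, `CachedEntry`, `evictOlderThan`, and `evictAtLeast` are hypothetical names, and a plain `ArrayDeque` stands in for the `MpscUnboundedArrayQueue` used in the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: one insertion-ordered removal queue shared by all caches.
final class RemovalQueueSketch {
    // Illustrative entry: insertion timestamp plus size in bytes.
    static final class CachedEntry {
        final long timestamp;
        final long sizeBytes;

        CachedEntry(long timestamp, long sizeBytes) {
            this.timestamp = timestamp;
            this.sizeBytes = sizeBytes;
        }
    }

    private final Deque<CachedEntry> queue = new ArrayDeque<>();
    private long totalSize;

    // Entries are appended when cached, so timestamps never decrease from head to tail.
    void add(CachedEntry entry) {
        queue.addLast(entry);
        totalSize += entry.sizeBytes;
    }

    // Timestamp eviction: remove from the head, stop at the first entry that is new enough.
    int evictOlderThan(long threshold) {
        int removed = 0;
        while (!queue.isEmpty() && queue.peekFirst().timestamp < threshold) {
            totalSize -= queue.pollFirst().sizeBytes;
            removed++;
        }
        return removed;
    }

    // Size eviction: remove the oldest entries until at least bytesToFree has been freed.
    long evictAtLeast(long bytesToFree) {
        long freed = 0;
        while (freed < bytesToFree && !queue.isEmpty()) {
            long size = queue.pollFirst().sizeBytes;
            freed += size;
            totalSize -= size;
        }
        return freed;
    }

    long totalSize() {
        return totalSize;
    }

    int count() {
        return queue.size();
    }

    public static void main(String[] args) {
        RemovalQueueSketch q = new RemovalQueueSketch();
        for (long t = 1; t <= 5; t++) {
            q.add(new CachedEntry(t, 100));
        }
        System.out.println("expired: " + q.evictOlderThan(3));
        System.out.println("freed: " + q.evictAtLeast(150));
        System.out.println("remaining: " + q.count());
    }
}
```

Because both eviction modes only ever touch the head of the queue, neither needs to iterate over per-topic cache instances, and size eviction always frees the oldest entries first.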
Verifying this change
This change is already covered by existing tests:
- All existing cache-related tests continue to pass
- `RangeCacheTest` validates the new removal queue functionality
- `EntryCacheManagerTest` verifies eviction behavior remains correct
- Integration tests ensure no regression in cache performance
Documentation
- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
Codecov Report
Attention: Patch coverage is 87.80488% with 45 lines in your changes missing coverage. Please review.
Project coverage is 74.36%. Comparing base (bbc6224) to head (b62ccdd). Report is 1223 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #24363 +/- ##
============================================
+ Coverage 73.57% 74.36% +0.78%
- Complexity 32624 32651 +27
============================================
Files 1877 1878 +1
Lines 139502 146301 +6799
Branches 15299 16772 +1473
============================================
+ Hits 102638 108796 +6158
+ Misses 28908 28883 -25
- Partials 7956 8622 +666
| Flag | Coverage Δ | |
|---|---|---|
| inttests | 26.69% <63.14%> (+2.10%) | :arrow_up: |
| systests | 23.29% <60.16%> (-1.03%) | :arrow_down: |
| unittests | 73.86% <87.80%> (+1.02%) | :arrow_up: |
| Files with missing lines | Coverage Δ | |
|---|---|---|
| ...org/apache/bookkeeper/mledger/PositionFactory.java | 100.00% <100.00%> (ø) | |
| ...kkeeper/mledger/impl/cache/EntryCacheDisabled.java | 75.55% <100.00%> (+4.72%) | :arrow_up: |
| .../mledger/impl/cache/RangeCacheRemovalCounters.java | 100.00% <100.00%> (ø) | |
| ...per/mledger/impl/cache/RangeCacheRemovalQueue.java | 100.00% <100.00%> (ø) | |
| ...keeper/mledger/impl/cache/RangeEntryCacheImpl.java | 63.30% <100.00%> (+4.55%) | :arrow_up: |
| ...oker/service/nonpersistent/NonPersistentTopic.java | 72.89% <ø> (+3.42%) | :arrow_up: |
| ...main/java/org/apache/bookkeeper/mledger/Entry.java | 0.00% <0.00%> (ø) | |
| ...keeper/mledger/impl/cache/PendingReadsManager.java | 87.77% <80.00%> (+1.10%) | :arrow_up: |
| .../org/apache/bookkeeper/mledger/impl/EntryImpl.java | 86.91% <93.75%> (+8.43%) | :arrow_up: |
| ...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 81.27% <85.71%> (+0.61%) | :arrow_up: |
| ... and 5 more | | |
The merge conflicts will be resolved by first merging #24552 to master branch.
My only question is about the eviction from the stash queue: since the insertion might not be in order, how is the eviction going to happen when there is memory pressure? Or is the size of this stash already bounded such that it would only require time-based expiration?
In this PR's implementation, all entries are added to the removal queue in order, and the timestamp is set when the entry is added to the cache and the removal queue. Therefore, the timestamps are in order. When evicting to bring the cache size under the watermark to handle "memory pressure", the oldest entries are evicted from the queue. Time-based expiration is done by evicting from the head of the queue until the entry at the head has a timestamp newer than the expiration threshold.
The eviction will become more complicated later. In PIP-430, there's a need to put entries aside and keep them in the cache for a longer period of time. Currently, PIP-430 addresses that by having 2 separate settings for expiration, which will simplify the solution. The maximum TTL for cache entries will continue to be in order in the removal queue, but another data structure is needed for the case where size-based eviction would remove an entry that is prioritized to be skipped and kept until it expires. Handling that challenge is out of scope for this PR. After this PR has been merged, it will be possible to build upon it and expand the solution further to implement PIP-430 (I have a WIP PR in my own fork, but it doesn't match the PIP high-level design).
One more detail about the eviction handling in this PR: the central queue is used for eviction by size (to keep all cached items under the limit) or by TTL. When entries are evicted directly from the RangeCache (for a complete ledger or up to the mark-delete position), the entry wrapper held in the range cache is cleared and the same instance remains in the queue until it gets processed. In the worst case, empty entry wrappers are held in the queue for at most the TTL. The entry wrapper is recycled when it is processed from the RangeCacheRemovalQueue.
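The handling of cleared wrappers can be sketched roughly as below. This is an illustration under assumptions rather than the PR's code: `ClearedWrapperSketch`, `EntryWrapper`, and `expire` are hypothetical names, a byte array stands in for the cached entry, and recycling is only marked by a comment.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: wrappers cleared by direct eviction stay queued until processed.
final class ClearedWrapperSketch {
    static final class EntryWrapper {
        final long timestamp;
        byte[] payload; // null once the wrapper has been cleared

        EntryWrapper(long timestamp, byte[] payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }

        // Direct eviction (whole ledger or up to the mark-delete position) only clears the wrapper.
        void clear() {
            payload = null;
        }

        boolean isEmpty() {
            return payload == null;
        }
    }

    private final Deque<EntryWrapper> queue = new ArrayDeque<>();

    EntryWrapper add(long timestamp, int sizeBytes) {
        EntryWrapper wrapper = new EntryWrapper(timestamp, new byte[sizeBytes]);
        queue.addLast(wrapper);
        return wrapper;
    }

    // TTL processing from the head of the queue. Returns the bytes actually freed.
    // Empty wrappers are dropped from the queue without freeing anything.
    long expire(long threshold) {
        long freed = 0;
        while (!queue.isEmpty() && queue.peekFirst().timestamp < threshold) {
            EntryWrapper wrapper = queue.pollFirst();
            if (!wrapper.isEmpty()) {
                freed += wrapper.payload.length;
                wrapper.clear();
            }
            // Assumption: this is the point where a wrapper would be recycled.
        }
        return freed;
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) {
        ClearedWrapperSketch sketch = new ClearedWrapperSketch();
        sketch.add(1, 10);
        EntryWrapper second = sketch.add(2, 10);
        sketch.add(3, 10);
        second.clear(); // direct eviction: the wrapper stays in the queue
        System.out.println("queued before expiry: " + sketch.queued());
        System.out.println("freed by expiry: " + sketch.expire(4));
        System.out.println("queued after expiry: " + sketch.queued());
    }
}
```

The cleared wrapper costs only a queue slot until TTL processing reaches it, which avoids having to remove elements from the middle of the queue.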
I hope this answer covers your question @merlimat
PIP-430 vote has passed. I'll merge this PR to master.