[improve][broker] Optimize fine-grained concurrency control for BucketDelayedDeliveryTracker
Fixes #24603
Main Issue: #24600
PIP: #xyz
Motivation
- Reduce lock contention and fix thread-safety issues in BucketDelayedDeliveryTracker.
- Remove blocking I/O from the publish path.
- Decouple the tracker from dispatcher for easier testing and benchmarking.
Modifications
- Concurrency
  - Replace StampedLock and synchronized with ReentrantReadWriteLock (read/write separation).
  - addMessage uses “read-lock check → write-lock modify” to minimize write lock time.
- Asynchronous snapshots
  - Seal-and-swap the current bucket and persist snapshots on a single-thread executor.
  - Ensure only one snapshot is created at a time.
- Scheduling and timer
  - getScheduledMessages triggers async loading of next snapshot segment instead of blocking.
  - Reworked timer handling with a dedicated lock and scheduleImmediateRun; no synchronization on dispatcher.
- Decoupling
  - Introduce DelayedDeliveryContext:
    - DispatcherDelayedDeliveryContext (production).
    - NoopDelayedDeliveryContext (tests/benchmarks).
  - Trackers and factories accept dispatcher or context; added constructors for name+cursor.
- Merging and lifecycle
  - asyncMergeBucketSnapshot uses write lock; merges bitmaps; removes old buckets; improved stats/logging.
  - clear() and close() under write lock; wait for snapshot futures outside locks; shutdown snapshot executor.
- Tests and benchmarks
  - Expanded unit and thread-safety tests; made MockManagedCursor public.
  - New JMH suite (BucketDelayedDeliveryTrackerBenchmark) and MockBucketSnapshotStorage; removed old simple benchmark.
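For readers unfamiliar with the “read-lock check → write-lock modify” pattern listed under Concurrency, here is a minimal sketch of the idea. It is not the tracker's actual code: the class name `CheckThenModifySketch` and the `positions` set are hypothetical stand-ins for the tracker state, and only the `ReentrantReadWriteLock` usage reflects what the PR describes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration; not the BucketDelayedDeliveryTracker implementation.
class CheckThenModifySketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Assumed stand-in for the tracker's internal state.
    private final Set<Long> positions = new HashSet<>();

    /** Returns true if the position was newly added, false if already tracked. */
    boolean add(long position) {
        // Fast path: many threads can run this duplicate check concurrently.
        lock.readLock().lock();
        try {
            if (positions.contains(position)) {
                return false;
            }
        } finally {
            lock.readLock().unlock();
        }
        // ReentrantReadWriteLock cannot upgrade a read lock to a write lock,
        // so the read lock is released first. Another thread may add the same
        // position in between, so membership is re-checked under the write lock
        // (Set.add performs that re-check and reports the outcome).
        lock.writeLock().lock();
        try {
            return positions.add(position);
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean contains(long position) {
        lock.readLock().lock();
        try {
            return positions.contains(position);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        CheckThenModifySketch sketch = new CheckThenModifySketch();
        System.out.println(sketch.add(1L));      // first add of a new position
        System.out.println(sketch.add(1L));      // duplicate is rejected on the read path
        System.out.println(sketch.contains(1L));
    }
}
```

The write lock is held only for the final mutation, so duplicate adds never contend with other readers.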
Verifying this change
- [x] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: https://github.com/Denovo1998/pulsar/pull/10
@codelipenghui @lhotari @coderzc @Apurva007 How does this copy-on-write method look? Is my direction correct? There is still much work to be done, and I will convert this PR to a draft later.
Without benchmarking, it will be hard to validate assumptions. I'd suggest adding JMH benchmarks so that performance could be compared.
JMH benchmarks can be added to microbench module: https://github.com/apache/pulsar/tree/master/microbench
OK. I will add JMH benchmarks first.
bump, this PR can be reviewed now. @lhotari @codelipenghui @coderzc @Apurva007 @thetumbled @dao-jun @BewareMyPower PTAL.
bump.
@BewareMyPower Copilot's review made sense, I re-examined it and have made the changes. PTAL.
@codelipenghui @lhotari @coderzc @Apurva007 @BewareMyPower @thetumbled @dao-jun
Update the JMH test results.
java -Xmx4g -jar microbench/target/microbenchmarks.jar \
-p maxAdditionalUniqueMessages=5000000 \
-rf json -rff jmh-result-$(date +%s).json \
".*BucketDelayedDeliveryTrackerBenchmark.benchmarkConcurrentAddMessage.*" \
| tee jmh-result-$(date +%s).txt
- Current PR optimized implementation. https://gist.github.com/Denovo1998/ed2483ef8b0139aaef85663216f7c226 https://gist.github.com/Denovo1998/2b83267e2e36636bc4dab2ef120ed72c
- Code branch implemented using synchronized. https://gist.github.com/Denovo1998/6ce77dbf8aced1b493f4ee7926e80a61 https://gist.github.com/Denovo1998/81ab70285988e675ec1b5024faa6c3a7
In the current master branch the implementation uses StampedLock and is heavily optimized for pure read (contains) throughput via optimistic reads.
In the new version (Denovo1998:bucket_delivery_tracker_optimize) I switched to a standard read–write lock, so it’s expected that a pure contains micro-benchmark cannot beat the StampedLock version.
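To make the comparison concrete, here is a minimal sketch of a StampedLock optimistic read, which is the mechanism that makes a read-only check so cheap. This is an illustration only, not the code on master: the class `OptimisticReadSketch` and its single `highestTracked` field are hypothetical, chosen because a primitive field is safe to read optimistically.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical illustration; not the tracker code on master.
class OptimisticReadSketch {
    private final StampedLock lock = new StampedLock();
    // Assumed state: highest position tracked so far (-1 means nothing tracked).
    private long highestTracked = -1;

    boolean mayContain(long position) {
        // Optimistic read: takes no lock, only records a stamp.
        long stamp = lock.tryOptimisticRead();
        boolean result = position <= highestTracked;
        // If a writer intervened, the stamp is invalid and the read is retried
        // under a real read lock.
        if (!lock.validate(stamp)) {
            stamp = lock.readLock();
            try {
                result = position <= highestTracked;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return result;
    }

    void track(long position) {
        // Every write takes the exclusive lock and invalidates optimistic stamps.
        long stamp = lock.writeLock();
        try {
            highestTracked = Math.max(highestTracked, position);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    public static void main(String[] args) {
        OptimisticReadSketch sketch = new OptimisticReadSketch();
        sketch.track(10L);
        System.out.println(sketch.mayContain(5L));
        System.out.println(sketch.mayContain(11L));
    }
}
```

With no concurrent writers, the read path never blocks or updates lock state, which a ReentrantReadWriteLock read lock cannot match.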
However, delayed-message workloads are not “pure read” – they are a mix of:
- addMessage (adding new delayed messages),
- remove/ack/cleanup,
- occasional contains checks.
So I focused on the mixed-operation benchmarks, which better reflect real usage.
From the JMH results:
- Write path (benchmarkConcurrentAddMessage)
  - master: ~6k ops/s
  - new version: 5M–11M ops/s
  → roughly three orders of magnitude faster on addMessage.
- Pure read (benchmarkConcurrentContainsMessage)
  - master: ~80M–90M ops/s
  - new version: ~2M–3M ops/s
  → here master is much faster, which matches the StampedLock design goal.
- Mixed operations (1 thread, benchmarkMixedOperations). For typical read/write mixes:
  - 50/50, 70/30, 80/20: the new version is hundreds of times faster (e.g. 50/50 goes from ~13k ops/s to ~12.6M ops/s).
  - Even at 90% read / 10% write, the new version is still slightly faster overall.
- Mixed operations, high contention (32 threads, benchmarkHighContentionMixedOperations)
  - 50/50: ~2.9× faster
  - 70/30: ~1.5–1.6× faster
  - 80/20: ~1.3–1.4× faster
  - 90/10: still 5–10% faster end-to-end
So the trade-off is:
- We give up some extreme pure-read throughput from the StampedLock implementation;
- In exchange, we dramatically improve write performance and get consistent wins in mixed read/write workloads, which is closer to how delayed messages are actually used in production.
- Current PR optimized implementation.
https://gist.github.com/Denovo1998/d470ea939fc9337523545525539b8f5f https://gist.github.com/Denovo1998/74077064f0aa7c1119c6e627a083262a
- Code branch implemented using synchronized and StampedLock.
https://gist.github.com/Denovo1998/02075b374a5f00771ac6d32809ac4036 https://gist.github.com/Denovo1998/19b0483cd1f9ca06b17ac434fe402a74