
[improve][broker] Optimize fine-grained concurrency control for BucketDelayedDeliveryTracker

Open Denovo1998 opened this issue 3 months ago • 8 comments

Fixes #24603

Main Issue: #24600

PIP: #xyz

Motivation

  • Reduce lock contention and fix thread-safety issues in BucketDelayedDeliveryTracker.
  • Remove blocking I/O from the publish path.
  • Decouple the tracker from the dispatcher for easier testing and benchmarking.

Modifications

  • Concurrency
    • Replace StampedLock and synchronized with ReentrantReadWriteLock (read/write separation).
    • addMessage uses “read-lock check → write-lock modify” to minimize write lock time.
  • Asynchronous snapshots
    • Seal-and-swap the current bucket and persist snapshots on a single-thread executor.
    • Ensure only one snapshot is created at a time.
  • Scheduling and timer
    • getScheduledMessages triggers async loading of next snapshot segment instead of blocking.
    • Reworked timer handling with a dedicated lock and scheduleImmediateRun; no synchronization on dispatcher.
  • Decoupling
    • Introduce DelayedDeliveryContext:
      • DispatcherDelayedDeliveryContext (production).
      • NoopDelayedDeliveryContext (tests/benchmarks).
    • Trackers and factories accept dispatcher or context; added constructors for name+cursor.
  • Merging and lifecycle
    • asyncMergeBucketSnapshot uses write lock; merges bitmaps; removes old buckets; improved stats/logging.
    • clear() and close() under write lock; wait for snapshot futures outside locks; shutdown snapshot executor.
  • Tests and benchmarks
    • Expanded unit and thread-safety tests; made MockManagedCursor public.
    • New JMH suite (BucketDelayedDeliveryTrackerBenchmark) and MockBucketSnapshotStorage; removed old simple benchmark.
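The two central mechanisms above, the "read-lock check → write-lock modify" pattern and the single-flight seal-and-swap snapshot, can be sketched roughly as follows. This is a minimal sketch with placeholder names (`TrackerSketch`, a plain map standing in for the bucket state); the real tracker stores delayed-index entries and persists actual bucket snapshots to storage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified sketch of the concurrency scheme described in the PR.
// Names and the map-based state are placeholders, not the real Pulsar types.
class TrackerSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final AtomicBoolean snapshotInProgress = new AtomicBoolean(false);
    private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
    private ConcurrentHashMap<Long, Long> currentBucket = new ConcurrentHashMap<>();

    /** "Read-lock check -> write-lock modify": the write lock is only
     *  taken when the message actually has to be tracked. */
    boolean addMessage(long messageId, long deliverAt, long cutoffTime) {
        rwLock.readLock().lock();
        try {
            if (deliverAt <= cutoffTime) {
                return false; // already due: dispatch immediately, no write lock needed
            }
        } finally {
            rwLock.readLock().unlock();
        }
        rwLock.writeLock().lock();
        try {
            currentBucket.put(messageId, deliverAt);
            return true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /** Seal-and-swap: swap in a fresh bucket under the write lock, then
     *  persist the sealed one off the publish path on a single-thread
     *  executor. The AtomicBoolean ensures only one snapshot runs at a time. */
    CompletableFuture<Integer> sealAndSnapshot() {
        if (!snapshotInProgress.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(-1); // snapshot already running
        }
        final ConcurrentHashMap<Long, Long> sealed;
        rwLock.writeLock().lock();
        try {
            sealed = currentBucket;
            currentBucket = new ConcurrentHashMap<>();
        } finally {
            rwLock.writeLock().unlock();
        }
        // Stand-in for persisting the sealed bucket; returns the entry count.
        return CompletableFuture.supplyAsync(sealed::size, snapshotExecutor)
                .whenComplete((n, t) -> snapshotInProgress.set(false));
    }

    void close() {
        snapshotExecutor.shutdown();
    }
}
```

Since both `addMessage`'s mutation and the seal-time swap happen under the write lock, no message can land in a bucket that has already been sealed, while already-due messages never pay the write-lock cost at all.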

Verifying this change

  • [x] Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: https://github.com/Denovo1998/pulsar/pull/10

Denovo1998 avatar Sep 14 '25 13:09 Denovo1998

@codelipenghui @lhotari @coderzc @Apurva007 How does this copy-on-write approach look? Is my direction correct? There is still much work to be done; I will convert this PR to a draft later.

Denovo1998 avatar Sep 14 '25 13:09 Denovo1998

> @codelipenghui @lhotari @coderzc @Apurva007 How does this copy on write method look, is my direction correct? There is still much work to be done, and I will convert this PR to a draft later.

Without benchmarking, it will be hard to validate assumptions. I'd suggest adding JMH benchmarks so that performance could be compared.

JMH benchmarks can be added to microbench module: https://github.com/apache/pulsar/tree/master/microbench

lhotari avatar Sep 14 '25 16:09 lhotari

OK. I will add JMH benchmarks first.

Denovo1998 avatar Sep 15 '25 00:09 Denovo1998

Bump: this PR can be reviewed now. @lhotari @codelipenghui @coderzc @Apurva007 @thetumbled @dao-jun @BewareMyPower PTAL.

Denovo1998 avatar Oct 11 '25 11:10 Denovo1998

bump.

Denovo1998 avatar Nov 29 '25 13:11 Denovo1998

@BewareMyPower Copilot's review made sense; I re-examined the code and made the changes. PTAL.

Denovo1998 avatar Nov 30 '25 04:11 Denovo1998

@codelipenghui @lhotari @coderzc @Apurva007 @BewareMyPower @thetumbled @dao-jun

Updated the JMH test results.

```shell
java -Xmx4g -jar microbench/target/microbenchmarks.jar \
     -p maxAdditionalUniqueMessages=5000000 \
     -rf json -rff jmh-result-$(date +%s).json \
     ".*BucketDelayedDeliveryTrackerBenchmark.benchmarkConcurrentAddMessage.*" \
     | tee jmh-result-$(date +%s).txt
```
  1. Current PR optimized implementation: https://gist.github.com/Denovo1998/ed2483ef8b0139aaef85663216f7c226 https://gist.github.com/Denovo1998/2b83267e2e36636bc4dab2ef120ed72c (screenshot: JMH results)
  2. Code branch implemented using synchronized: https://gist.github.com/Denovo1998/6ce77dbf8aced1b493f4ee7926e80a61 https://gist.github.com/Denovo1998/81ab70285988e675ec1b5024faa6c3a7 (screenshot: JMH results)

Denovo1998 avatar Dec 04 '25 13:12 Denovo1998

In the current master branch the implementation uses StampedLock and is heavily optimized for pure read (contains) throughput via optimistic reads. In the new version (Denovo1998:bucket_delivery_tracker_optimize) I switched to a standard read–write lock, so it’s expected that a pure contains micro-benchmark cannot beat the StampedLock version.

However, delayed-message workloads are not “pure read” – they are a mix of:

  • addMessage (adding new delayed messages),
  • remove/ack / cleanup,
  • occasional contains checks.

So I focused on the mixed-operation benchmarks, which better reflect real usage.

From the JMH results:

  • Write path (benchmarkConcurrentAddMessage)
    • master: ~6k ops/s
    • new version: 5M–11M ops/s → roughly three orders of magnitude faster on addMessage.
  • Pure read (benchmarkConcurrentContainsMessage)
    • master: ~80M–90M ops/s
    • new version: ~2M–3M ops/s → here master is much faster, which matches the StampedLock design goal.
  • Mixed operations (1 thread, benchmarkMixedOperations), for typical read/write mixes:
    • 50/50, 70/30, 80/20: the new version is hundreds of times faster (e.g. 50/50 goes from ~13k ops/s to ~12.6M ops/s).
    • Even at 90% read / 10% write, the new version is still slightly faster overall.
  • Mixed operations under high contention (32 threads, benchmarkHighContentionMixedOperations)
    • 50/50: ~2.9× faster
    • 70/30: ~1.5–1.6× faster
    • 80/20: ~1.3–1.4× faster
    • 90/10: still 5–10% faster end-to-end

So the trade-off is:

  • We give up some extreme pure-read throughput from the StampedLock implementation;
  • In exchange, we dramatically improve write performance and get consistent wins in mixed read/write workloads, which is closer to how delayed messages are actually used in production.
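For context on why master wins the pure-read benchmark so decisively: StampedLock lets readers proceed without acquiring any lock at all via optimistic reads, falling back to a real read lock only when a writer intervened. A minimal sketch of that pattern (placeholder class and field names, not the actual tracker code):

```java
import java.util.concurrent.locks.StampedLock;

// Sketch of the optimistic-read pattern that the master branch's
// StampedLock-based tracker relies on for cheap contains() calls.
class OptimisticContains {
    private final StampedLock lock = new StampedLock();
    private long lastSeenId = -1; // placeholder for the tracker's internal state

    /** Pure reads take an optimistic stamp, read the state, and validate
     *  the stamp. Only if a write happened in between does the reader
     *  fall back to a pessimistic read lock. */
    boolean contains(long messageId) {
        long stamp = lock.tryOptimisticRead();
        boolean found = (messageId == lastSeenId);
        if (!lock.validate(stamp)) { // a writer interleaved: retry under a real lock
            stamp = lock.readLock();
            try {
                found = (messageId == lastSeenId);
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return found;
    }

    void record(long messageId) {
        long stamp = lock.writeLock();
        try {
            lastSeenId = messageId;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}
```

In the uncontended case `contains` performs no CAS on a lock at all, which is why a pure-read micro-benchmark heavily favors StampedLock; a ReentrantReadWriteLock reader always pays for acquiring and releasing the shared lock.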

  1. Current PR optimized implementation: https://gist.github.com/Denovo1998/d470ea939fc9337523545525539b8f5f https://gist.github.com/Denovo1998/74077064f0aa7c1119c6e627a083262a (screenshot: JMH results)
  2. Code branch implemented using synchronized and StampedLock: https://gist.github.com/Denovo1998/02075b374a5f00771ac6d32809ac4036 https://gist.github.com/Denovo1998/19b0483cd1f9ca06b17ac434fe402a74 (screenshot: JMH results)

Denovo1998 avatar Dec 06 '25 02:12 Denovo1998