
[improve][broker] Don't call ManagedLedger#asyncAddEntry in Netty I/O thread

Open • BewareMyPower opened this issue 9 months ago • 2 comments

Motivation

https://github.com/apache/pulsar/pull/23940 introduces a behavior change: the core logic of ManagedLedger#asyncAddEntry no longer switches threads, which means it is executed directly in the Netty I/O thread via PersistentTopic#asyncAddEntry.

The beforeAddEntry method calls the intercept and interceptWithNumberOfMessages methods of all broker entry interceptors and prepends a new broker entry metadata buffer to the original buffer (though the result is just a composite buffer).

There is a risk that when many producers send messages to the same managed ledger concurrently, asyncAddEntry might block the Netty I/O thread for some time and cause a performance regression.

Modifications

In PersistentTopic#publishMessage, expose a getExecutor() method on ManagedLedger and execute ManagedLedger#asyncAddEntry in that executor. The change from https://github.com/apache/pulsar/pull/12606 is also moved into PersistentTopic so that the buffer is retained before switching to another thread.
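
Roughly, the intended flow in PersistentTopic#publishMessage could look like the following sketch. The surrounding names (headersAndPayload, numberOfMessages, addEntryCallback, publishContext) are illustrative, and the exact signatures in the PR may differ; getExecutor() is the method this PR exposes.

    // Sketch only: retain the buffer while still on the Netty I/O thread, then
    // hand the add off to the managed ledger's executor so the add path never
    // blocks the I/O thread.
    ByteBuf duplicateBuffer = headersAndPayload.retainedDuplicate();
    ledger.getExecutor().execute(() ->
            // The add path takes ownership of the retained buffer and releases it
            // after the entry is persisted or the operation fails.
            ledger.asyncAddEntry(duplicateBuffer, numberOfMessages, addEntryCallback, publishContext));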

After that, only afterAddEntryToQueue needs to be synchronized with the other synchronized methods of ManagedLedgerImpl. (P.S. I don't actually think synchronized is needed here, but unlike beforeAddEntryToQueue and beforeAddEntry the logic is not trivial, so I keep it synchronized.)

ManagedLedgerImpl#asyncAddEntry still doesn't switch threads, so it is still possible for a downstream application to serialize calls to asyncAddEntry, either by adding a lock (e.g. synchronized) or by executing the method from a single thread.
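
For instance, a downstream caller could wrap the call with its own lock. The wrapper class below is a hypothetical sketch using only the public managed-ledger API, not something provided by Pulsar:

    import io.netty.buffer.ByteBuf;
    import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
    import org.apache.bookkeeper.mledger.ManagedLedger;

    // Hypothetical wrapper: serializes entries into the ledger by guarding
    // asyncAddEntry with a single lock, since asyncAddEntry itself no longer
    // hops to the managed ledger's executor.
    public class SerializedAdder {
        private final ManagedLedger ledger;
        private final Object addLock = new Object();

        public SerializedAdder(ManagedLedger ledger) {
            this.ledger = ledger;
        }

        public void addInOrder(ByteBuf buffer, int numberOfMessages,
                               AddEntryCallback callback, Object ctx) {
            // Concurrent callers enter asyncAddEntry one at a time.
            synchronized (addLock) {
                ledger.asyncAddEntry(buffer, numberOfMessages, callback, ctx);
            }
        }
    }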

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/40

BewareMyPower • Feb 13 '25

An extra context switch for each entry is costly, especially when you have many small entries and little or no batching. That's why we put it on the same thread.

@merlimat The thread switching was added in PR https://github.com/apache/pulsar/pull/9039, back in December 2020. The reason for making this change is a performance concern with the #23940 changes, which removed the thread switching.

https://github.com/apache/pulsar/blob/ee5b13af5cf229c2e4846c6d34ebda59eb82330a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L796-L826

In Pulsar use cases, synchronization on CPU-intensive operations (or blocking IO operations) in Netty IO threads could cause performance regressions. In this case, it would affect use cases where a large number of producers produce to a single topic. Blocking IO threads has an even broader impact, since it affects the Netty IO of all connections sharing the same IO thread.

Before #23940, the code looks like this: https://github.com/apache/pulsar/blob/7a79c78f8e6f4b52f13be1c6441f4b007d9a00fe/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L796-L810

Btw, in the Pulsar code base we have a problem with how IO threads are used: they process work that shouldn't be handled on IO threads at all. I have created issue #23865 for this. There should be a separate thread pool for running blocking operations and CPU-intensive synchronized operations.
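
A minimal sketch of that pattern (the handler, executor, and processExpensiveWork names are made up for illustration): keep heavy work off the event loop and hop back to it only to write the response.

    // Sketch only: offload blocking or CPU-intensive work from the Netty event
    // loop to a dedicated executor, then return to the event loop for the write.
    private final ExecutorService heavyWorkExecutor = Executors.newFixedThreadPool(8);

    void handleRequest(ChannelHandlerContext ctx, Object request) {
        heavyWorkExecutor.execute(() -> {
            Object response = processExpensiveWork(request); // hypothetical blocking / CPU-heavy step
            // Hop back to this channel's event loop to write the response.
            ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(response));
        });
    }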

lhotari • Feb 13 '25

@merlimat The thread switching was added in PR https://github.com/apache/pulsar/pull/9039, already in December 2020.

@merlimat @lhotari To correct that: this is actually very early behavior, introduced in https://github.com/apache/pulsar/pull/1521.

This PR intends to decouple ManagedLedger#asyncAddEntry and PersistentTopic#asyncAddEntry so that the managed ledger interface is more flexible for downstream protocol handlers to use.

After that, all write operations from Pulsar clients still keep the original behavior of switching to the managed ledger's executor before calling ManagedLedger#asyncAddEntry.

However, for downstream code, for example in my Kafka protocol handler implementation, PersistentTopic#publishMessage is not called in an I/O thread; it's called in an independent worker thread. Then I can choose to call persistentTopic.getManagedLedger().asyncAddEntry(/* ... */) in order, either by adding the synchronized keyword or by using the same worker thread for the same topic, as sketched below.
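
For example, the "same worker thread for the same topic" option could look like this sketch (the class and method names are made up for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical dispatcher: every topic is pinned to one single-threaded
    // worker, so all asyncAddEntry calls for that topic run in submission order
    // without any explicit lock.
    public class PerTopicWriteDispatcher {
        private final List<ExecutorService> workers = new ArrayList<>();

        public PerTopicWriteDispatcher(int numWorkers) {
            for (int i = 0; i < numWorkers; i++) {
                workers.add(Executors.newSingleThreadExecutor());
            }
        }

        public void publish(String topic, Runnable addEntryTask) {
            // The same topic always hashes to the same worker thread.
            int index = Math.floorMod(topic.hashCode(), workers.size());
            workers.get(index).execute(addEntryTask);
        }
    }

A call would then look like dispatcher.publish(topicName, () -> persistentTopic.getManagedLedger().asyncAddEntry(/* ... */)).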

The comment here makes sense to a certain extent, but it is probably a new topic to discuss (e.g. thread switching vs. synchronized), which is beyond the scope of this PR. At least the existing thread-switching approach can already achieve high publish performance, as verified by many benchmarks.

BewareMyPower • Feb 14 '25

Let me close this PR first.

An extra context switch for each entry is costly

That's true, but we still need a benchmark to confirm it. I ran a simple test by adding these metrics to ManagedLedgerMBeanImpl:

    // Delay between submitting the add task and the task starting on the executor
    @Getter
    private final StatsBuckets threadSwitchingStats = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
    // Time spent in the executor.execute() call on the calling thread
    @Getter
    private final StatsBuckets executeStats = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
    // Time spent by the add operation itself on the executor thread
    @Getter
    private final StatsBuckets asyncAddEntryStats = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);

and then record the latency in asyncAddEntry:

        final var beforeExecute = System.nanoTime();
        executor.execute(() -> {
            final var afterExecute = System.nanoTime();
            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
                    currentLedgerTimeoutTriggered);
            internalAsyncAddEntry(addOperation);
            final var completeAsyncAddEntry = System.nanoTime();
            // Delay between submitting the task and the task starting on the executor
            mbean.getThreadSwitchingStats().addValue(afterExecute - beforeExecute);
            // Time spent by the add operation itself on the executor thread
            mbean.getAsyncAddEntryStats().addValue(completeAsyncAddEntry - afterExecute);
        });
        final var afterExecute = System.nanoTime();
        // Time spent in the executor.execute() call on the calling thread
        mbean.getExecuteStats().addValue(afterExecute - beforeExecute);

Then I added a test:

    @Test
    public void testThreadSwitchingLatency() throws Exception {
        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("ml");
        final var data = new byte[1000];
        // Write 1000 entries of 1 KB each through the synchronous API,
        // which goes through asyncAddEntry internally.
        for (int i = 0; i < 1000; i++) {
            ml.addEntry(data);
        }
        ml.mbean.getThreadSwitchingStats().refresh();
        ml.mbean.getAsyncAddEntryStats().refresh();
        ml.mbean.getExecuteStats().refresh();
        log.info("1: {}", ml.mbean.getThreadSwitchingStats().getAvg()); // task execution delay
        log.info("2: {}", ml.mbean.getAsyncAddEntryStats().getAvg());   // the add operation itself
        log.info("3: {}", ml.mbean.getExecuteStats().getAvg());         // the execute() call itself
    }

Outputs:

    1: 26924.005
    2: 35933.837
    3: 8866.484

The averages are in nanoseconds, i.e.:

  1. Task execution delay (thread switching): ~27 us
  2. The asynchronous add operation itself: ~36 us
  3. The execute method call itself: ~9 us

None of these should have a significant impact, because the overall add latency is at the millisecond level.

The main concern is that if internalAsyncAddEntry takes a long time, blocking the Netty I/O thread might impact other requests. Hence, switching to a dedicated thread pool (the managed ledger's executor) makes sense.

We should not use synchronized to replace executor.execute here, because ManagedLedgerImpl currently uses synchronized as a coarse-grained lock in many other places as well:

  • asyncOpenCursor
  • cursor initialization callback in asyncOpenCursor
  • asyncDeleteCursor
  • newNonDurableCursor
  • getEstimatedBacklogSize
  • asyncTerminate
  • ...

It's hard to analyze how much time would be spent blocked on acquiring that lock.

Using a dedicated lock for asyncAddEntry would make sense and would be much more efficient than thread switching. However, we cannot simply replace synchronized with a different lock on internalAsyncAddEntry, because it accesses fields that are protected by synchronized in other threads.
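
A generic illustration of that pitfall (not Pulsar code): a thread holding a ReentrantLock and a thread holding the object monitor do not exclude each other, so a field that the rest of the class guards with synchronized would no longer be protected.

    import java.util.concurrent.locks.ReentrantLock;

    // Not Pulsar code: shows why replacing synchronized with a different lock on
    // only one method is unsafe. writerA and writerB acquire different locks, so
    // they can update 'state' concurrently even though other code assumes the
    // object monitor guards it.
    class MixedLocking {
        private final ReentrantLock addLock = new ReentrantLock();
        private long state;

        void writerA() {              // e.g. internalAsyncAddEntry after such a change
            addLock.lock();
            try {
                state++;              // races with writerB
            } finally {
                addLock.unlock();
            }
        }

        synchronized void writerB() { // like the other synchronized methods listed above
            state++;
        }
    }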

In summary, the current design should not be changed, including:

  • Keeping synchronized on internalAsyncAddEntry
  • Switching to the managed ledger's executor to execute internalAsyncAddEntry

We should not worry much about the thread switching, because its latency is at the microsecond level.

BewareMyPower • Sep 23 '25