[fix] Don't use ForkJoinPool when using CompletableFuture.thenXXXAsync
Motivation
Currently, in some cases when don't pass an executor to CompletableFuture.thenComposeAsync/thenApplyAsync/thenAcceptAsync, which will use ForkJoinPool.commonPool.
Which maybe lead to 3 problems:
ForkJoinPool.commonPoolis managed by JVM, it is out of Pulsar manage.- The default thread num of
ForkJoinPool.commonPoolisMath.max(1, Runtime.getRuntime().availableProcessors() - 1), if the broker is in heavy workload, it's not enough. - The callback methods can be executed in different thread everytime when we call
thenComposeAsync/thenApplyAsync/thenAcceptAsync, it maybe lead to potential thread safety issue.
Modifications
Verifying this change
- [ ] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ]
doc - [ ]
doc-required - [x]
doc-not-needed - [ ]
doc-complete
3. The callback methods can be executed in different thread everytime when we call
thenComposeAsync/thenApplyAsync/thenAcceptAsync, it maybe lead to potential thread safety issue.
As long as the mutations don't happen after an object has been passed through a completable future, the receiving thread will observe the same state of the object. A lot of Pulsar's "thread safety" relies on this at the moment.
The reason for this is that in many cases there's a clear reason why then*Async has been used.
+1.
Generally, thenXxxAsync has poor performance due to the thread switching. But it's used to prevent the callback from blocking the completion thread too long. Take this PR's change for example:
return provider.isSuperUser(role, authenticationData, conf).thenCompose(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canLookupAsync(topicName, role, authenticationData);
}
});
The implementation of provider.canLookupAsync could be a customized one. If there is a bug, canLookupAsync might block the thread for some time (usually the metadata store's I/O thread). What's worse, it might cause deadlock (e.g. calling a metadata store operation synchronously in the metadata store's callback).
Regarding other places that replace the ForkJoinPool with Pulsar's internal executors, it depends.
ForkJoinPool.commonPool is managed by JVM, it is out of Pulsar manage.
Executors managed by Pulsar are not guaranteed to be better than ForkJoinPool. Actually, ForkJoinPool should be better than Pulsar's existing thread model. Almost all executor services managed by Pulsar have fixed threads like FixedThreadPool. Once a thread has too many pending tasks, they have to wait previous task to be done, even if there is another IDLE thread.
The callback methods can be executed in different thread everytime when we call thenComposeAsync/thenApplyAsync/thenAcceptAsync, it maybe lead to potential thread safety issue.
It's better to give a concrete example to show how could it result a thread safety issue. Make sure you know the happens-before relationship in Java.
When the ForkJoinPool becomes blocked or runs into deadlock-like behavior, it can stall all dependent tasks. New tasks get stuck in the work queue, leading to widespread performance degradation.
Additionally, ForkJoinPool is designed for workloads that can be recursively split into independent subtasks. If the task isn't inherently decomposable, using ForkJoinPool won't improve performance and may introduce unnecessary overhead.
I think it's reasonable to replace the ForkJoinPool, but going with a single-threaded pool is risky.
If the task isn't inherently decomposable, using ForkJoinPool won't improve performance and may introduce unnecessary overhead.
It's only one typical case. From the official document:
as well as when many small tasks are submitted to the pool from external clients.
A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.
Most tasks from Pulsar are such small tasks in a chain, which are usually starting an asynchronous task.
var future = runTask1().thenCompose(result1 -> runTask2(result1)).thenApply(result2 -> runTask3(result2);
future.whenComplete((result, e) -> {
if (e == null) {
future2.complete(result);
} else {
future2.completeExceptionally(e);
}
});
Then future2 might trigger the callback somewhere else.
For a specific example, the transactionExecutorProvider whose number of threads is determined by numTransactionReplayThreadPoolSize, which is the number of processors by default.
- It does never make sense to use the number of processors by default. I guess it's the default value just because everyone else does that.
- This executor executes many small tasks, even including when a future is completed.
private void init() {
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
.thenAcceptAsync(init -> {
if (init) {
initPendingAckStore();
} else {
completeHandleFuture();
}
}, internalPinnedExecutor)
The caller side should be responsible to switch the thread to continue, not from the internal code.
1、Although, as stated in the official documentation, ForkJoinPool provides benefits such as thread reuse and resource efficiency (e.g., automatic thread reclamation), in certain scenarios, it may actually lead to performance degradation. This is because when a task is submitted to the ForkJoinPool, if the previously used threads have already been reclaimed, the system may need to recreate those threads, introducing additional overhead.
2、We cannot guarantee that all tasks submitted to the ForkJoinPool are lightweight and non-blocking. In practice, for example within Pulsar's Protocol Handler or Broker Interceptor, developers are free to implement custom logic that may execute tasks using ForkJoinPool. This unrestricted usage increases the risk of introducing blocking operations, which can eventually saturate or block the entire pool.
3、Using a dedicated thread pool results in clearer code structure, allowing developers to easily identify which thread pool is executing the current task.This not only improves code maintainability but also facilitates debugging and performance tuning.
4、When the ForkJoinPool encounters blocking issues, it becomes difficult to diagnose. For instance, in a jstack output, some threads may appear to be in a WAITING state, while they are actually blocked by tasks. This kind of "false idleness" can be misleading during diagnosis and significantly increases the difficulty of identifying the root cause.
By default, CompletableFuture executes continuations (thenApply, thenCompose, etc.) on the same thread that completes the future. In Pulsar, this can sometimes be a critical thread, such as a metadata service thread. If we're not careful, this behavior can lead to overloaded threads on hot paths and introduce performance regressions.
That said, in Pulsar we typically avoid switching threads unless it's explicitly necessary. We prefer to execute continuation logic on the callback thread, which helps reduce context switching and improves CPU efficiency. However, this also means we need to be mindful of how much work we're doing in these continuations, especially if the callback thread is part of a limited thread pool.
Using then*Async with the default ForkJoinPool.commonPool introduces behavior that is outside Pulsar's control and can be problematic under load. If thread switching is needed, it’s better to specify a Pulsar-managed executor to maintain better control explicitly and avoid contention with unrelated workloads.
If thread switching is needed, it’s better to specify a Pulsar-managed executor
The issue is, where should a Pulsar-managed executor be used is very ambiguous.
I agree that there is no silver bullet, so we must be careful for which thread pool to use. This PR title is misleading that it thinks the built-in common ForkJoinPool is harmful. But for short tasks that need thread switching, it's proper.
It's reasonable to switch thenApplyAsync(f) to thenApply(f) or thenApply(f, existingExecutor) in some places. But please explain it with a convincing reason, not just "ForkJoinPool is worse than Pulsar managed thread pools". Just like I've mentioned, many Pulsar managed thread pools are not really taken enough care of. Much code is written without any careful thinking.
Therefore, I'd like to talk about the use of thenXXXAsync or thenXXXAsync case by case.
Take the case I've mentioned before: https://github.com/apache/pulsar/blob/3bd70fd74fd2586724d4830c140a42517e028959/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java#L171
We should replace thenAcceptAsync with thenAccept here. It should be the caller's responsibility to determine whether to switch the thread. Switching the thread internally adds unnecessary thread switching cost.
However, it's hard to know which thread will complete the future of checkInitializedBefore, so a safer solution is to change https://github.com/apache/pulsar/blob/3bd70fd74fd2586724d4830c140a42517e028959/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L244
to thenComposeAsync. The callback calls a synchronized code block that acquires the lock of PersistentSubscription. The question becomes, which thread pool should be used? From the purpose of avoiding deadlocks, the default common pool could be best because Pulsar uses its internal executors to execute some tasks, which could never have a chance to lead to a deadlock with the common ForkJoinPool. Though the code block is synchronized, the logic could never be blocking.
BTW, I see Pulsar's default executor is used here: https://github.com/apache/pulsar/blob/3bd70fd74fd2586724d4830c140a42517e028959/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L391
Unfortunately, it's really ambiguous that where should PulsarService#executor be used. I can only see people just choosing an available thread pool as the 2nd argument of thenXXXAsync randomly.
https://bugs.openjdk.org/browse/JDK-8330017
@lhotari @BewareMyPower We've run into this issue several times. The ForkJoinPool may stop executing tasks after running for a while, and we eventually confirmed that the problem lies within the ForkJoinPool itself. Please take a look.
https://bugs.openjdk.org/browse/JDK-8330017
@lhotari @BewareMyPower We've run into this issue several times. The ForkJoinPool may stop executing tasks after running for a while, and we eventually confirmed that the problem lies within the ForkJoinPool itself. Please take a look.
@zhanghaou Thank you! Please report this as a separate issue since it's different than this PR. Please also describe under which conditions it happens and how often. It looks like JDK-8330017 bug doesn't exist in Java 21 according to the issue description. You can run Pulsar 3.x on Java 21 too. I'd recommend building a custom docker image with Java 21 to get around the issue. Pulsar 3.x building should be fine with Java 21, but running tests requires Java 17 due to some incompatible testing libraries. The Pulsar docker images since 3.3.x use Java 21.
https://bugs.openjdk.org/browse/JDK-8330017 @lhotari @BewareMyPower We've run into this issue several times. The ForkJoinPool may stop executing tasks after running for a while, and we eventually confirmed that the problem lies within the ForkJoinPool itself. Please take a look.
@zhanghaou Thank you! Please report this as a separate issue since it's different than this PR. Please also describe under which conditions it happens and how often. It looks like JDK-8330017 bug doesn't exist in Java 21 according to the issue description. You can run Pulsar 3.x on Java 21 too. I'd recommend building a custom docker image with Java 21 to get around the issue. Pulsar 3.x building should be fine with Java 21, but running tests requires Java 17 due to some incompatible testing libraries. The Pulsar docker images since 3.3.x use Java 21.
@lhotari Java 21.0.8 fixed the bug, it seems Java 21 also affected. https://bugs.openjdk.org/browse/JDK-8351933
@lhotari Java 21.0.8 fixed the bug, it seems Java 21 also affected. https://bugs.openjdk.org/browse/JDK-8351933
@zhanghaou Thanks for sharing. It looks like it's also fixed in 17.0.17
apachepulsar/pulsar:4.0.6 contains 21.0.8 so it's addressed there. apachepulsar/pulsar:3.3.8 also contains Java 21.0.8.
apachepulsar/pulsar:3.0.13 contains 17.0.16 so it's not yet addressed there.
It seems that 17.0.17 hasn't been released yet. According to https://wiki.openjdk.org/display/JDKUpdates/JDK+17u, 17.0.17 will be released on October 21, 2025.
For Pulsar 3.0.x, it seems that the only workaround is to build a custom docker image that uses Java 21.0.8 to run Pulsar.
@lhotari Java 21.0.8 fixed the bug, it seems Java 21 also affected. https://bugs.openjdk.org/browse/JDK-8351933
@zhanghaou Thanks for sharing. It looks like it's also fixed in 17.0.17
apachepulsar/pulsar:4.0.6contains 21.0.8 so it's addressed there.apachepulsar/pulsar:3.3.8also contains Java 21.0.8.apachepulsar/pulsar:3.0.13contains 17.0.16 so it's not yet addressed there. It seems that 17.0.17 hasn't been released yet. According to https://wiki.openjdk.org/display/JDKUpdates/JDK+17u, 17.0.17 will be released on October 21, 2025. For Pulsar 3.0.x, it seems that the only workaround is to build a custom docker image that uses Java 21.0.8 to run Pulsar.
@lhotari For users deploying with Docker, this might be helpful. However, many users are likely installing the JDK themselves and running Pulsar directly on a VM. As for the JDK version, it seems we only specify the major version requirement, without detailing the minor versions.
The version we maintain internally has removed all dependencies on the ForkJoin Pool, including the content mentioned in this PR, as well as the caffeine part (since asynchronous construction in caffeine also uses the ForkJoinPool by default).
Given the current situation, I think this PR is meaningful.
For users deploying with Docker, this might be helpful. However, many users are likely installing the JDK themselves and running Pulsar directly on a VM. As for the JDK version, it seems we only specify the major version requirement, without detailing the minor versions.
For users installing the JDK directly, it's even easier to use Java 21.0.8 for Pulsar 3.0.x
The version we maintain internally has removed all dependencies on the ForkJoin Pool, including the content mentioned in this PR, as well as the caffeine part (since asynchronous construction in caffeine also uses the ForkJoinPool by default).
Well, that's one way to have a workaround to the JVM bug.
Given the current situation, I think this PR is meaningful.
That's true. However, replacing all usages of ForkJoinPool in Pulsar might not make sense. ForkJoinPool is useful in certain cases due to the ability to add more threads when all threads are blocked and work cannot progress because of that. For example, IIRC, that addresses some deadlock scenarios when ExtensibleLoadManager is used.
That's true. However, replacing all usages of ForkJoinPool in Pulsar might not make sense. ForkJoinPool is useful in certain cases due to the ability to add more threads when all threads are blocked and work cannot progress because of that. For example, IIRC, that addresses some deadlock scenarios when ExtensibleLoadManager is used.
Thank you for providing the additional information. I think it would be better to use a dedicated ForkJoinPool where necessary, instead of relying on the default common pool everywhere, even if the problem occurs, the impact will be relatively limited.
Thank you for providing the additional information. I think it would be better to use a dedicated ForkJoinPool where necessary, instead of relying on the default common pool everywhere, even if the problem occurs, the impact will be relatively limited.
@zhanghaou I agree that more care should be put into the use of different threads and thread pools in Pulsar.
it would be better to use a dedicated ForkJoinPool where necessary,
I agree.
Java 17.0.17 has been released. This change is no longer needed as a workaround for the Java bug. I'll initiate 3.0.15 release so that we'll have Java 17.0.17 used in the 3.0.15 docker image.