`S3TransferManager` / `S3AsyncClient` does not seem to use `SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR`'s `Executor`.
Describe the bug
I think that, when making S3TransferManager calls, CompletableFutures always run on (or revert to) the default shared ForkJoinPool, even if an Executor is provided via FUTURE_COMPLETION_EXECUTOR.
Expected Behavior
If an Executor is provided via FUTURE_COMPLETION_EXECUTOR, CompletableFutures from S3TransferManager calls run on that Executor.
Current Behavior
CompletableFutures use (or revert to) the default shared ForkJoinPool, even if an Executor is provided via .futureCompletionExecutor(), when making S3TransferManager calls.
Reproduction Steps
Disclaimer: this is the first time I've worked with Executors and CompletableFutures.
On SDK version 2.20.160 (the version in use because the code currently runs on EMR 6.15.0), when attempting something like the following
```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;

// Pool sizing modeled on the SDK's default future-completion executor:
// https://github.com/aws/aws-sdk-java-v2/blob/501e37cea262a4c1e676f0fcceb1ff09e9a07282/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java#L487-L499
int processors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(8, processors);
int maxPoolSize = Math.max(64, processors * 2);
int keepAliveTime = 10;
TimeUnit keepAliveTimeUnit = TimeUnit.SECONDS;
ThreadPoolExecutor futureCompletionExecutor = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        keepAliveTime,
        keepAliveTimeUnit,
        new LinkedBlockingQueue<>(1_000),
        new ThreadFactoryBuilder()
                .threadNamePrefix("s3-transfer-manager-s3-async-client").build(),
        new ThreadPoolExecutor.CallerRunsPolicy() // vs default of `ThreadPoolExecutor.AbortPolicy`
);
futureCompletionExecutor.allowCoreThreadTimeOut(true);

S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
        .asyncConfiguration(
                ClientAsyncConfiguration.builder()
                        .advancedOption(
                                SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
                                futureCompletionExecutor
                        )
                        .build()
        )
        // "UnsupportedOperationException: Multipart download is not yet supported. Instead use the CRT based S3 client for multipart download."
        // https://github.com/aws/aws-sdk-java-v2/blob/7fd8ea197ede83a663d6cd045ec83bd6047c8633/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java#L112-L118
        // https://github.com/aws/aws-sdk-java-v2/blob/7fd8ea197ede83a663d6cd045ec83bd6047c8633/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java#L76
        // .multipartEnabled(true)
        .build();

S3TransferManager s3TransferManager = S3TransferManager.builder()
        .s3Client(s3AsyncClient)
        .build();

// ...

DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
        .getObjectRequest(req -> req.bucket(bucket).key(key))
        .destination(filePath.toFile())
        .build();
FileDownload downloadStatus = s3TransferManager.downloadFile(downloadFileRequest);
downloadStatus.completionFuture().join(); // throws here
```
in production code that makes many S3TransferManager calls, I get
```
java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2011)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3310)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
    ...
```
even though the executor I provided is a ThreadPoolExecutor whose RejectedExecutionHandler is set to ThreadPoolExecutor.CallerRunsPolicy.
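For context, a minimal JDK-only sketch of when `CallerRunsPolicy` applies: it is consulted only when the `ThreadPoolExecutor` it belongs to rejects a task in `execute()`, and it then runs that task on the submitting thread. The class and method names are hypothetical, and the one-thread pool with a `SynchronousQueue` exists only to force a rejection. Since the exception above is raised from `ForkJoinPool.tryCompensate`, a handler attached to a separate `ThreadPoolExecutor` would not be consulted for it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {

    /** Returns the name of the thread that ran a task rejected by a saturated pool. */
    static String threadThatRanRejectedTask() {
        // One worker and a queue with no capacity: a second task is rejected
        // while the first one is still running.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the only worker busy until `release` is counted down.
            pool.execute(() -> {
                started.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(started);

            AtomicReference<String> ranOn = new AtomicReference<>();
            // The pool rejects this task, so CallerRunsPolicy runs it on this thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:        " + Thread.currentThread().getName());
        System.out.println("rejected task ran on: " + threadThatRanRejectedTask());
    }
}
```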
Local testing with some scratch code shows that the `CompletableFuture<CompletedFileDownload>` returned by the download has `stack.executor` set to `null`, and that calling `CompletableFutureUtils.forwardResultTo(sdkFuture, customFuture, futureCompletionExecutor)` changes `stack.executor` to the Executor I provided. This seems to suggest that the SDK was using the default ForkJoinPool Executor until I changed it via `CompletableFutureUtils.forwardResultTo()`.
Scratch code (SDK: 2.26.9, software.amazon.awssdk.crt.aws-crt: 0.29.20):
```java
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;

// Pool sizing modeled on the SDK's default future-completion executor:
// https://github.com/aws/aws-sdk-java-v2/blob/501e37cea262a4c1e676f0fcceb1ff09e9a07282/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java#L487-L499
int processors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(8, processors);
int maxPoolSize = Math.max(64, processors * 2);
int keepAliveTime = 10;
TimeUnit keepAliveTimeUnit = TimeUnit.SECONDS;
ThreadPoolExecutor futureCompletionExecutor = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        keepAliveTime,
        keepAliveTimeUnit,
        new LinkedBlockingQueue<>(1_000),
        new ThreadFactoryBuilder()
                .threadNamePrefix("s3-transfer-manager-s3-async-client").build(),
        new ThreadPoolExecutor.CallerRunsPolicy() // vs default of `ThreadPoolExecutor.AbortPolicy`
);
futureCompletionExecutor.allowCoreThreadTimeOut(true);

try (
        S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
                .credentialsProvider(
                        ProfileCredentialsProvider.builder()
                                .profileName("<my-profile>")
                                .build())
                .futureCompletionExecutor(futureCompletionExecutor)
                .region(Region.US_EAST_1)
                .build();
        S3TransferManager tm = S3TransferManager.builder()
                .s3Client(s3AsyncClient)
                .build()
) {
    DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
            .getObjectRequest(b -> b.bucket("<my-bucket>").key("<my-key>"))
            .destination(Paths.get("C:\\Users\\<user>\\Downloads\\<filename>"))
            .build();
    FileDownload downloadFile = tm.downloadFile(downloadFileRequest);
    CompletableFuture<CompletedFileDownload> completedFileDownloadCompletableFuture = downloadFile.completionFuture();
    // Breakpoint: `completedFileDownloadCompletableFuture`'s `stack.executor` is `null`

    CompletableFuture<CompletedFileDownload> forwardWithExecutorFuture = new CompletableFuture<>();
    CompletableFutureUtils.forwardResultTo(completedFileDownloadCompletableFuture, forwardWithExecutorFuture, futureCompletionExecutor);
    // Breakpoint: `completedFileDownloadCompletableFuture`'s `stack.executor` is `ThreadPoolExecutor`

    CompletableFuture<CompletedFileDownload> forwardWithoutExecutorFuture = new CompletableFuture<>();
    CompletableFutureUtils.forwardResultTo(forwardWithExecutorFuture, forwardWithoutExecutorFuture);
    // Breakpoint: `forwardWithExecutorFuture`'s `stack.executor` is `null`

    CompletedFileDownload downloadResult1 = completedFileDownloadCompletableFuture.join();
}
```
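A note on reading those breakpoints: in the JDK, a `CompletableFuture`'s internal `stack` field is the top of the list of dependent callbacks waiting on it, and each callback's `executor` is the executor that callback was registered with (`null` if none). A `null` executor means the callback runs synchronously on whichever thread completes the source future, which is not necessarily a ForkJoinPool thread. The JDK-only sketch below illustrates this; `forwardOn` and the pool name are hypothetical, and it assumes `CompletableFutureUtils.forwardResultTo(src, dst, executor)` behaves roughly like `whenCompleteAsync(..., executor)`, which is an assumption rather than the SDK's documented contract.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ForwardingDemo {

    // Hypothetical helper: copies src's outcome into dst, running the
    // forwarding callback on the given executor.
    static <T> CompletableFuture<T> forwardOn(CompletableFuture<T> src,
                                              CompletableFuture<T> dst,
                                              Executor executor) {
        src.whenCompleteAsync((result, error) -> {
            if (error != null) {
                dst.completeExceptionally(error);
            } else {
                dst.complete(result);
            }
        }, executor);
        return dst;
    }

    /** Returns {thread of a plain dependent, thread of a forwarded dependent}. */
    static String[] callbackThreads() {
        ExecutorService pool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "custom-completion-pool"));
        try {
            CompletableFuture<String> src = new CompletableFuture<>();
            // No executor: runs on whichever thread completes src.
            CompletableFuture<String> plain =
                    src.thenApply(v -> Thread.currentThread().getName());
            // dst is completed on the pool, so dependents registered on it run there.
            CompletableFuture<String> forwarded =
                    forwardOn(src, new CompletableFuture<String>(), pool)
                            .thenApply(v -> Thread.currentThread().getName());
            src.complete("done"); // completed by the calling thread
            return new String[] {plain.join(), forwarded.join()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = callbackThreads();
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("plain:     " + names[0]);
        System.out.println("forwarded: " + names[1]);
    }
}
```

Run as written, the plain dependent reports the caller's thread and the forwarded one reports `custom-completion-pool`; neither involves the common ForkJoinPool.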
Possible Solution
No response
Additional Information/Context
JDK and SDK versions are provided in-line under Reproduction Steps.
The versions in the fields below are from production, which runs on EMR 6.15.0.
AWS Java SDK version used
2.20.160
JDK version used
Amazon Corretto 8
Operating System and version
Amazon Linux 2.0.20240610.1
I've seen this behaviour too. If you have many parallel transfers initiated (more than your CPU count) and need that concurrency, you may need to raise java.util.concurrent.ForkJoinPool.common.parallelism.
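A minimal sketch for checking what that setting resolves to at runtime; the class name is hypothetical. Parallel streams run their work in the common `ForkJoinPool`, and the `java.util.concurrent.ForkJoinPool.common.parallelism` system property is read only when that pool is first created, so it is normally passed as a `-D` flag at JVM startup rather than set from application code. The value `32` in the comment is only illustrative.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolSettings {

    // System property read once, when the common pool is first created.
    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    /** Effective parallelism of the common pool that parallel streams run on. */
    static int effectiveParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Typically set on the command line so it takes effect before the pool exists, e.g.
        //   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=32 -cp app.jar ...
        System.out.println(PARALLELISM_PROPERTY + " = " + System.getProperty(PARALLELISM_PROPERTY));
        System.out.println("available processors    = " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism = " + effectiveParallelism());
    }
}
```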
Hi @neverendingqs, could you provide the following information:
- the entire stack trace
- the code showing how you send multiple requests concurrently
- Are you using `CompletableFuture#runAsync` or `CompletableFuture#supplyAsync` in your application? If no executor is provided, the task will be performed by `ForkJoinPool`.
I see you have a support case open; feel free to share the details there if that works better for you. I don't think we use ForkJoinPool in the SDK: I searched our code base and couldn't find any place in the S3 client where we call `CompletableFuture#xxxAsync` methods without overriding the executor.
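A minimal JDK-only sketch of the distinction raised above; the class name and the `app-request-pool` thread name are hypothetical. Per the `CompletableFuture` documentation, async methods without an executor argument run on `ForkJoinPool.commonPool()` (or on a new thread per task if the common pool's parallelism is below two), while passing an executor keeps the work on that executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

final class AsyncExecutorChoice {

    /**
     * CompletableFuture uses the common pool for executor-less async methods only
     * when its parallelism is at least 2; otherwise it starts a thread per task.
     */
    static boolean asyncDefaultsToCommonPool() {
        return ForkJoinPool.getCommonPoolParallelism() > 1;
    }

    /** Thread that runs supplyAsync when no executor is passed. */
    static String defaultAsyncThread() {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()).join();
    }

    /** Thread that runs supplyAsync when an explicit executor is passed. */
    static String explicitAsyncThread() {
        ExecutorService pool = Executors.newFixedThreadPool(
                2, r -> new Thread(r, "app-request-pool"));
        try {
            return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool used by default: " + asyncDefaultsToCommonPool());
        System.out.println("no executor:       " + defaultAsyncThread());
        System.out.println("explicit executor: " + explicitAsyncThread());
    }
}
```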
It looks like this issue has not been active for more than five days. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please add a comment to prevent automatic closure, or if the issue is already closed please feel free to reopen it.
Hi @neverendingqs, I've reviewed the code. It seems you are using parallel streams to issue concurrent requests, and by default parallel streams run on the common ForkJoinPool. If that behavior is not desired, I'd suggest using a custom executor to submit those requests instead. The parallel stream is created here (the second argument, `true`, makes the stream parallel):
`StreamSupport.stream(var.spliterator(), true)`
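A minimal sketch of that suggestion, assuming the requests were previously issued from a parallel stream like the one above. `fakeDownload` is a hypothetical stand-in for one blocking S3TransferManager call (for example, `downloadFile(...)` followed by `completionFuture().join()`), and the pool size and thread name are illustrative. Each request is submitted to a fixed-size pool, and only the calling thread waits for the results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class ExecutorInsteadOfParallelStream {

    /** Hypothetical stand-in for one blocking transfer; records the thread it ran on. */
    static String fakeDownload(String key) {
        return key + "@" + Thread.currentThread().getName();
    }

    /**
     * Replaces a parallel stream over `keys` with one task per key on a
     * fixed-size pool, waits for every task, and returns results in input order.
     */
    static List<String> downloadAll(Iterable<String> keys, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(
                maxConcurrency, r -> new Thread(r, "request-submitter"));
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                futures.add(CompletableFuture.supplyAsync(() -> fakeDownload(key), pool));
            }
            // Wait on the calling thread (assumed not to be a ForkJoinPool worker).
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            keys.add("key-" + i);
        }
        downloadAll(keys, 2).forEach(System.out::println);
    }
}
```

Bounding concurrency with the pool size, rather than with the common pool's parallelism, keeps the request fan-out independent of the machine's CPU count.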