aws-sdk-java-v2

`S3TransferManager` / `S3AsyncClient` does not seem to use `SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR`'s `Executor`.

Open neverendingqs opened this issue 1 year ago • 1 comments

Describe the bug

I think CompletableFutures always run on, or revert to, the default shared ForkJoinPool even if an Executor is provided via FUTURE_COMPLETION_EXECUTOR when making S3TransferManager calls.

Expected Behavior

If provided, CompletableFutures run on the Executor provided via FUTURE_COMPLETION_EXECUTOR when making S3TransferManager calls.

Current Behavior

CompletableFutures use, or revert to, the default shared ForkJoinPool even if an Executor is provided via .futureCompletionExecutor() when making S3TransferManager calls.

Reproduction Steps

Disclaimer: this is the first time I've worked with Executors and CompletableFutures.

On version 2.20.160 of the SDK (this version because production currently runs on EMR 6.15.0), when attempting something like

// https://github.com/aws/aws-sdk-java-v2/blob/501e37cea262a4c1e676f0fcceb1ff09e9a07282/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java#L487-L499
int processors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(8, processors);
int maxPoolSize = Math.max(64, processors * 2);
int keepAliveTime = 10;
TimeUnit keepAliveTimeUnit = TimeUnit.SECONDS;

ThreadPoolExecutor futureCompletionExecutor = new ThreadPoolExecutor(
	corePoolSize,
	maxPoolSize,
	keepAliveTime,
	keepAliveTimeUnit,
	new LinkedBlockingQueue<>(1_000),
	new ThreadFactoryBuilder()
		.threadNamePrefix("s3-transfer-manager-s3-async-client").build(),
	new ThreadPoolExecutor.CallerRunsPolicy()   // vs default of `ThreadPoolExecutor.AbortPolicy`
);
futureCompletionExecutor.allowCoreThreadTimeOut(true);

S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
	.asyncConfiguration(
		ClientAsyncConfiguration.builder()
			.advancedOption(
				SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
				futureCompletionExecutor
			)
			.build()
	)
	// "UnsupportedOperationException: Multipart download is not yet supported. Instead use the CRT based S3 client for multipart download."
	// https://github.com/aws/aws-sdk-java-v2/blob/7fd8ea197ede83a663d6cd045ec83bd6047c8633/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java#L112-L118
	// https://github.com/aws/aws-sdk-java-v2/blob/7fd8ea197ede83a663d6cd045ec83bd6047c8633/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java#L76
	// .multipartEnabled(true)
	.build();

S3TransferManager s3TransferManager = S3TransferManager.builder()
	.s3Client(s3AsyncClient)
	.build();
	
// ...

DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
	.getObjectRequest(req -> req.bucket(bucket).key(key))
	.destination(filePath.toFile())
	.build();

FileDownload downloadStatus = s3TransferManager.downloadFile(downloadFileRequest);
downloadStatus.completionFuture().join();	// throws here

in production code that attempts to do many S3TransferManager calls, I get

java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
	at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2011)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3310)
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
	at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
	...

even though the executor is a ThreadPoolExecutor and RejectedExecutionHandler is set to ThreadPoolExecutor.CallerRunsPolicy.
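A note on reading the trace above: the `ForkJoinPool.tryCompensate` frame sits directly under `CompletableFuture.join`, and the JDK only attempts compensation in `managedBlock` when the blocked thread is itself a `ForkJoinWorkerThread`. So the trace says something about the thread that called `join()`, not necessarily about the executor the SDK uses to complete the future. Below is a minimal, JDK-only sketch for checking which kind of thread a blocking call runs on. The class and method names (`JoinCallerCheck`, `onForkJoinWorker`, `checkInsidePool`) are hypothetical and not part of the SDK or the original report.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class JoinCallerCheck {

    /** Returns true if the current thread is a ForkJoinPool worker. */
    static boolean onForkJoinWorker() {
        return Thread.currentThread() instanceof ForkJoinWorkerThread;
    }

    /** Runs the same check from inside a dedicated ForkJoinPool and returns what it saw. */
    static boolean checkInsidePool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Callable<Boolean> task = JoinCallerCheck::onForkJoinWorker;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread is a ForkJoinPool worker: " + onForkJoinWorker());
        System.out.println("pool task is a ForkJoinPool worker: " + checkInsidePool());
    }
}
```

Logging `onForkJoinWorker()` right before the `join()` call in production code would show whether the blocking happens on a ForkJoinPool worker.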

Local testing on some scratch code shows that the CompletableFuture<CompletedFileDownload> object has stack.executor as null, and calling CompletableFutureUtils.forwardResultTo(sdkFuture, customFuture, futureCompletionExecutor); changes stack.executor to the Executor I provided. This seems to suggest the SDK was using the default ForkJoinPool Executor before I changed it via CompletableFutureUtils.forwardResultTo().
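One caveat on the `stack.executor` observation, offered as a reading of the JDK's `CompletableFuture` internals rather than anything confirmed in this thread: a dependent stage registered with a non-async method such as `whenComplete` carries no executor, and it runs on whichever thread completes the source future. A `null` executor there does not by itself imply the common ForkJoinPool. The sketch below uses only the JDK; `SyncDependentDemo` and `callbackThreadName` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class SyncDependentDemo {

    /**
     * Registers a non-async callback, completes the source future from a
     * thread with the given name, and returns the name of the thread that
     * actually ran the callback.
     */
    static String callbackThreadName(String completerName) {
        CompletableFuture<String> source = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();

        // Non-async dependent: no executor is attached to this stage.
        CompletableFuture<String> dependent =
            source.whenComplete((value, error) -> seen.set(Thread.currentThread().getName()));

        // The callback is registered before completion, so it runs on the completing thread.
        Thread completer = new Thread(() -> source.complete("done"), completerName);
        completer.start();

        dependent.join(); // returns only after the callback has run
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName("completer-1"));
    }
}
```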

Scratch code (SDK: 2.26.9, software.amazon.awssdk.crt.aws-crt: 0.29.20):

// https://github.com/aws/aws-sdk-java-v2/blob/501e37cea262a4c1e676f0fcceb1ff09e9a07282/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java#L487-L499
int processors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(8, processors);
int maxPoolSize = Math.max(64, processors * 2);
int keepAliveTime = 10;
TimeUnit keepAliveTimeUnit = TimeUnit.SECONDS;

ThreadPoolExecutor futureCompletionExecutor = new ThreadPoolExecutor(
	corePoolSize,
	maxPoolSize,
	keepAliveTime,
	keepAliveTimeUnit,
	new LinkedBlockingQueue<>(1_000),
	new ThreadFactoryBuilder()
		.threadNamePrefix("s3-transfer-manager-s3-async-client").build(),
	new ThreadPoolExecutor.CallerRunsPolicy()   // vs default of `ThreadPoolExecutor.AbortPolicy`
);
futureCompletionExecutor.allowCoreThreadTimeOut(true);

try (
	S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
		.credentialsProvider(
			ProfileCredentialsProvider.builder()
				.profileName("<my-profile>")
				.build())
		.futureCompletionExecutor(futureCompletionExecutor)
		.region(Region.US_EAST_1)
		.build();

	S3TransferManager tm = S3TransferManager.builder()
		.s3Client(s3AsyncClient)
		.build();
) {

	DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
		.getObjectRequest(b -> b.bucket("<my-bucket>").key("<my-key>"))
		.destination(Paths.get("C:\\Users\\<user>\\Downloads\\<filename>"))
		.build();

	FileDownload downloadFile = tm.downloadFile(downloadFileRequest);

	CompletableFuture<CompletedFileDownload> completedFileDownloadCompletableFuture = downloadFile.completionFuture();

	// Breakpoint: `completedFileDownloadCompletableFuture`'s `stack.executor` is `null`
	CompletableFuture<CompletedFileDownload> forwardWithExecutorFuture = new CompletableFuture<>();
	CompletableFutureUtils.forwardResultTo(completedFileDownloadCompletableFuture, forwardWithExecutorFuture, futureCompletionExecutor);

	// Breakpoint: `completedFileDownloadCompletableFuture`'s `stack.executor` is `ThreadPoolExecutor`
	CompletableFuture<CompletedFileDownload> forwardWithoutExecutorFuture = new CompletableFuture<>();
	CompletableFutureUtils.forwardResultTo(forwardWithExecutorFuture, forwardWithoutExecutorFuture);

	// Breakpoint: `forwardWithExecutorFuture`'s `stack.executor` is `null`
	CompletedFileDownload downloadResult1 = completedFileDownloadCompletableFuture.join();
}

Possible Solution

No response

Additional Information/Context

JDK and SDK versions are provided in-line under Reproduction Steps.

Versions for the fields below will be based on production, which is using EMR 6.15.0.

AWS Java SDK version used

2.20.160

JDK version used

Amazon Corretto 8

Operating System and version

Amazon Linux 2.0.20240610.1

neverendingqs avatar Jun 27 '24 14:06 neverendingqs

I've seen this behaviour too. If you have many parallel transfers initiated (more than your CPU count), and need concurrency, you may need to raise java.util.concurrent.ForkJoinPool.common.parallelism.
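For reference, a rough sketch of how that system property can be inspected and set. The JDK reads it when the common pool is first initialized, so the JVM flag (`-Djava.util.concurrent.ForkJoinPool.common.parallelism=N`) is the more dependable route; setting it in code only takes effect if it happens before anything touches the common pool. The value 32 and the class name `CommonPoolParallelism` are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelism {

    static final String PROPERTY = "java.util.concurrent.ForkJoinPool.common.parallelism";

    /** Describes the configured property value and the parallelism the JVM reports. */
    static String describe() {
        String configured = System.getProperty(PROPERTY, "<unset>");
        int effective = ForkJoinPool.getCommonPoolParallelism();
        return "configured=" + configured + ", effective=" + effective;
    }

    public static void main(String[] args) {
        // Only has an effect if this runs before the common pool is first used.
        // 32 is an arbitrary illustrative value.
        if (System.getProperty(PROPERTY) == null) {
            System.setProperty(PROPERTY, "32");
        }
        System.out.println(describe());
    }
}
```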

Moloch-az avatar Jul 02 '24 06:07 Moloch-az

Hi @neverendingqs , could you provide the following information:

  • the entire stacktrace
  • the code of how you send multiple requests concurrently
  • Are you using CompletableFuture#runAsync or CompletableFuture#supplyAsync in your application? If no executor is provided, the task will be run by the common ForkJoinPool.

I see you have a support case open, feel free to share with them if it works better for you. I don't think we use ForkJoinPool in the SDK and I searched our code base and couldn't find places where we use CompletableFuture#xxxAsync methods in the S3 client without overriding the executor.
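To illustrate the `supplyAsync` point in the list above, here is a small JDK-only sketch that contrasts the two overloads. With an explicit executor the task runs on that executor's thread; without one, the JDK uses its default async pool, which is typically the common ForkJoinPool. `SupplyAsyncExecutorDemo` and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncExecutorDemo {

    /** Runs a task via supplyAsync on an explicit executor and returns the worker thread's name. */
    static String threadNameWithExecutor(String threadName) {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
        } finally {
            executor.shutdown();
        }
    }

    /** Same task without an executor: the JDK chooses its default async pool. */
    static String threadNameWithDefault() {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("explicit executor: " + threadNameWithExecutor("my-worker"));
        System.out.println("default pool:      " + threadNameWithDefault());
    }
}
```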

zoewangg avatar Sep 07 '24 00:09 zoewangg

It looks like this issue has not been active for more than five days. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please add a comment to prevent automatic closure, or if the issue is already closed please feel free to reopen it.

github-actions[bot] avatar Sep 17 '24 00:09 github-actions[bot]

Hi @neverendingqs, I've reviewed the code. It seems you are using parallel streams to invoke concurrent requests. By default, parallel streams use ForkJoinPool. If this behavior is not desired, I'd suggest using a custom executor to submit those requests instead.

StreamSupport.stream(var.spliterator(), true)
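A minimal sketch of that suggestion, assuming the per-item work is a blocking call: submit each item to a dedicated thread pool instead of using a parallel stream. `fakeDownload` is a hypothetical stand-in for something like `s3TransferManager.downloadFile(request).completionFuture().join()`, and the pool size and thread names are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomExecutorFanOut {

    /** Hypothetical stand-in for a blocking transfer call. */
    static String fakeDownload(String key) {
        return key + " handled by " + Thread.currentThread().getName();
    }

    /** Fans the keys out over a dedicated pool and returns results in input order. */
    static List<String> downloadAll(List<String> keys, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(
            threads, r -> new Thread(r, "transfer-" + counter.incrementAndGet()));
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                // Each task runs on a "transfer-N" thread, not on a parallel-stream worker.
                futures.add(CompletableFuture.supplyAsync(() -> fakeDownload(key), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join()); // waits on the calling thread
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        downloadAll(Arrays.asList("a", "b", "c"), 2).forEach(System.out::println);
    }
}
```

Because the blocking work runs on the pool's own threads, it no longer depends on the common ForkJoinPool's worker limits.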

zoewangg avatar Sep 17 '24 16:09 zoewangg

It looks like this issue has not been active for more than five days. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please add a comment to prevent automatic closure, or if the issue is already closed please feel free to reopen it.

github-actions[bot] avatar Sep 27 '24 18:09 github-actions[bot]