aws-sdk-java-v2

S3 getObject combined with AsyncResponseTransformer.toBytes() copies too much data

Open · chibenwa opened this issue 3 years ago · 1 comment

Describe the bug

Given the following piece of code:

client.getObject(
        builder -> builder.bucket(bucket).key(key),
        AsyncResponseTransformer.toBytes())

Several data copies take place:

  • The Netty ByteBuf gets copied into a heap ByteBuffer. I understand this copy is done to present a common abstraction across all transports. Still, ByteBuf::nioBuffers could perhaps be used to minimize copies; I did not explore this option myself.
  • AsyncResponseTransformer.toBytes() internally relies on an unsized ByteArrayOutputStream, which therefore has to grow (copying its contents each time) many times. The result is copied again by toByteArray(), and one more defensive copy is made when the future is transformed.
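To put rough numbers on the second point, here is an illustrative accounting model (not a measurement) of how many bytes get copied when chunks are accumulated in an unsized buffer versus a presized array. It assumes a doubling growth policy similar to the one in OpenJDK's ByteArrayOutputStream, counts only already-written bytes as moved on each resize, and ignores the Netty-to-heap copy from the first point. The class name CopyCostSketch and its parameters are hypothetical.

```java
// Hypothetical accounting model, not SDK code and not a benchmark.
final class CopyCostSketch {

    // Bytes copied when 'total' bytes arrive in 'chunk'-sized pieces into a buffer
    // that starts at 'initialCapacity' and doubles (or grows to the required size)
    // whenever a write does not fit.
    static long growableCopies(long total, int chunk, int initialCapacity) {
        long capacity = initialCapacity;
        long count = 0;   // bytes written so far
        long copied = 0;  // bytes moved by any copy
        while (count < total) {
            int n = (int) Math.min(chunk, total - count);
            if (count + n > capacity) {
                capacity = Math.max(capacity * 2, count + n);
                copied += count; // existing content moved into the larger array
            }
            copied += n;         // the incoming chunk copied into the buffer
            count += n;
        }
        copied += total; // toByteArray() returns a trimmed copy
        copied += total; // defensive copy when the future's result is wrapped
        return copied;
    }

    // Presized array: each incoming byte is copied exactly once, into its final slot.
    static long presizedCopies(long total) {
        return total;
    }
}
```

For a 1024-byte body delivered in 256-byte chunks with a 32-byte initial capacity, growableCopies(1024, 256, 32) returns 3840 against 1024 for the presized path, so roughly 3.75 bytes are copied per byte received under this model.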

Expected Behavior

Minimize memory copies in such a scenario to minimize heap pressure.

Current Behavior

[Screenshot from 2022-05-16 10-20-07: async-profiler flame graph]

Reproduction Steps

client.getObject(
        builder -> builder.bucket(bucket).key(key),
        AsyncResponseTransformer.toBytes())

I used https://github.com/jvm-profiling-tools/async-profiler for the flame graphs.

Possible Solution

  • Remove needless defensive copies inside ByteArrayAsyncResponseTransformer. The byte array is handed to the caller, who then becomes responsible for it, and nothing else references it once the publisher completes. This is a quick win at very low cost.
  • Rely on GetObjectResponse::contentLength to size a byte array and copy incoming buffers directly into it. This requires knowledge of the response type, so it may be hard to do generically.

I successfully tested the following transformer:

package org.apache.james.blob.objectstorage.aws;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

/**
* Class copied from {@link software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer}
*
* Modified to take advantage of the content length of the get response in order to use a sized array
* upon content copy. This avoids the usage of a ByteArrayOutputStream that yields additional copies
* (resizing upon copy, copy of the resulting byte array).
*
* A defensive copy upon returning the result is also removed (responsibility transferred to the caller, no other usages)
*/
public class MinimalCopyBytesResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> {
   private volatile CompletableFuture<byte[]> cf;
   private volatile GetObjectResponse response;

   public MinimalCopyBytesResponseTransformer() {

   }

   public CompletableFuture<ResponseBytes<GetObjectResponse>> prepare() {
       this.cf = new CompletableFuture<>();
       // Modification: remove a defensive copy of the buffer upon completion: the caller is now the sole user of the array
       return this.cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
   }

   public void onResponse(GetObjectResponse response) {
       this.response = response;
   }

   public void onStream(SdkPublisher<ByteBuffer> publisher) {
       // Math.toIntExact fails fast for objects larger than a single byte array can hold
       publisher.subscribe(new BaosSubscriber(this.cf, Math.toIntExact(response.contentLength())));
   }

   public void exceptionOccurred(Throwable throwable) {
       this.cf.completeExceptionally(throwable);
   }

   static class BaosSubscriber implements Subscriber<ByteBuffer> {
       private final CompletableFuture<byte[]> resultFuture;
       // Modification: use a byte array instead of the ByteArrayOutputStream and track position
       private final byte[] buffer;
       private int pos = 0;
       private Subscription subscription;

       BaosSubscriber(CompletableFuture<byte[]> resultFuture, int size) {
           this.resultFuture = resultFuture;
           this.buffer = new byte[size];
       }

       public void onSubscribe(Subscription s) {
           if (this.subscription != null) {
               s.cancel();
           } else {
               this.subscription = s;
               this.subscription.request(Long.MAX_VALUE);
           }
       }

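       public void onNext(ByteBuffer byteBuffer) {
           // Modification: copy the response part in place into the result buffer and track position
           int written = byteBuffer.remaining();
           if (written > buffer.length - pos) {
               // More bytes than the announced Content-Length: fail the future instead of throwing from onNext
               this.subscription.cancel();
               this.resultFuture.completeExceptionally(new IllegalStateException(
                   "Received more bytes than the announced content length of " + buffer.length));
               return;
           }
           byteBuffer.get(buffer, pos, written);
           pos += written;
           this.subscription.request(1L);
       }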

       public void onError(Throwable throwable) {
           this.resultFuture.completeExceptionally(throwable);
       }

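       public void onComplete() {
           if (pos != buffer.length) {
               // Fewer bytes than the announced Content-Length: do not hand out a partially filled array
               this.resultFuture.completeExceptionally(new IllegalStateException(
                   "Expected " + buffer.length + " bytes but received " + pos));
               return;
           }
           this.resultFuture.complete(this.buffer);
       }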
   }
}
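The second proposed solution notes that presizing needs knowledge of the response type. One way to keep the idea transport-agnostic is to accept an optional length hint and fall back to a growable buffer when none is available. The sketch below uses only the JDK; the class LengthHintAccumulator and its methods are hypothetical, not SDK API, and a transformer like the one above could, under this assumption, pass response.contentLength() as the hint.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.OptionalLong;

// Hypothetical helper for illustration; it is not part of the AWS SDK.
// Presizes the destination array when a length hint (such as Content-Length) is known,
// and falls back to a growable ByteArrayOutputStream when it is not.
final class LengthHintAccumulator {
    private final byte[] sized;                   // non-null only when a length hint was given
    private final ByteArrayOutputStream fallback; // non-null only when no hint was given
    private int pos;                              // bytes written into 'sized' so far

    LengthHintAccumulator(OptionalLong lengthHint) {
        if (lengthHint.isPresent()) {
            // Math.toIntExact fails fast for lengths that cannot fit in a single Java array
            this.sized = new byte[Math.toIntExact(lengthHint.getAsLong())];
            this.fallback = null;
        } else {
            this.sized = null;
            this.fallback = new ByteArrayOutputStream();
        }
    }

    void accept(ByteBuffer chunk) {
        int n = chunk.remaining();
        if (sized == null) {
            if (chunk.hasArray()) {
                // Write straight from the backing array to avoid an intermediate copy
                fallback.write(chunk.array(), chunk.arrayOffset() + chunk.position(), n);
                chunk.position(chunk.position() + n);
            } else {
                byte[] tmp = new byte[n];
                chunk.get(tmp);
                fallback.write(tmp, 0, n);
            }
            return;
        }
        if (n > sized.length - pos) {
            throw new IllegalStateException("Received more bytes than the length hint of " + sized.length);
        }
        chunk.get(sized, pos, n); // single copy, directly into the final array
        pos += n;
    }

    byte[] result() {
        if (sized == null) {
            return fallback.toByteArray(); // unknown length: one final trimming copy
        }
        if (pos != sized.length) {
            throw new IllegalStateException("Expected " + sized.length + " bytes but received " + pos);
        }
        return sized; // ownership passes to the caller, no defensive copy
    }
}
```

When the hint is present, every incoming byte is copied once; without it, the fallback still pays the resize and toByteArray() copies, so the generic path degrades to today's behavior rather than failing.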

Additional Information/Context

Apache James relies on the S3 driver to store email content, so S3 getObject is called on every email content read; speed and heap impact therefore matter to us.

AWS Java SDK version used

2.17.189

JDK version used

openjdk version "11.0.15" 2022-04-19

Operating System and version

Ubuntu 20.04.4 LTS

chibenwa, May 16 '22

@chibenwa thank you for reaching out, added to our backlog. Since it's an optimization, I'm changing it to a feature request.

debora-ito, May 16 '22

The same problem is present for putObject.

Dunemaster, Jan 31 '23

Just spotted this issue: I raised a near duplicate as https://github.com/aws/aws-sdk-java-v2/issues/4392 (apologies for not searching existing issues, doh), and a PR to fix it in https://github.com/aws/aws-sdk-java-v2/pull/4355.

rtyley, Sep 05 '23