gax-java

BatcherImpl.add can hang if an exception is thrown starting a call

Open igorbernstein2 opened this issue 3 years ago • 0 comments

Environment details

  • Programming language: java
  • OS: MacOS 12.6
  • Language runtime version: 11.0.16.1
  • Package version: 2.19.2

Steps to reproduce

Failing unit test:

BatchingDescriptor<Object, Object, Object, Object> batchingDescriptor = new BatchingDescriptor<Object, Object, Object, Object>() {
      @Override
      public BatchingRequestBuilder<Object, Object> newRequestBuilder(Object o) {
        return new BatchingRequestBuilder<Object, Object>() {
          @Override
          public void add(Object o) { }

          @Override
          public Object build() {
            return new Object();
          }
        };
      }

      @Override
      public void splitResponse(Object o, List<BatchEntry<Object, Object>> list) {
        for (BatchEntry<Object, Object> e : list) {
          e.getResultFuture().set(new Object());
        }
      }

      @Override
      public void splitException(Throwable throwable, List<BatchEntry<Object, Object>> list) {
        for (BatchEntry<Object, Object> e : list) {
          e.getResultFuture().setException(new RuntimeException("fake"));
        }
      }

      @Override
      public long countBytes(Object o) {
        return 1;
      }
    };

    UnaryCallable<Object, Object> unaryCallable = new UnaryCallable<Object, Object>() {
      @Override
      public ApiFuture<Object> futureCall(Object o, ApiCallContext apiCallContext) {
        throw new RuntimeException("this is not bubbled up!");
      }
    };
    Object prototype = new Object();
    BatchingSettings batchingSettings = BatchingSettings.newBuilder()
        .setDelayThreshold(Duration.ofSeconds(1))
        .setElementCountThreshold(100L)
        .setRequestByteThreshold(100L)
        .setFlowControlSettings(
            FlowControlSettings.getDefaultInstance()
        )
        .build();
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    FlowController flowController = new FlowController(batchingSettings.getFlowControlSettings());
    ApiCallContext callContext = FakeCallContext.createDefault();

    BatcherImpl<Object, Object, Object, Object> batcher = new BatcherImpl<>(
        batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, flowController, callContext
    );

    ApiFuture<Object> f = batcher.add(new Object());
    // Expected: get() throws ExecutionException. Actual: get() never returns, so the test hangs.
    Assert.assertThrows(ExecutionException.class, f::get);

Proposed solution

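BatcherImpl#sendOutstanding() should catch exceptions thrown by futureCall() and treat them as if the RPC had failed, so that the exception reaches every entry's result future instead of leaving those futures unresolved.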
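
A minimal sketch of that idea, written against `java.util.concurrent.CompletableFuture` rather than the real gax types so it runs on its own. The class `SafeCallSketch` and the helper `safeCall` are hypothetical names for illustration only; this is not the actual BatcherImpl code. It only shows the pattern of converting a synchronous throw into an already-failed future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

public class SafeCallSketch {

  /**
   * Hypothetical helper: invokes {@code call} and always returns a future.
   * If the call throws while starting, the exception is wrapped in an
   * already-failed future, so downstream failure handling still runs.
   */
  public static <ReqT, RespT> CompletableFuture<RespT> safeCall(
      Function<ReqT, CompletableFuture<RespT>> call, ReqT request) {
    try {
      return call.apply(request);
    } catch (Throwable t) {
      // Treat a synchronous throw exactly like a failed RPC.
      CompletableFuture<RespT> failed = new CompletableFuture<>();
      failed.completeExceptionally(t);
      return failed;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Stand-in for a callable whose futureCall() throws immediately.
    Function<Object, CompletableFuture<Object>> throwingCall = req -> {
      throw new RuntimeException("thrown while starting the call");
    };

    CompletableFuture<Object> f = safeCall(throwingCall, new Object());
    try {
      f.get(); // returns promptly instead of hanging
    } catch (ExecutionException e) {
      System.out.println("failed with: " + e.getCause().getMessage());
    }
  }
}
```

In the batcher, the equivalent change would presumably wrap the futureCall() invocation so that the existing failure path (the one that ends up calling splitException) also handles exceptions thrown before a future is returned.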

igorbernstein2, Oct 18 '22 19:10