gax-java
BatcherImpl.add can hang if an exception is thrown starting a call
Environment details
- Programming language: Java
- OS: macOS 12.6
- Language runtime version: 11.0.16.1
- Package version: 2.19.2
Steps to reproduce
Failing unit test:
BatchingDescriptor<Object, Object, Object, Object> batchingDescriptor = new BatchingDescriptor<Object, Object, Object, Object>() {
  @Override
  public BatchingRequestBuilder<Object, Object> newRequestBuilder(Object o) {
    return new BatchingRequestBuilder<Object, Object>() {
      @Override
      public void add(Object o) { }

      @Override
      public Object build() {
        return new Object();
      }
    };
  }

  @Override
  public void splitResponse(Object o, List<BatchEntry<Object, Object>> list) {
    for (BatchEntry<Object, Object> e : list) {
      e.getResultFuture().set(new Object());
    }
  }

  @Override
  public void splitException(Throwable throwable, List<BatchEntry<Object, Object>> list) {
    for (BatchEntry<Object, Object> e : list) {
      e.getResultFuture().setException(new RuntimeException("fake"));
    }
  }

  @Override
  public long countBytes(Object o) {
    return 1;
  }
};

UnaryCallable<Object, Object> unaryCallable = new UnaryCallable<Object, Object>() {
  @Override
  public ApiFuture<Object> futureCall(Object o, ApiCallContext apiCallContext) {
    throw new RuntimeException("this is not bubbled up!");
  }
};

Object prototype = new Object();
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
    .setDelayThreshold(Duration.ofSeconds(1))
    .setElementCountThreshold(100L)
    .setRequestByteThreshold(100L)
    .setFlowControlSettings(FlowControlSettings.getDefaultInstance())
    .build();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
FlowController flowController = new FlowController(batchingSettings.getFlowControlSettings());
ApiCallContext callContext = FakeCallContext.createDefault();

BatcherImpl<Object, Object, Object, Object> batcher = new BatcherImpl<>(
    batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, flowController, callContext
);

ApiFuture<Object> f = batcher.add(new Object());
// get never returns
Assert.assertThrows(ExecutionException.class, f::get);
Proposed solution
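BatcherImpl#sendOutstanding() should catch exceptions thrown by futureCall() and treat them as if the RPC had failed, so that the result future of every entry in the batch completes exceptionally instead of never resolving.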
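
The idea behind the proposed fix can be illustrated with a small, self-contained sketch. This is not the gax-java code or an actual patch: it uses the JDK's CompletableFuture in place of ApiFuture, and the class name SafeCallSketch and the helper callSafely are hypothetical names chosen for illustration. It only shows the pattern of converting an exception thrown while starting a call into a failed future, so that callers waiting on the result are always released.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

// Hypothetical illustration only; not part of gax-java.
class SafeCallSketch {

    /**
     * Starts the call and always returns a future. If the call throws
     * synchronously (before any future exists), the exception is wrapped
     * in an already-failed future instead of propagating to the caller.
     */
    static <ReqT, RespT> CompletableFuture<RespT> callSafely(
            Function<ReqT, CompletableFuture<RespT>> call, ReqT request) {
        try {
            return call.apply(request);
        } catch (RuntimeException e) {
            // Treat a failure to start the call the same way as a failed RPC.
            CompletableFuture<RespT> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a callable whose futureCall() throws immediately.
        Function<Object, CompletableFuture<Object>> throwingCall = request -> {
            throw new RuntimeException("thrown while starting the call");
        };

        CompletableFuture<Object> result = callSafely(throwingCall, new Object());
        try {
            result.get(); // returns promptly because the future is already failed
        } catch (ExecutionException e) {
            System.out.println("call failed: " + e.getCause().getMessage());
        }
    }
}
```

In a batcher, the failed future returned here would then flow through the same failure path as a real RPC error (in the reproduction above, the splitException callback), which is what lets f.get() throw ExecutionException rather than block.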