grpc-java

BlockingClientCall should be AutoCloseable

Open benjaminp opened this issue 5 months ago • 3 comments

Using the blocking v2 stubs, I've often found myself using a try-finally pattern like this:

var call = SomeGrpc.newBlockingV2Stub(channel).bidiRpc();
try {
  while (true) {
    var serverMessage = call.read();
    if (serverMessage == null) {
      break;
    }
    var clientMessage = doSomeWork(serverMessage);
    call.write(clientMessage);
  }
  call.halfClose();
} finally {
  call.cancel("done", null);
}

The idea is that if doSomeWork throws an exception, the call is properly cleaned up.

This pattern is cumbersome, though. I also believe the unconditional cancel invocation can send a superfluous RST_STREAM frame in the case that the call is already successfully closed.

It would be nice if BlockingClientCall had a close() that did whatever was necessary to clean up the call. Then, try-with-resources could be used like this:

try (var call = SomeGrpc.newBlockingV2Stub(channel).bidiRpc()) {
  while (true) {
    var serverMessage = call.read();
    if (serverMessage == null) {
      break;
    }
    var clientMessage = doSomeWork(serverMessage);
    call.write(clientMessage);
  }
  call.halfClose();
}
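A minimal sketch of what such a close() might do, written as a wrapper so it compiles without gRPC on the classpath. Everything here is hypothetical: `DemoCall` is a stand-in for the few BlockingClientCall methods used in this thread, `ClosingCall` and `FakeCall` are illustration-only names, and "cancel unless read() already returned null" is one possible policy, not grpc-java's behavior. It assumes reading and closing happen on one thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the subset of BlockingClientCall used in this thread.
// It is NOT the grpc-java class; it only lets the sketch compile on its own.
interface DemoCall<ReqT, RespT> {
  RespT read() throws Exception;   // returns null once the server has finished
  void write(ReqT request) throws Exception;
  void halfClose();
  void cancel(String message, Throwable cause);
}

// Hypothetical wrapper: close() cancels unless read() already returned null.
final class ClosingCall<ReqT, RespT> implements AutoCloseable {
  private final DemoCall<ReqT, RespT> delegate;
  private boolean finished;        // single-threaded use assumed

  ClosingCall(DemoCall<ReqT, RespT> delegate) { this.delegate = delegate; }

  RespT read() throws Exception {
    RespT message = delegate.read();
    if (message == null) finished = true;
    return message;
  }

  void write(ReqT request) throws Exception { delegate.write(request); }
  void halfClose() { delegate.halfClose(); }

  @Override
  public void close() {
    if (!finished) delegate.cancel("closed before completion", null);
  }
}

// In-memory fake that records every cancel() it receives.
final class FakeCall implements DemoCall<String, String> {
  private final Queue<String> incoming;
  final List<String> cancels = new ArrayList<>();
  FakeCall(List<String> messages) { incoming = new ArrayDeque<>(messages); }
  public String read() { return incoming.poll(); }
  public void write(String request) {}
  public void halfClose() {}
  public void cancel(String message, Throwable cause) { cancels.add(message); }
}

final class ClosingCallDemo {
  // Drains the fake call, failing when failOn is read (pass null to never fail).
  static FakeCall run(List<String> messages, String failOn) {
    FakeCall fake = new FakeCall(messages);
    try (ClosingCall<String, String> call = new ClosingCall<>(fake)) {
      String msg;
      while ((msg = call.read()) != null) {
        if (msg.equals(failOn)) throw new IllegalStateException("doSomeWork failed");
        call.write(msg.toUpperCase());
      }
      call.halfClose();
    } catch (Exception e) {
      // Swallowed only so the demo can inspect the fake afterwards.
    }
    return fake;
  }
}
```

With this policy a normally completed loop records no cancel, while an exception in the loop records exactly one. Whether skipping the cancel changes anything on the wire is the question debated in the comments below.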

benjaminp, Jul 02 '25 02:07

RST_STREAM is sent from several different places in the transport and stream implementations, covering scenarios such as detecting that the server closed the stream, data write errors, and flow-control errors. io.grpc.internal.Stream exposes no method that lets call implementations know whether a RST_STREAM has already been sent. Even if we handle this inside the stream implementations to avoid sending a duplicate RST_STREAM, the work will be in the stream and transport rather than in BlockingClientCall.

kannanjgithub, Jul 09 '25 18:07

One problem with Closeable is linters. There are linters that track Closeables and complain if you don't call close(). There will be no way for the linter to know whether the RPC was already closed, or to deal with it in a multi-threaded fashion. So it isn't just a convenience for you, but changes the calling expectations for many other cases too. Maybe the world has changed some here; it seems ErrorProne isn't as strict as it once was (check1, check2), as it is now opt-in (and we wouldn't mark our API @MustBeClosed). I'm not really confident that won't change in the future without notice.

There's also the problem that we want to provide cancel information into the call so when you see the StatusException(CANCELLED) you know what happened. Unfortunately, Java's try-with-resources is the most limited form of such a feature among languages and doesn't include the exception information. Basically, I'd encourage code more similar to:

var call = SomeGrpc.newBlockingV2Stub(channel).bidiRpc(); // declared outside the try so the catch block can use it
try {
  while (true) {
    var serverMessage = call.read();
    if (serverMessage == null) {
      break;
    }
    var clientMessage = doSomeWork(serverMessage);
    call.write(clientMessage);
  }
  // call.halfClose(); // This halfClose did nothing, because the RPC was already complete
} catch (SomeException ex) {
  call.cancel(null, ex);
  throw ex;
} // but in case of bug in the while loop, or a RuntimeException, this would leak
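The try-with-resources limitation mentioned above can be seen in a few lines of plain Java. This is only an illustration with a hypothetical class name (`CloseOrderDemo`) and no gRPC involved: close() takes no arguments, so it never learns which exception ended the block, and it runs before any catch clause attached to the same try statement.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderDemo {
  // Records the order in which the body, close(), and the catch clause run.
  static List<String> run() {
    List<String> events = new ArrayList<>();
    // AutoCloseable has a single abstract method, so a lambda can stand in for a resource.
    try (AutoCloseable resource = () -> events.add("close")) {
      events.add("body");
      throw new IllegalStateException("boom");
    } catch (Exception e) {
      // By the time control reaches here the resource is already closed,
      // and the resource variable is no longer in scope.
      events.add("catch " + e.getMessage());
    }
    return events;
  }
}
```

`CloseOrderDemo.run()` returns `[body, close, catch boom]`, so a close() that cancels the call would have to do so without the failure as its cause.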

I also believe the unconditional cancel invocation can send a superfluous RST_STREAM frame in the case that the call is already successfully closed.

No, if read() returned null then it wouldn't send anything. There may be some bookkeeping inside gRPC, but nothing on-the-wire.

Could we add a new API to opt-in?

// This gets us try-with-resources, but seems no better than try-finally?
var call = SomeGrpc.newBlockingV2Stub(channel).bidiRpc();
try (var trash = call.asCloseable("exited try")) { // Maybe? String is in case reader is on another thread
  while (true) {
    var serverMessage = call.read();
    if (serverMessage == null) {
      break;
    }
    var clientMessage = doSomeWork(serverMessage);
    call.write(clientMessage);
  }
} // call.cancel("exited try", null) is called

Yeah, it seems we either need to make BlockingClientCall closeable, or do nothing. Things look different if you do reading and writing on different threads, but you'd still likely want the try-with-resources in the reading thread.

Note: It seems you are reading and writing on the same thread. That was a use-case I was pretty interested in, but others had trouble understanding the use-case. So the API right now is mostly assuming you are using different threads for reading and writing; but we can add some new APIs to allow you to read+write on the same thread simultaneously.

ejona86, Jul 24 '25 04:07

One problem with Closeable is linters. There are linters that track Closeables and complain if you don't call close(). There will be no way for the linter to know whether the RPC was already closed, or to deal with it in a multi-threaded fashion. So it isn't just a convenience for you, but changes the calling expectations for many other cases too. Maybe the world has changed some here; it seems ErrorProne isn't as strict as it once was (check1, check2), as it is now opt-in (and we wouldn't mark our API @MustBeClosed). I'm not really confident that won't change in the future without notice.

Are these linters specific to Closeable? Note I'm asking about AutoCloseable, which is a superinterface of Closeable and perhaps is not historically subject to these kinds of nits.
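For reference, the relationship between the two interfaces can be checked directly. The sketch below uses hypothetical names (`CloseableKinds`, `QuietResource`) and says nothing about how any particular linter treats either interface; it only shows that Closeable extends AutoCloseable and that an AutoCloseable implementation may declare a close() that throws nothing.

```java
import java.io.Closeable;

final class CloseableKinds {
  // AutoCloseable.close() is declared as "throws Exception";
  // an implementation is allowed to narrow that to no checked exception at all.
  static final class QuietResource implements AutoCloseable {
    boolean closed;
    @Override public void close() { closed = true; }
  }

  // True because java.io.Closeable is declared as "extends AutoCloseable".
  static boolean closeableIsAutoCloseable() {
    return AutoCloseable.class.isAssignableFrom(Closeable.class);
  }

  // No catch clause is needed here because QuietResource.close() throws nothing checked.
  static boolean useQuietly() {
    QuietResource resource = new QuietResource();
    try (QuietResource ignored = resource) {
      // body intentionally empty
    }
    return resource.closed;
  }
}
```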

There's also the problem that we want to provide cancel information into the call so when you see the StatusException(CANCELLED) you know what happened.

Who is going to see this StatusException(CANCELLED)? I think only a stream tracer can possibly observe it.

Unfortunately, Java's try-with-resources is the most limited form of such a feature among languages and doesn't include the exception information. Basically, I'd encourage code more similar to:

var call = SomeGrpc.newBlockingV2Stub(channel).bidiRpc(); // declared outside the try so the catch block can use it
try {
  while (true) {
    var serverMessage = call.read();
    if (serverMessage == null) {
      break;
    }
    var clientMessage = doSomeWork(serverMessage);
    call.write(clientMessage);
  }
  // call.halfClose(); // This halfClose did nothing, because the RPC was already complete
} catch (SomeException ex) {
  call.cancel(null, ex);
  throw ex;
} // but in case of bug in the while loop, or a RuntimeException, this would leak

Yeah, I'm sympathetic to the need to enhance cancellation causes, but I'm most worried about leaks. I also suspect the need for cancellation causes is lessened with the blocking client. If you see the stack trace of whatever exits the try block, you can safely assume that was the cancellation cause. BlockingClientCall should be like a normal Java resource; e.g., you generally don't need to wonder why an OutputStream/InputStream was closed.

I also believe the unconditional cancel invocation can send a superfluous RST_STREAM frame in the case that the call is already successfully closed.

No, if read() returned null then it wouldn't send anything. There may be some bookkeeping inside gRPC, but nothing on-the-wire.

Let's suppose the other side has closed. Where in the cancel path do we not send a RST_STREAM?

  1. BlockingClientCall.cancel simply calls ClientCallImpl.cancel.
  2. ClientCallImpl.cancel calls cancelInternal. cancelCalled = false and stream != null, so we call stream.cancel.
  3. AbstractClientStream.cancel forwards to NettyClientStream.Sink.cancel.
  4. NettyClientStream.Sink.cancel unconditionally enqueues CancelClientStreamCommand.
  5. (asynchronously) NettyClientHandler.cancelStream is invoked. cmd.stream().isNonExistent() is false because setNonExistent() is only called by createStream(), so we call encoder().writeRstStream().
  6. Now in netty, we end up in Http2ConnectionHandler.resetStream. Here, there is a lot of complex netty state, but it seems we will end up in DefaultHttp2FrameWriter.writeRstStream putting a RST_STREAM frame onto the connection...
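The first steps of that trace can be condensed into a toy model. This is a sketch of the reasoning as described in the list above, not grpc-java or netty source: `ToyCall`, `ToyStream`, and `remoteClosed` are hypothetical names, and the model deliberately stops at the stream boundary, since what netty does afterwards is the open question.

```java
final class ToyCancelPath {
  // Stand-in for the transport stream; counts the cancel requests that reach it.
  static final class ToyStream {
    boolean remoteClosed;   // set when the other side has finished
    int cancelRequests;
    void cancel() { cancelRequests++; }
  }

  // Stand-in for the call object, with the guard described in step 2.
  static final class ToyCall {
    private final ToyStream stream;
    private boolean cancelCalled;

    ToyCall(ToyStream stream) { this.stream = stream; }

    void cancel() {
      if (cancelCalled) {
        return;             // repeated cancels are dropped here
      }
      cancelCalled = true;
      if (stream != null) {
        stream.cancel();    // note: remoteClosed is never consulted in this model
      }
    }
  }

  // Number of cancel requests that reach the stream after the remote side closed.
  static int cancelsAfterRemoteClose(int attempts) {
    ToyStream stream = new ToyStream();
    stream.remoteClosed = true;
    ToyCall call = new ToyCall(stream);
    for (int i = 0; i < attempts; i++) {
      call.cancel();
    }
    return stream.cancelRequests;
  }
}
```

In this model the guard only deduplicates repeated cancel() calls, so the first cancel after a remote close still reaches the stream. Whether the real transport then suppresses the frame is exactly what the question above asks.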

Could we add a new API to opt-in?

// This gets us try-with-resources, but seems no better than try-finally?
var call = SomeGrpc.newBlockingV2Stub(channel).bidiRpc();
try (var trash = call.asCloseable("exited try")) { // Maybe? String is in case reader is on another thread
  while (true) {
    var serverMessage = call.read();
    if (serverMessage == null) {
      break;
    }
    var clientMessage = doSomeWork(serverMessage);
    call.write(clientMessage);
  }
} // call.cancel("exited try", null) is called

Yeah, it seems we either need to make BlockingClientCall closeable, or do nothing. Things look different if you do reading and writing on different threads, but you'd still likely want the try-with-resources in the reading thread.

Note: It seems you are reading and writing on the same thread. That was a use-case I was pretty interested in, but others had trouble understanding the use-case. So the API right now is mostly assuming you are using different threads for reading and writing; but we can add some new APIs to allow you to read+write on the same thread simultaneously.

Yes, I would like an API that blocked until I can read or write. This is probably another issue!

benjaminp, Jul 24 '25 05:07