grpc-java
BlockingStub returns iterator that cannot be closed
I use a blocking stub for a method that returns a stream of messages. Its return type is Iterator<T>. It turns out I cannot make the gRPC client end the request (e.g. because of a client-side failure). There are a number of cases where this can leave an open connection stuck in the client.
The implementation uses io.grpc.stub.ClientCalls#blockingServerStreamingCall(io.grpc.Channel, io.grpc.MethodDescriptor<ReqT,RespT>, io.grpc.CallOptions, ReqT), which in turn uses BlockingResponseStream. There is no way to reach the io.grpc.ClientCall#cancel method from the Iterator. It would be nice to wrap the Iterator, or have it implement an interface, in a similar way to what is done with ServerStreamObserver/ClientStreamObserver.
Another approach could be to have this iterator implement Java's AutoCloseable.
There is also a comment on the Iterator implementation class:
// TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
I'm not wild about how the iterator has turned out either. Streaming+blocking in grpc-java is weak/missing. To do AutoCloseable we'd need to create a new interface that extends both AutoCloseable and Iterator.
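A minimal sketch of what such a combined interface could look like; the name CloseableIterator is hypothetical, not an existing grpc-java type:

import java.util.Iterator;

// Hypothetical combined interface; the name and shape are assumptions.
public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  // Narrowed signature so callers don't have to handle a checked exception;
  // in a blocking-stub implementation close() would map to ClientCall.cancel().
  @Override
  void close();
}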
The main way to cancel would be to create and attach a CancellableContext, and then cancel the context as you leave the block. You should always cancel the CancellableContext when the unit of work it encompasses is complete, even for success.
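For illustration, a minimal sketch of that pattern, assuming a generated blocking stub; the ElementServiceGrpc stub, GetElementsRequest, Element, and getElements(...) names are placeholders, not grpc-java APIs:

import io.grpc.Context;
import java.util.Iterator;

// Sketch: run the server-streaming call inside a CancellableContext and
// cancel the context when the unit of work is done, even on success.
Element findFirstMatch(ElementServiceGrpc.ElementServiceBlockingStub stub,
                       GetElementsRequest request) throws Exception {
  Context.CancellableContext context = Context.current().withCancellation();
  try {
    // Start the call while the cancellable context is current so that
    // cancelling the context cancels the underlying ClientCall.
    return context.call(() -> {
      Iterator<Element> responses = stub.getElements(request);
      while (responses.hasNext()) {
        Element element = responses.next();
        if (element.getSomeProperty() == 1234) {
          return element;  // leave early; the finally block cancels the RPC
        }
      }
      return null;
    });
  } finally {
    context.cancel(null);  // always cancel, even for success
  }
}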
I decided to use the blocking API to simplify calls that I only make in tests. This was not clear from the code; there were no javadoc hints for it. I will try CancellableContext.
Another option would be to have a method that returns a java.util.stream.Stream<RespT>, since streams are AutoCloseable, but I'm guessing grpc-stub can't use Java 8 features.
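As a rough illustration of that idea (not an actual grpc-stub API), the existing Iterator could be wrapped in a Stream whose close() cancels a CancellableContext; the stub and method names in the usage comment are placeholders:

import io.grpc.Context;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Wrap a blocking-stub Iterator in a Stream; closing the Stream cancels the
// RPC through the supplied CancellableContext.
static <RespT> Stream<RespT> asStream(Context.CancellableContext context,
                                      Iterator<RespT> responses) {
  Spliterator<RespT> spliterator =
      Spliterators.spliteratorUnknownSize(responses, Spliterator.ORDERED);
  return StreamSupport.stream(spliterator, /* parallel= */ false)
      .onClose(() -> context.cancel(null));
}

// Usage sketch: start the call while the context is attached so that
// cancelling the context cancels the ClientCall.
// Context.CancellableContext context = Context.current().withCancellation();
// Context previous = context.attach();
// try (Stream<Element> stream = asStream(context, stub.getElements(request))) {
//   stream.anyMatch(e -> e.getSomeProperty() == 1234);
// } finally {
//   context.detach(previous);
// }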
I'd like to add that it is not only an application error that can prevent closing the iterator. I just tracked down a "direct buffer" memory leak ("LEAK: ByteBuf.release() was not called before...") in our system. It was caused by not iterating through all of the elements returned by the blocking stub's Iterator (BlockingResponseStream):
stub.getElements(request)
    .asSequence()
    .any { it.someProperty == 1234 }
This is Kotlin code and it looks very innocent. The .asSequence call converts the Iterator returned by the blocking stub into a Sequence, on which we can call .map/.filter/.any etc. However, because a sequence is evaluated lazily, as soon as any finds a matching object the remaining elements are not processed, leaving the iterator "open" and apparently causing the leak.
@systemfreund There isn't anything we can really do about that. The suggestion to use CancellableContext is the way to prevent such leaks. Any other approach is likely to have bad side effects, worse than the occasional nag log.
@carl-mastrangelo Even CancellableContext does not work correctly, because the onClose() method of any attached ClientInterceptor is not called in the case of a cancellation.
This might be solvable by draining the ThreadlessExecutor of ClientCalls.blockingServerStreamingCall in a Context.CancellationListener?
@pskiwi, I think the behavior you are referring to was fixed in #6255.
@ejona86 I tried, but it doesn't seem to fix my problem. I was hoping the following test code would work: ClientCallsTest.java
Is this expected to work?
@pskiwi, no, that wouldn't work. After the cancellation you need to "drain" the Iterator; keep going until you get hasNext() == false. If you have a while (iter.hasNext()) iter.next(); after the finally, then it would work.
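A sketch of that drain-after-cancel pattern, assuming a generated blocking stub (stub, request, and Element are placeholders); depending on the grpc-java version, the drain loop may instead observe a CANCELLED StatusRuntimeException:

import io.grpc.Context;
import io.grpc.StatusRuntimeException;
import java.util.Iterator;

Context.CancellableContext context = Context.current().withCancellation();
Context previous = context.attach();
Iterator<Element> iter = null;
try {
  iter = stub.getElements(request);
  if (iter.hasNext()) {
    Element first = iter.next();  // consume only part of the stream
  }
} finally {
  context.detach(previous);
  context.cancel(null);  // cancel the RPC
  if (iter != null) {
    try {
      // Drain until hasNext() returns false so buffered responses are released.
      while (iter.hasNext()) {
        iter.next();
      }
    } catch (StatusRuntimeException e) {
      // The cancellation may surface here once it is observed.
    }
  }
}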
Draining the ThreadlessExecutor from a Context.CancellationListener scares me because of the threading involved.
It would be possible to add a close() method to the Iterator (and require you to cast), but in general we're not wild about the blocking iterator API as it is far too hard to use properly. We think the proper fix here is to have a "real" blocking API.
Does anyone have any examples of using CancellableContext with a blocking stub? I'm having a hard time finding a good official source after an hour of searching.