grpc-java

BlockingStub returns iterator that cannot be closed

Open jonnyzzz opened this issue 8 years ago • 10 comments

I use a blocking stub for a method that returns a stream of messages. Its return type is Iterator<T>. It turns out I cannot make the gRPC client end the request (e.g. because of a client failure). As a result, there are a number of cases where an open connection may get stuck in the client.

The implementation calls io.grpc.stub.ClientCalls#blockingServerStreamingCall(io.grpc.Channel, io.grpc.MethodDescriptor<ReqT,RespT>, io.grpc.CallOptions, ReqT), which in turn uses BlockingResponseStream. There is no way to reach the io.grpc.ClientCall#cancel method from the Iterator. It would be nice to wrap the Iterator, or make it implement an interface, in a similar way to ServerCallStreamObserver/ClientCallStreamObserver. Another approach would be to have this iterator implement Java's AutoCloseable.

There is also a comment on the Iterator implementation class

  // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.

jonnyzzz avatar Nov 08 '16 11:11 jonnyzzz

I'm not wild about how the iterator has turned out either. Streaming+blocking in grpc-java is weak/missing. To do AutoCloseable we'd need to create a new interface that extends both AutoCloseable and Iterator.
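The combined interface mentioned here could look roughly like the sketch below. CloseableIterator, CloseableIterators.wrap and the onClose callback are hypothetical names used for illustration and are not part of grpc-java; onClose stands in for whatever would cancel the underlying ClientCall.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical interface, not part of grpc-java. */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close(); // narrowed so callers need not handle a checked exception
}

final class CloseableIterators {
  private CloseableIterators() {}

  /** Wraps a plain Iterator; onClose stands in for cancelling the underlying call. */
  static <T> CloseableIterator<T> wrap(Iterator<T> delegate, Runnable onClose) {
    return new CloseableIterator<T>() {
      private boolean closed;

      @Override
      public boolean hasNext() {
        return !closed && delegate.hasNext();
      }

      @Override
      public T next() {
        if (closed) {
          throw new NoSuchElementException("iterator is closed");
        }
        return delegate.next();
      }

      @Override
      public void close() {
        if (!closed) { // run the callback at most once
          closed = true;
          onClose.run();
        }
      }
    };
  }
}
```

With such a type, a caller could hold the iterator in a try-with-resources statement so that the callback runs on every exit path, including exceptions thrown while iterating.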

The main way to cancel would be to create and attach a CancellableContext, and then cancel the context as you leave the block. You should always cancel the CancellableContext when the unit of work it encompasses is complete, even on success.
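One way to read this advice is sketched below. It needs grpc-java on the classpath for io.grpc.Context. The class CancellableCalls and the method withCancellableStream are hypothetical names; startCall stands in for a blocking stub call such as () -> stub.getElements(request), which has to be made while the context is attached so that the call is tied to it.

```java
import io.grpc.Context;
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Supplier;

final class CancellableCalls {
  private CancellableCalls() {}

  /**
   * Starts a server-streaming call inside a fresh CancellableContext, lets the
   * caller consume as much of the Iterator as it wants, and always cancels the
   * context on the way out, even on success.
   */
  static <T, R> R withCancellableStream(
      Supplier<Iterator<T>> startCall, Function<Iterator<T>, R> work) {
    Context.CancellableContext ctx = Context.current().withCancellation();
    Context previous = ctx.attach(); // make ctx the current context for this thread
    try {
      Iterator<T> responses = startCall.get(); // the call starts under ctx
      return work.apply(responses);
    } finally {
      ctx.detach(previous); // restore whatever context was current before
      ctx.cancel(null);     // no cause: the unit of work is simply finished
    }
  }
}
```

The result of work should not hold on to the Iterator, since the call behind it is cancelled as soon as the method returns.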

ejona86 avatar Nov 08 '16 16:11 ejona86

I decided to use the blocking API in tests, to simplify calls that I only use in tests. This was not clear from the code, and there were no Javadoc hints about it. Will try CancellableContext.

jonnyzzz avatar Nov 08 '16 20:11 jonnyzzz

Another option would be to have a method that returns a java.util.stream.Stream<RespT> since streams are AutoCloseable, but I'm guessing grpc-stub can't use Java 8 features.
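A minimal sketch of this idea, using only the Java 8 standard library. ResponseStreams, asStream and the cancel callback are hypothetical names and not part of grpc-java; cancel stands in for whatever would cancel the RPC.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class ResponseStreams {
  private ResponseStreams() {}

  /**
   * Adapts an Iterator to a sequential Stream whose close() runs the cancel
   * callback. The callback is a hypothetical hook standing in for cancelling
   * the underlying call.
   */
  static <T> Stream<T> asStream(Iterator<T> responses, Runnable cancel) {
    Spliterator<T> spliterator =
        Spliterators.spliteratorUnknownSize(responses, Spliterator.ORDERED);
    return StreamSupport.stream(spliterator, false).onClose(cancel);
  }
}
```

Close handlers run only when close() is called, not when a terminal operation finishes, so the stream still has to be held in a try-with-resources statement for the callback to fire after a short-circuiting operation such as anyMatch.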

jhaber avatar Jun 27 '18 19:06 jhaber

I'd like to add that an application error is not the only thing that can prevent the iterator from being closed. I just tracked down a "direct buffer" ("LEAK: ByteBuf.release() was not called before...") memory leak in our system.

It was caused by not iterating through all elements returned by the blocking stub's Iterator (BlockingResponseStream):

stub.getElements(request)
    .asSequence()
    .any { it.someProperty == 1234 }

This is Kotlin code and it looks very innocent. The .asSequence() call converts the Iterator returned from the blocking stub to a Sequence, on which we can call .map/.filter/.any etc.

However, because a sequence is evaluated lazily, as soon as any finds a matching object the remaining elements are not going to be processed, leaving the iterator "open" and causing the leak, apparently.

systemfreund avatar Dec 28 '18 20:12 systemfreund

@systemfreund There isn't anything we can really do about that. The suggestion to use CancellableContext is the way to prevent such leaks. Any other approaches are likely going to have bad side effects, worse than the occasional nag log.

carl-mastrangelo avatar Jan 03 '19 21:01 carl-mastrangelo

@carl-mastrangelo Even CancellableContext does not work correctly, because the onClose() method of any attached ClientInterceptor is not called in the case of a cancellation. This might be solvable by draining the ThreadlessExecutor of ClientCalls.blockingServerStreamingCall in a Context.CancellationListener?

pskiwi avatar Oct 09 '19 09:10 pskiwi

@pskiwi, I think the behavior you are referring to was fixed in #6255.

ejona86 avatar Oct 09 '19 13:10 ejona86

@ejona86 I tried, but it doesn't seem to fix my problem. I was hoping the following test code would work: ClientCallsTest.java

Is this expected to work?

pskiwi avatar Oct 10 '19 13:10 pskiwi

@pskiwi, no, that wouldn't work. After the cancellation you need to "drain" the Iterator; keep going until you get hasNext() == false. If you have a while (iter.hasNext()) iter.next(); after the finally, then it would work.
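The drain step described here can be sketched as a small helper, to be called after the context has been cancelled. ResponseDrain and drainQuietly are hypothetical names. The catch block rests on an assumption: once a call ends with a non-OK status such as CANCELLED, the blocking iterator may report it by throwing from hasNext().

```java
import java.util.Iterator;

final class ResponseDrain {
  private ResponseDrain() {}

  /**
   * Consumes and discards whatever is left in the iterator and returns how
   * many elements were discarded.
   */
  static int drainQuietly(Iterator<?> responses) {
    int discarded = 0;
    try {
      while (responses.hasNext()) {
        responses.next();
        discarded++;
      }
    } catch (RuntimeException e) {
      // Assumption: a cancelled call may surface its non-OK status by throwing
      // here. The stream is finished either way, so the exception is dropped.
    }
    return discarded;
  }
}
```

Swallowing the exception is only reasonable in this cleanup position, after the caller has already decided to abandon the stream; during normal iteration the status should be allowed to propagate.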

Draining the ThreadlessExecutor from a Context.CancellationListener scares me because of the threading involved.

It would be possible to add a close() method to the Iterator (and require you to cast), but in general we're not wild about the blocking iterator API as it is far too hard to use properly. We think the proper fix here is to have a "real" blocking API.

ejona86 avatar Oct 10 '19 16:10 ejona86

Does anyone have any examples for the use of CancellableContext with a blocking stub? I'm having a hard time finding a good official source after an hour of searching.

mikeholler avatar Oct 25 '19 19:10 mikeholler