gapic-generator-java

`ServerStream.stream()` should release underlying resources when calling `Stream.close()` method

Open rossignolloic opened this issue 1 year ago • 0 comments

Is your feature request related to a problem? Please describe. When I use ServerStream.stream() and call a Stream terminal operation that does not consume every element of the stream, such as findAny() or anyMatch(Predicate<?>), the underlying resources of the ServerStream are not released.

ServerStream<X> serverStream = ...;
try (Stream<X> stream = serverStream.stream()) {
    // partially consume the stream
}
// At this point the underlying resources of serverStream have not been released
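The behaviour described above can be reproduced with plain JDK streams. The following is a minimal sketch, not gax code: `CountingSource`, `pulled`, `released`, and `release()` are hypothetical names standing in for a server stream and its resource cleanup. It shows that a short-circuiting terminal operation stops pulling elements early, and that `Stream.close()` does nothing for the source when no close handler was registered.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for a server stream; it only counts what was pulled.
final class CountingSource implements Iterable<Integer> {
    final AtomicInteger pulled = new AtomicInteger(); // elements handed to the consumer
    boolean released = false;                         // set only by release()
    private final int size;

    CountingSource(int size) {
        this.size = size;
    }

    // Hypothetical cleanup hook; nothing in stream() calls it.
    void release() {
        released = true;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return next < size;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet();
                return next++;
            }
        };
    }

    // Same shape as a stream() built from the spliterator, with no onClose handler.
    Stream<Integer> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
}
```

Running `findFirst()` on `new CountingSource(100).stream()` inside a try-with-resources block leaves `pulled` well below 100 and `released` still false once the block exits.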

Describe the solution you'd like The derived stream should release all underlying resources of the parent ServerStream when it is closed. To do that, the stream() method should register an onClose(Runnable) on the stream before returning it. This Runnable should cancel the ServerStream if the stream has not been fully consumed. The stream user then only needs a try-with-resources statement to ensure that all underlying resources are released in every case.

ServerStream<X> serverStream = ...;
try (Stream<X> stream = serverStream.stream()) {
    // fully or partially consume the stream
}
// At this point serverStream is either fully consumed or canceled

Proposed implementation steps

In ServerStreamIterator, a new method can be added to report whether the iterator has been fully consumed.

boolean isFullyConsumed() {
    return last == QueuingResponseObserver.EOF_MARKER;
}

In ServerStream, a new method can be added that cancels only when the iterator has not been fully consumed.

void cancelIfNecessary() {
    if (!iterator.isFullyConsumed()){
        cancel();
    }
}

In ServerStream, the stream() method can be updated to register an onClose(Runnable) that calls the method above.

public Stream<V> stream() {
    return StreamSupport.stream(this.spliterator(), false).onClose(this::cancelIfNecessary);
}
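The three steps above can be tried out without gax. The sketch below is an assumption-laden stand-in, not the real `ServerStream`: `FakeServerStream`, its `cancelled` flag, and the `fullyConsumed` field (which plays the role of the `last == EOF_MARKER` check) are hypothetical names used only for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for ServerStream; it is not the gax class.
final class FakeServerStream implements Iterable<String> {
    private final String[] items;
    private int index = 0;
    private boolean fullyConsumed = false; // mirrors "the iterator has read the EOF marker"
    boolean cancelled = false;             // records whether cancel() was called

    FakeServerStream(String... items) {
        this.items = items;
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                if (index >= items.length) {
                    fullyConsumed = true; // end of stream observed
                    return false;
                }
                return true;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return items[index++];
            }
        };
    }

    void cancel() {
        cancelled = true;
    }

    // Cancel only when the end of the stream has not been observed.
    void cancelIfNecessary() {
        if (!fullyConsumed) {
            cancel();
        }
    }

    // The close handler runs when Stream.close() is called, e.g. by try-with-resources.
    Stream<String> stream() {
        return StreamSupport.stream(spliterator(), false).onClose(this::cancelIfNecessary);
    }
}
```

With this stand-in, calling `findFirst()` inside a try-with-resources block leaves `cancelled` set to true, while a `forEach` over every element leaves it false. Note that the handler only runs if the caller actually closes the stream.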

Describe alternatives you've considered I have tried two different workarounds.

Register an onClose(Runnable) that cancels the ServerStream

ServerStream<X> serverStream = ...;
return serverStream.stream().onClose(serverStream::cancel);

But with this, the ServerStream is canceled even when all of its items have been consumed successfully, which I don't think is good practice.
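The drawback of this first workaround follows from how `Stream.onClose` works: the handler runs on every `close()`, regardless of how much of the stream was consumed. A minimal sketch with plain JDK streams, where the hypothetical `cancelCalls` counter stands in for `serverStream.cancel()`:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class UnconditionalCloseDemo {
    // Returns how often the close handler ran after the stream was fully consumed.
    static int cancelCallsAfterFullConsumption() {
        AtomicInteger cancelCalls = new AtomicInteger(); // stands in for serverStream.cancel()
        try (Stream<String> stream =
                Stream.of("a", "b", "c").onClose(cancelCalls::incrementAndGet)) {
            stream.forEach(item -> { }); // every element is consumed here
        }
        return cancelCalls.get();
    }
}
```

`cancelCallsAfterFullConsumption()` returns 1: the handler fires even though nothing was left to cancel.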

Don't use the stream() method; instead, create the stream from iterator() and register an onClose(Runnable) on it that checks the result of iterator.hasNext() to decide whether to cancel the ServerStream

ServerStream<X> serverStream = ...;
Iterator<X> iterator = serverStream.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false).onClose(() -> {
    if (iterator.hasNext()) {
        serverStream.cancel();
    }
});

But iterator.hasNext() can block the thread until the next element is received, which is wasted time in this case.
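The blocking concern can be illustrated with a queue-backed iterator. This is a rough sketch under stated assumptions: `BlockingIterator` and `HasNextBlockingDemo` are hypothetical classes that only loosely imitate an iterator fed by a response queue, and are not the gax implementation.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical iterator that waits on a queue filled by a producer.
final class BlockingIterator implements Iterator<String> {
    static final String EOF = new String("EOF"); // sentinel, compared by identity
    private final BlockingQueue<String> queue;
    private String last;

    BlockingIterator(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public boolean hasNext() {
        if (last == null) {
            try {
                last = queue.take(); // blocks until the producer delivers something
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return last != EOF;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String result = last;
        last = null;
        return result;
    }
}

final class HasNextBlockingDemo {
    // True if hasNext() was still waiting before an element arrived and returned afterwards.
    static boolean blocksUntilElementArrives() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BlockingIterator iterator = new BlockingIterator(queue);
        Thread closer = new Thread(iterator::hasNext); // plays the role of the onClose handler
        closer.setDaemon(true);
        closer.start();
        try {
            closer.join(200);                        // nothing has been delivered yet
            boolean stillWaiting = closer.isAlive();
            queue.put("late element");               // the producer finally delivers
            closer.join(5000);
            return stillWaiting && !closer.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch the thread that calls `hasNext()` stays parked until the producer delivers an element, which is the delay a close handler based on `hasNext()` would incur.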

rossignolloic, Nov 08 '24 16:11