`ServerStream.stream()` should release underlying resources when calling `Stream.close()` method
Is your feature request related to a problem? Please describe.
When I use ServerStream.stream() and call any Stream terminal operation that does not consume all elements of the stream like findAny(), anyMatch(Predicate<?>), all underlying resources of the ServerStream are not released.
ServerStream<X> serverStream= ...;
try(Stream<X> stream = serverStream.stream()) {
//consume partially the stream
}
//At this point serverStream underlying resources are not released
Describe the solution you'd like
The derived stream should close all underlying resources of the main ServerStream when it is closed.
To do that, the stream() method should register an onClose(Runnable) on the stream before returning it.
This Runnable should cancel the ServerStream when the stream is not fully consumed.
With this, the stream user should add the statement try-with-resources to ensure all underlying resources are released in all cases.
ServerStream<X> serverStream= ...;
try(Stream<X> stream = serverStream.stream()) {
//consume totally or partially the stream
}
//At this point serverStream is fully consumed or canceled
Proposal implementation steps
In ServerStreamIterator, a new method can be created to be able to know if the iterator is fully consumed.
boolean isFullyConsumed() {
return last == QueuingResponseObserver.EOF_MARKER;
}
In ServerStream, a new method can be created to cancel only when iterator is not fully consumed.
void cancelIfNecessary() {
if (!iterator.isFullyConsumed()){
cancel();
}
}
In ServerStream, the stream() method can be updated to register the onClose(Runnable) to call the above method
public Stream<V> stream() {
return StreamSupport.stream(this.spliterator(), false).onClose(this::cancelIfNecessary);
}
Describe alternatives you've considered I have tried two different workarounds.
Register a onClose(Runnable) that cancel the ServerStream
ServerStream<X> serverStream= ...;
return serverStream.stream().onClose(serverStream::cancel)
But with this, ServerStream is canceled even if all its items have been successfully consumed. I don't think it's a good practice to do this.
Not use stream() method, but create the stream from the iterator() and register the onClose(Runnable) on it that check result of iterator.hasNext() to cancel or not the ServerStream
ServerStream<X> serverStream= ...;
Iterator<X> iterator = serverStream.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false).onClose(() -> {
if (iterator.hasNext()) {
serverStream.cancel();
}
})
But iterator.hasNext() can force the Thread to wait until next element is received, which is a waste of time in that case.