grpc-java
grpc-java copied to clipboard
Tracking issue for StreamObservers
If you have a method like this (Kotlin):
@Transactional
override fun get(
request: DummyRequest,
responseObserver: StreamObserver<DummyResponse>,
) {
myStreamingDatabaseQuery()
.iterator()
.forEach { responseObserver.onNext(it) }
responseObserver.onCompleted()
}
fun myStreamingDatabaseQuery(): Stream<Dummy> = ...
It will work fine as the transaction is opened, the stream is processed, and the transaction is closed.
However, you might then run into memory issues and try to use StreamObservers like this:
@Transactional
override fun get(
request: DummyRequest,
responseObserver: StreamObserver<DummyResponse>,
) {
val iterator = myStreamingDatabaseQuery().iterator()
StreamObservers.copyWithFlowControl(
iterator,
responseObserver as CallStreamObserver<DummyResponse>,
)
}
fun myStreamingDatabaseQuery(): Stream<Dummy> = ...
This won't work because StreamObservers starts processing the stream after get() has returned. However, the transaction will then have been closed and you can no longer read the stream.
Any idea how to solve this?
The issue extends to closing the stream, which my code above lacks.
@Transactional assumes all processing is done within the method. That requires the method to be blocking. But for streaming you have to implement the service method in an async fashion (in order to get onReady() callbacks). So @Transactional is simply incompatible with gRPC streaming until we have some blocking API on server-side. But that's not specific to the StreamObservers utility.
Agreed to mark it deprecated because it is of minimal usefulness and is not frequently used (nothing in g3 or sourcegraph). Prefer to have grpc-java usages of it replaced, instead of creating an iterator just use onReady.
We should close this when the class is actually removed, which is also the point the reference to this issue is removed.