grpc-java icon indicating copy to clipboard operation
grpc-java copied to clipboard

Tracking issue for StreamObservers

Open carl-mastrangelo opened this issue 7 years ago • 5 comments

carl-mastrangelo avatar Jul 28 '18 02:07 carl-mastrangelo

If you have a method like this (Kotlin):


  @Transactional
  override fun get(
    request: DummyRequest,
    responseObserver: StreamObserver<DummyResponse>,
  ) {
    myStreamingDatabaseQuery()
      .iterator()
      .forEach { responseObserver.onNext(it) }

    responseObserver.onCompleted()
  }

  fun myStreamingDatabaseQuery(): Stream<Dummy> = ...

It will work fine as the transaction is opened, the stream is processed, and the transaction is closed.

However, you might then run into memory issues and try to use StreamObservers like this:

  @Transactional
  override fun get(
    request: DummyRequest,
    responseObserver: StreamObserver<DummyResponse>,
  ) {
    val iterator = myStreamingDatabaseQuery().iterator()

    StreamObservers.copyWithFlowControl(
      iterator,
      responseObserver as CallStreamObserver<DummyResponse>,
    )
  }

  fun myStreamingDatabaseQuery(): Stream<Dummy> = ...

This won't work because StreamObservers starts processing the stream after get() has returned. However, the transaction will then have been closed and you can no longer read the stream.

Any idea how to solve this?

micheljung avatar Nov 21 '22 11:11 micheljung

The issue extends to closing the stream, which my code above lacks.

micheljung avatar Nov 30 '22 13:11 micheljung

@Transactional assumes all processing is done within the method. That requires the method to be blocking. But for streaming you have to implement the service method in an async fashion (in order to get onReady() callbacks). So @Transactional is simply incompatible with gRPC streaming until we have some blocking API on server-side. But that's not specific to the StreamObservers utility.

ejona86 avatar Nov 08 '23 18:11 ejona86

Agreed to mark it deprecated because it is of minimal usefulness and is not frequently used (nothing in g3 or sourcegraph). Prefer to have grpc-java usages of it replaced, instead of creating an iterator just use onReady.

larry-safran avatar Nov 08 '23 19:11 larry-safran

We should close this when the class is actually removed, which is also the point the reference to this issue is removed.

ejona86 avatar Nov 13 '23 15:11 ejona86