
Using flowOn on the request flow fails silently

Open asarkar opened this issue 5 years ago • 3 comments

I have a bidi use case where more requests may be generated after receiving a response from the server. To achieve this, I use a Channel that the response handler can send messages to, and provide that Channel to the RPC method as a flow.

stub.service(requestChannel.receiveAsFlow())

This works, but the requests are processed on the default dispatcher. Trying to change the dispatcher by adding flowOn fails silently, without any stack trace. My guess is that the flow invariant is somehow violated.

asarkar avatar Aug 05 '20 04:08 asarkar

Can you elaborate a little on what you mean by "the requests are processed on the default dispatcher"? And what you mean by "fails silently without any stacktrace"? What, exactly, happens?

If stub.service has a unary response, not a streaming response, then requestChannel.receiveAsFlow() will be evaluated in the same context that is calling stub.service -- stub.service doesn't choose a context, it uses the caller's. Similarly, if the response is streaming, then requestChannel.receiveAsFlow() won't be collected until the streaming response is collected, and then it will be collected in the same context that the streaming response is collected in.

flowOn ought to work just fine, but there's not really enough information here to debug why it's not working for you.
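To make those context rules concrete, here is a minimal sketch using plain kotlinx.coroutines (no gRPC involved): flowOn changes the context of everything upstream of it, while collection stays in the caller's context.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    var emitterThread = ""
    flow {
        // Runs on the dispatcher picked by flowOn below.
        emitterThread = Thread.currentThread().name
        emit(1)
    }
        .flowOn(Dispatchers.IO) // affects everything upstream of this call
        .collect {
            // Collection stays in the caller's context (runBlocking's thread).
            println("emitted on $emitterThread, collected on ${Thread.currentThread().name}")
        }
}
```

Running this prints an emitter thread from the IO pool and a collector thread belonging to runBlocking, which is exactly the split described above.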

lowasser avatar Aug 11 '20 18:08 lowasser

@lowasser Thanks for your response. My use case is bidi streaming, where the client may generate more messages based on the server's response. The problem I reported is in a work project that I can't post code from, so I tried to reproduce the issue in this project by simulating a duel where the client or server may randomly choose to "shoot" or "yield". This is very similar to the actual use case, with one difference: in the work project, the server is remote and someone else's; in my project, I wrote both the client and the server, and both run on my localhost.

Here are the problems I see:

  1. I don't get the failure that I saw in my work project, where attaching a flowOn failed without a stack trace. However, the dispatcher isn't changed either. See this method.
  2. The dispatchers set for consuming the server responses or for sending requests are ignored; the code still executes on the default dispatcher, which can be seen by running this test.
  3. If the client wants to generate more messages based on the server's response, they have to use an intermediate SendChannel or a MutableStateFlow. StateFlow ignores flowOn by design, and as mentioned in point 1 above, channel.receiveAsFlow().flowOn(Dispatchers.IO) has no effect, so the user is stuck with the default dispatcher.
  4. Consider the case where the client (challenger) yields, and after sending the server (opponent) a truce message, it wishes to stop processing the response flow, and doesn't want to send any more messages. Using a StreamObserver, this is as easy as three lines of code:
    requestObserver.onCompleted() // done sending requests
    shotsFired.close() // close intermediate channel
    ioScope.cancel() // cancel the coroutine scope that's processing the responses
    
    The roundabout way of doing this with a request/response Flow is to use a takeWhile on the response flow and cancel the coroutine scope that's processing the responses. This implies that the response Flow must always be processed in a separate scope; otherwise the client would be stuck forever.
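A sketch of that roundabout pattern, with fakeResponses as a hypothetical stand-in for the server's bidi response stream (all names here are illustrative, not from grpc-kotlin):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the server's response stream.
fun fakeResponses(): Flow<String> = flowOf("shoot", "shoot", "truce")

fun main() = runBlocking {
    val requests = Channel<String>(Channel.UNLIMITED) // intermediate request channel

    // Responses are consumed in a separate scope so the client can
    // stop with takeWhile and then tear the scope down.
    val responseScope = CoroutineScope(Dispatchers.IO)
    responseScope.launch {
        fakeResponses()
            .takeWhile { it != "truce" }          // stop once the opponent yields
            .collect { requests.send("shoot") }   // generate a follow-up request
    }.join()

    requests.close()        // done sending requests
    responseScope.cancel()  // cancel the response-processing scope
    println("sent ${requests.receiveAsFlow().toList().size} follow-up shots")
}
```

With a StreamObserver the same shutdown is three direct calls; here it is spread across takeWhile, close, and a scope cancellation.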

asarkar avatar Aug 13 '20 06:08 asarkar

I remain confused, but let me try to pick this apart: it sounds like we're on the same page that receiveAsFlow() more or less ignores direct follow-on calls to flowOn. I claim this is normal and desirable, because receiveAsFlow() isn't really doing any work and it doesn't especially matter which dispatcher it's running on. The work is presumably getting done in whichever scope is sending to the SendChannel, which remains completely under the user's control.
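A small sketch of that claim (illustrative names, not grpc-kotlin API): the dispatcher that matters is the one where the sends happen, and a flowOn attached after receiveAsFlow() changes nothing observable, because receiveAsFlow() itself does no work.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val requests = Channel<String>()

    // The "work" runs wherever the sender runs -- here, on Dispatchers.IO.
    launch(Dispatchers.IO) {
        repeat(3) { requests.send("produced on ${Thread.currentThread().name}") }
        requests.close()
    }

    // flowOn here is effectively a no-op: the elements were already
    // produced on the sender's dispatcher before they reached the channel.
    requests.receiveAsFlow()
        .flowOn(Dispatchers.Default)
        .collect { println(it) }
}
```

The printed thread names come from the sending coroutine's pool regardless of what flowOn is given on the receiving side.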

As far as generating messages based on the server responses, SendChannel is the simplest solution, but adapters exist (or can be built) that stay in "flow world." My favored adapter looks like this:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

private object NullPlaceholder

/**
 * Outputs a flow defined in terms of itself.  For example,
 * `flowFixpoint { f -> f.onStart { emit(1) } }` emits 1, 1, 1, ..., which is the unique
 * flow that is 1 concatenated with itself.
 */
fun <T> flowFixpoint(
  fixed: (Flow<T>) -> Flow<T>
): Flow<T> = flow {
  val buffer = Channel<Any>(Channel.UNLIMITED)
  // buffer only accumulates as many elements as are unprocessed at any given time

  val bootstrap = flow {
    readLoop@ while (true) {
      // poll() in the original; tryReceive().getOrNull() in current kotlinx.coroutines
      val polled = buffer.tryReceive().getOrNull()

      @Suppress("UNCHECKED_CAST")
      when (polled) {
        null -> {
          if (buffer.isClosedForReceive) {
            break@readLoop
          } else {
            throw IllegalStateException(
              "flowFixpoint infinite loop: tried to read more elements than have been emitted"
            )
          }
        }
        NullPlaceholder -> emit(null as T)
        else -> emit(polled as T)
      }
    }
  }
  try {
    fixed(bootstrap)
      .collect {
        buffer.send(it ?: NullPlaceholder)
        emit(it)
      }
  } catch (thrown: Throwable) {
    buffer.close(thrown)
    throw thrown
  }
  buffer.close()
}

...which then gets used along the following lines:

val responses = flowFixpoint { responses ->
  val requests = flow {
    emitAll(initialRequests)
    responses.takeWhile { shouldKeepSendingRequests(it) }.collect { emitFollowUpRequests(it) }
  }
  stub.bidirectionalRpc(requests)
}
responses.collect { response -> doWhatever(response) }

Notably, there aren't any launches here, nor cancellations (other than the under-the-covers one in takeWhile's implementation).

lowasser avatar Aug 26 '20 20:08 lowasser