grpc-kotlin

Cancelling stuck flows

Open · andrewparmet opened this issue 3 years ago · 2 comments

I have an application with thousands of active bidirectional streaming calls. Our service method returns a callbackFlow {}.

Every once in a while the flows stop being consumed and a third-party library reports an error. In these cases we want to close the flows and disconnect the client to force a retry. However, I'm seeing that close() on a SendChannel (the receiver of the callbackFlow {} builder) enqueues the close behind the normal messages already in the channel, so the channel never actually closes and we leak resources. We have something like this:

```kotlin
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

class ThirdPartyCallback<T>(
    private val channel: SendChannel<T> // taken from `this` in a `callbackFlow {}` call
) {
    fun onNext(elem: T) {
        // Executes on a thread controlled by the third-party library and blocks it
        // until the channel accepts the element
        runBlocking { channel.send(elem) }
    }

    fun onError(t: Throwable) {
        channel.close(t) // this doesn't seem to make it to gRPC to clean up the call
    }
}
```
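The behaviour described above matches how a kotlinx.coroutines Channel treats the two ways of shutting it down: close() places a close token behind the elements already buffered, so a receiver still gets all of them first, while cancel() closes the channel and discards the buffer. A minimal sketch (the function names are illustrative only):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// close(): buffered elements are still delivered before the channel reports closed.
fun drainAfterClose(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.send(1)
    channel.send(2)
    channel.close()
    val received = mutableListOf<Int>()
    for (x in channel) received += x // iterates until the close token is reached
    received
}

// cancel(): the channel is closed and its buffered elements are discarded.
fun receiveAfterCancel(): Int? = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.send(1)
    channel.send(2)
    channel.cancel()
    channel.tryReceive().getOrNull() // null: nothing left to receive
}
```

drainAfterClose() returns [1, 2], while receiveAfterCancel() returns null.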

Should we put our own queue in front of the channel, using trySend and holding elements in our queue whenever the channel isn't ready to receive? I believe this is how grpc-java recommends implementing server-side backpressure, and it's what we do in a Java service. There we can set an onReadyHandler on the ServerCallStreamObserver, so we can asynchronously resume sending messages once the underlying call is ready. Is there an equivalent in grpc-kotlin?
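A rough sketch of the "own queue plus trySend" idea, using a hypothetical QueueingCallback in place of ThirdPartyCallback. onNext never blocks the third-party thread: it appends to a local queue and moves as many elements into the channel as trySend currently accepts. What the sketch cannot do by itself is call drain() again once the channel has room; that trigger is exactly the readiness signal the question asks about.

```kotlin
import kotlinx.coroutines.channels.SendChannel

// Hypothetical variant of ThirdPartyCallback with a local queue in front of the channel.
class QueueingCallback<T>(private val channel: SendChannel<T>) {
    private val pending = ArrayDeque<T>()

    // Called on a third-party thread; never blocks.
    @Synchronized
    fun onNext(elem: T) {
        pending.addLast(elem)
        drain()
    }

    // Moves queued elements into the channel, in order, until trySend is refused.
    // Something still has to call this again once the channel has room.
    @Synchronized
    fun drain() {
        while (pending.isNotEmpty() && channel.trySend(pending.first()).isSuccess) {
            pending.removeFirst()
        }
    }

    // Drops what is still queued locally, then closes the channel; elements that
    // already made it into the channel are still delivered before the close.
    fun onError(t: Throwable) {
        synchronized(this) { pending.clear() }
        channel.close(t)
    }

    @Synchronized
    fun pendingCount(): Int = pending.size
}
```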

Thanks!

andrewparmet, Sep 08 '22 04:09

This reads to me like an issue with callbackFlow, not with grpc-kotlin -- you're unhappy with how a callbackFlow handles closure interrupting a stream of messages. Are you asking for suggestions about best practices around this issue, saying there's a bug in grpc-kotlin, or saying there's a bug in callbackFlow?

Managing your own queueing seems reasonable to me if you're unsatisfied with the SendChannel queueing; alternatively, you could apply conflate, buffer, or another operator to the callbackFlow you pass to gRPC to change its buffering policy.
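For reference, a sketch of the operators mentioned above. Applied directly to a callbackFlow, buffer() and conflate() are fused into it and change the capacity and overflow policy of the callbackFlow's own channel (which otherwise uses the default buffer size). The helper names boundedBuffer, latestOnly, and boundedBufferExample are hypothetical.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Keeps every message but lets the producer get at most `capacity` elements ahead of
// the collector; beyond that, send() suspends (backpressure) and trySend() fails.
fun <T> Flow<T>.boundedBuffer(capacity: Int): Flow<T> =
    buffer(capacity, onBufferOverflow = BufferOverflow.SUSPEND)

// Keeps only the most recent value when the collector falls behind,
// dropping the intermediate ones.
fun <T> Flow<T>.latestOnly(): Flow<T> = conflate()

// With a fast collector, a bounded buffer still delivers every element in order.
fun boundedBufferExample(): List<Int> = runBlocking {
    (1..5).asFlow().boundedBuffer(2).toList()
}
```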

lowasser, Sep 08 '22 21:09

I nearly filed this against kotlinx.coroutines, actually. I'm looking for suggestions!

We don't want to conflate our channel, since conflation drops messages. To manage our own queue, I think we should set the buffer to zero and use trySend, but to deliver responses promptly we'd need to know when the ServerCall becomes ready. Is that possible to find out, or would you recommend using something other than a callbackFlow?
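Why a zero buffer makes the readiness signal essential: on a rendezvous (zero-capacity) channel, trySend succeeds only while a consumer is already suspended in receive(), so every element that arrives while the consumer is busy would have to wait in the side queue. A minimal demonstration (rendezvousTrySend is an illustrative name):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns (trySend with no receiver waiting, trySend while a receiver is suspended).
fun rendezvousTrySend(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // Nobody is waiting in receive(), so the zero-capacity channel rejects the element.
    val withoutReceiver = channel.trySend(1).isSuccess

    val receiver = async { channel.receive() }
    yield() // let the receiver start and suspend inside receive()

    // The suspended receiver takes the element directly.
    val withReceiver = channel.trySend(2).isSuccess
    receiver.await()
    withoutReceiver to withReceiver
}
```

rendezvousTrySend() returns (false, true).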

andrewparmet, Sep 09 '22 12:09

What if we forward requests into a channel and process them in a coroutine that consumes it? When the third-party library fails, we can cancel the channel to discard all buffered requests.

We have an additional requirement that each request be fully processed before the next one starts, which has complicated things. I think the launch/join inside the channel consumer should handle that.

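```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

suspend fun collect(requests: Flow<SubscribeRequest>) {
    coroutineScope {
        // limit if we so desire, which will give backpressure to gRPC
        val channel = Channel<SubscribeRequest>(Channel.UNLIMITED)

        // Consumer runs concurrently; each request finishes before the next starts
        launch {
            channel.consumeEach {
                launch { handleRequest(it, channel) }.join()
            }
        }

        // Producer: forward incoming requests, then close so the consumer can finish
        requests.collect { channel.send(it) }
        channel.close()
    }
}

suspend fun handleRequest(request: SubscribeRequest, channel: Channel<SubscribeRequest>) {
    // if the third party has a fatal error, we can call channel.cancel() here
}
```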
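A small, self-contained check of the claim that cancelling the channel discards the buffered requests. It uses plain String requests and a hypothetical failOn value in place of SubscribeRequest and the real third-party failure, and it uses receiveCatching() so the loop ends without having to catch the CancellationException that a cancelled channel would otherwise throw.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Processes queued requests in order; `failOn` is a hypothetical stand-in for a fatal
// third-party error. Returns the requests that were actually handled.
fun processUntilFailure(requests: List<String>, failOn: String): List<String> = runBlocking {
    val channel = Channel<String>(Channel.UNLIMITED)
    requests.forEach { channel.trySend(it) } // UNLIMITED, so trySend always succeeds
    channel.close() // no more requests will arrive

    val handled = mutableListOf<String>()
    while (true) {
        // receiveCatching() returns a "closed" result instead of throwing once the
        // channel is closed or cancelled; getOrNull() turns that into null.
        val request = channel.receiveCatching().getOrNull() ?: break
        handled += request
        if (request == failOn) {
            channel.cancel() // discard every request still buffered
        }
    }
    handled
}
```

With requests a, b, c, d and a failure on b, only a and b are handled; c and d are dropped by the cancel.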

andrewparmet, May 06 '23 17:05

Actually, is the solution much simpler? Can we pass the ProducerScope from the callbackFlow to ThirdPartyCallback as a CoroutineScope (ProducerScope is both a CoroutineScope and a SendChannel) and cancel it? If I'm understanding correctly, that should propagate up to the gRPC call.
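A minimal sketch of that idea, assuming kotlinx.coroutines 1.5 or later (for trySendBlocking). ScopedCallback is a hypothetical rename of ThirdPartyCallback that keeps the whole ProducerScope, and collectorSeesCancellation() stands in for gRPC as the collector. As far as I can tell from kotlinx.coroutines, cancelling the producer coroutine also cancels its channel, so the collector fails with a CancellationException instead of waiting behind a close token; how grpc-kotlin maps that exception to a call status is not shown here.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical variant of ThirdPartyCallback that holds the whole ProducerScope.
class ScopedCallback<T>(private val scope: ProducerScope<T>) {
    // Called on a third-party thread; blocks it only while the flow's buffer is full.
    fun onNext(elem: T) {
        scope.trySendBlocking(elem)
    }

    // Cancels the producer coroutine itself rather than only closing the channel.
    fun onError(t: Throwable) {
        scope.cancel("third-party failure", t)
    }
}

// Stand-in for gRPC collecting the response flow: returns true if collection ends
// with a CancellationException after onError, rather than hanging.
fun collectorSeesCancellation(): Boolean = runBlocking {
    val responses = callbackFlow<Int> {
        val callback = ScopedCallback(this)
        callback.onNext(1)
        callback.onError(IllegalStateException("simulated third-party failure"))
        awaitClose() // throws CancellationException right away: this scope is cancelled
    }
    try {
        responses.toList()
        false
    } catch (e: CancellationException) {
        true
    }
}
```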

andrewparmet, May 06 '23 18:05

This works as long as the flow we return is cancellable:

```kotlin
callbackFlow {
    // produce some messages
    awaitClose()
}.cancellable()
```

andrewparmet, May 14 '23 23:05