kotlinx.coroutines icon indicating copy to clipboard operation
kotlinx.coroutines copied to clipboard

Notification when a SendChannel is ready to send

Open andrewparmet opened this issue 3 years ago • 3 comments
trafficstars

I have an application with long-lived flows constructed via callbackFlow. I'm receiving messages from a third-party API and relaying them forwards. I want to manage the queue of inbound messages from the third-party API myself so that I can cancel the flow promptly, since the closure of the SendChannel is queued behind any message backlog (or is there a way around that?).

Is it possible to be notified asynchronously when a SendChannel won't suspend? The usage I'm imagining is to always call trySend and enqueue the message if the attempt doesn't succeed, but then I'd want to know when it's okay to start sending messages again.

Alternatively are there any other recommendations for a way to handle this pattern?

andrewparmet avatar Sep 09 '22 13:09 andrewparmet

I'm a bit lost as to what you are trying to achieve. Why do you need to manage the queue of inbound messages? (it is supposed to be managed for you). Can you give some self-contained example of what you are trying to do what what behavior is not like the one you are expecting? In particular, what do you mean by "prompt cancellation"? Can you show an example where the cancellation is not prompt enough for your needs?

elizarov avatar Sep 12 '22 07:09 elizarov

We're observing cases where it appears that our send calls are suspending due to an overloaded consumer that isn't recovering, so we'd like to close the callbackFlow, but it looks like those closures never reach the consumer since the queue is stuck. I could be misinterpreting our logs I suppose - this situation doesn't come up often.

We'd only want to manage the queues ourselves so that the close call reaches the consumer promptly, i.e., to free up the resources it's using. To be more concrete this is a callbackFlow used as a bidi stream in a gRPC call (see https://github.com/grpc/grpc-kotlin/issues/355). We want to close the call when we're unable to send messages to the client, i.e., all calls to send are suspending and our application grinds to a halt.

In grpc-java we accomplish prompt cancellation by using the ServerCallStreamObserver's onReady handler to postpone sending to the client until it is ready so we always have an open line to send a closure. That's the kind of API I've been looking to use, but I'm happy to use whatever is idiomatically available in the coroutines library as well!

andrewparmet avatar Sep 13 '22 03:09 andrewparmet

Can you reproduce the problem is some self-contained code?

elizarov avatar Sep 13 '22 06:09 elizarov