kotlinx.coroutines icon indicating copy to clipboard operation
kotlinx.coroutines copied to clipboard

Channel onUndeliveredElement doesn't work

Open artemkaxboy opened this issue 3 years ago • 5 comments

I wanted to use channel with buffer but was disappointed to find out that it doesn't work as expected.

When I use a channel with capacity = 1, like this:

val capacity = 1

val channel = Channel<Int>(
    capacity = capacity,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
    onUndeliveredElement = { value -> println("Dropped value: $value") }
)

runBlocking {
    (1..3).forEach { value ->
        channel.send(value)
        println("Sent value: $value")
    }
}

It works perfectly as I want, output:

Sent value: 1
Dropped value: 1
Sent value: 2
Dropped value: 2
Sent value: 3

So I expected the same behavior with greater capactity = 2:

val capacity = 2

val channel = Channel<Int>(
    capacity = capacity,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
    onUndeliveredElement = { value -> println("Dropped value: $value") }
)

runBlocking {
    (1..3).forEach { value ->
        channel.send(value)
        println("Sent value: $value")
    }
}

But onUndeliveredElement never called in that case

Sent value: 1
Sent value: 2
Sent value: 3

BufferOverflow.DROP_LATEST doesn't work either.

artemkaxboy avatar Jun 20 '22 09:06 artemkaxboy

This is indeed a bug, but it seems to be only fixable in 1.7.0.

The story is the following:

  • BufferOverflow doesn't acknowledge onUndeliveredElement in any channels at all, except CONFLATED
  • Channel() function has an internal optimization -- if capacity is 1 and strategy is DROP_LATEST, it picks CONFLATED as an underlying implementation because it's semantically the same.

So we have to fix all channels except CONFLATED which potentially can be considered as breaking change for most of the channels

qwwdfsad avatar Jun 22 '22 09:06 qwwdfsad

While we're evaluating it, could you folks please describe your use case for this?

qwwdfsad avatar Jun 22 '22 11:06 qwwdfsad

My use case from #3327 is that I am using a bounded channel in a thread that receives some data, processes it, then sends it over the network. Sometimes, the network takes longer than expected so the channel reaches max capacity and the BufferOverflow cleans the oldest data. I would like to be able to monitor the number of messages that are dropped for analysis and log the lost data.

Edit: I'm wondering if it would be better in a callback or if the send call should return the dismissed element

sakex avatar Jun 22 '22 14:06 sakex

We have almost the same use case. We receive some data from the network and send items to a channel while the background worker processes them. When we have too many messages we'd like to drop the oldest. Planned to use callback for metrics and logging.

artemkaxboy avatar Jun 22 '22 15:06 artemkaxboy

Thanks, folks!

That's not the use-cases we expected when designing this API, but they are definitely the ones we'd like to support

qwwdfsad avatar Jun 22 '22 19:06 qwwdfsad

Fixed by #3621

qwwdfsad avatar Mar 09 '23 13:03 qwwdfsad