kotlinx.coroutines
kotlinx.coroutines copied to clipboard
Channel onUndeliveredElement doesn't work
I wanted to use channel with buffer but was disappointed to find out that it doesn't work as expected.
When I use a channel with capacity = 1, like this:
val capacity = 1
val channel = Channel<Int>(
capacity = capacity,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
onUndeliveredElement = { value -> println("Dropped value: $value") }
)
runBlocking {
(1..3).forEach { value ->
channel.send(value)
println("Sent value: $value")
}
}
It works perfectly as I want, output:
Sent value: 1
Dropped value: 1
Sent value: 2
Dropped value: 2
Sent value: 3
So I expected the same behavior with greater capactity = 2:
val capacity = 2
val channel = Channel<Int>(
capacity = capacity,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
onUndeliveredElement = { value -> println("Dropped value: $value") }
)
runBlocking {
(1..3).forEach { value ->
channel.send(value)
println("Sent value: $value")
}
}
But onUndeliveredElement never called in that case
Sent value: 1
Sent value: 2
Sent value: 3
BufferOverflow.DROP_LATEST doesn't work either.
This is indeed a bug, but it seems to be only fixable in 1.7.0.
The story is the following:
BufferOverflowdoesn't acknowledgeonUndeliveredElementin any channels at all, exceptCONFLATEDChannel()function has an internal optimization -- if capacity is1and strategy isDROP_LATEST, it picksCONFLATEDas an underlying implementation because it's semantically the same.
So we have to fix all channels except CONFLATED which potentially can be considered as breaking change for most of the channels
While we're evaluating it, could you folks please describe your use case for this?
My use case from #3327 is that I am using a bounded channel in a thread that receives some data, processes it, then sends it over the network. Sometimes, the network takes longer than expected so the channel reaches max capacity and the BufferOverflow cleans the oldest data. I would like to be able to monitor the number of messages that are dropped for analysis and log the lost data.
Edit: I'm wondering if it would be better in a callback or if the send call should return the dismissed element
We have almost the same use case. We receive some data from the network and send items to a channel while the background worker processes them. When we have too many messages we'd like to drop the oldest. Planned to use callback for metrics and logging.
Thanks, folks!
That's not the use-cases we expected when designing this API, but they are definitely the ones we'd like to support
Fixed by #3621