`CoroutineStart.UNDISPATCHED` flaky with `collectLatest`

Open vRallev opened this issue 8 months ago • 1 comments

The following test is flaky:

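import kotlin.test.Test // assumed; the original does not show which test framework is used
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest

@Test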
fun `CoroutineStart_UNDISPATCHED is flaky when using collectLatest`() = repeat(10) {
    runBlocking(Dispatchers.IO) {
        println("Iteration $it")

        val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        var valueReceived = false

        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            flow.collectLatest {
                valueReceived = true
            }
        }

        check(flow.tryEmit(1))

        withTimeout(2_000L) {
            while (!valueReceived) {
                delay(20L)
            }
        }

        job.cancel()
    }
}

The goal is to launch a job and start collecting a flow before the next statements run. In the sample, when tryEmit is called, sometimes a subscriber is already registered and sometimes it isn't. This behavior is surprising and unintuitive.

I found several workarounds, but ideally the sample would work as is:

  1. Use collect instead of collectLatest. With that the test is green, but in my production code I need the behavior of collectLatest.
  2. Use replay = 1 for the shared flow. Then the missed value on the flow would be replayed. But this isn't ideal for more values or if there are other subscribers that don't want the replay behavior.
  3. Use Dispatchers.Unconfined when launching the job. This is a significant change with other side effects.
  4. Add a delay before emitting the value.

Is this behavior expected? What is the recommendation for this use case?

vRallev, Mar 11 '25 21:03

First of all, yes, this is expected: collectLatest launches a separate coroutine in which it collects the upstream values, and at the moment of tryEmit, that internally launched coroutine may or may not have started. On a single-threaded dispatcher, as you can see by removing Dispatchers.IO from the runBlocking arguments, it will consistently not have started by the time tryEmit is called.
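The difference described above can be observed by checking the shared flow's subscriptionCount right after the UNDISPATCHED launch returns. This is only a sketch: the helper name subscribersRightAfterLaunch is hypothetical, and it uses a plain runBlocking (a single-threaded event loop) so that the result is deterministic. With collect, the launched coroutine subscribes before it first suspends, so the count is 1. With collectLatest, going by the explanation above, the internal coroutine has not run yet and the count is expected to be 0 on library versions with this behavior.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): reports how many
// subscribers the shared flow has immediately after an UNDISPATCHED launch returns.
fun subscribersRightAfterLaunch(useCollectLatest: Boolean): Int = runBlocking {
    val flow = MutableSharedFlow<Int>()

    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        if (useCollectLatest) {
            // Collection happens in a coroutine that collectLatest starts internally.
            flow.collectLatest { }
        } else {
            // Subscribes in this coroutine, before its first suspension point.
            flow.collect { }
        }
    }

    // Read the count before yielding to any other coroutine.
    val count = flow.subscriptionCount.value
    job.cancelAndJoin()
    count
}

fun main() {
    println("collect:       ${subscribersRightAfterLaunch(useCollectLatest = false)}")
    println("collectLatest: ${subscribersRightAfterLaunch(useCollectLatest = true)}")
}
```

The collectLatest result is deliberately not asserted here, since it depends on the library version in use.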

That said, I do believe we should change this behavior. In this particular scenario where the upstream is executed in the same coroutine context as the downstream, there is no danger in immediately entering the internal coroutine and starting the collection, and the use case of reliably subscribing to a shared flow before starting emissions is valuable.
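Until the behavior changes, one way to subscribe reliably before emitting, which is not among the workarounds listed in the original post, is to suspend until the shared flow reports a subscriber through subscriptionCount. A minimal sketch, assuming the same flow configuration as the test above; the helper name emitAfterSubscription and the use of CompletableDeferred in place of the polling loop are illustrative choices, not something proposed in this thread.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: emits one value only after the collector has subscribed
// and returns the value that collectLatest observed.
fun emitAfterSubscription(): Int = runBlocking(Dispatchers.IO) {
    val flow = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val received = CompletableDeferred<Int>()

    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collectLatest { received.complete(it) }
    }

    // Suspend until the coroutine started inside collectLatest has subscribed.
    flow.subscriptionCount.first { it > 0 }

    check(flow.tryEmit(1))
    val value = withTimeout(2_000L) { received.await() }
    job.cancel()
    value
}

fun main() {
    println(emitAfterSubscription())
}
```

This makes the ordering explicit instead of relying on where the collector first suspends, at the cost of one extra suspension point before the first emission. It only tells you that at least one subscriber exists, so it is less suitable when several collectors share the flow.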

dkhalanskyjb, Apr 23 '25 13:04