kotlinx.coroutines icon indicating copy to clipboard operation
kotlinx.coroutines copied to clipboard

Add start parameter to launchIn extension for Flow

Open LouisCAD opened this issue 6 years ago • 10 comments

Usage would look like so:

someFlow.launchIn(lifecycleScope, start = CoroutineStart.UNDISPATCHED)

My current use-case is to have the UI be set up synchronously by the flow collection start, avoiding an extra unneeded layout pass in an Android app in case collecting the flow has such side-effects.

LouisCAD avatar Jul 07 '19 13:07 LouisCAD

Can you give a slightly more context in your example, please (some more code around it). Where is your code that is trying to collect the flow? And what kind of flow you are collecting? I'm trying to grasp why it is important to be UNDISPATCHED in your case.

elizarov avatar Jul 08 '19 08:07 elizarov

This is my use case:

import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch

suspend fun main() {
    coroutineScope {
        val eventBus = BroadcastChannel<Int>(5)
        val subscription = eventBus.asFlow()
        launch(/* start = CoroutineStart.UNDISPATCHED */) {
            subscription.collect { println(it) }
        }

        repeat(10) { eventBus.send(it) }
        eventBus.close()
    }
}

I used the same trick, an active subscription should not lost messages.

fvasco avatar Jul 08 '19 09:07 fvasco

@elizarov, is the following example meaningful to you?

someFlow.onEach { // Start of collection of someFlow enables UI component.
    ui.updateUi(it) // If value is already available, we might benefit from
    // updating UI ASAP, not waiting for an extra dispatch.
}.launchIn(this) // We would prefer it to be undispatched to avoid 2 layout passes.

LouisCAD avatar Jul 09 '19 08:07 LouisCAD

Would simply using Dispatchers.Main.immediate have the same effect (globally)?

elizarov avatar Jul 14 '19 21:07 elizarov

It wouldn't exactly, as it would avoid dispatch for every coming value as opposed to the first one until the first suspension point.

Using Dispatchers.Main.immediate might be an alternative, but I read you discouraged its use as it might cause deadlocks in case of suspending code always taking non suspending paths.

Now that I'm thinking about it though, I wonder if this is plausible as even dispatched, collecting a flow leads to a suspending function to be called, which always passes through a dispatch through the ContinuationInterceptor making synchronous collection of first value impossible, right?

LouisCAD avatar Jul 15 '19 00:07 LouisCAD

Both start = UNDISPATCHED and immediate dispatchers present similar dangers and provide similar optimization opportunities with respect to a avoiding an extra redispatch onto the event queue. That is why I'm asking.

Indeed, start = UNDISPATCHED is somewhat more controllable, more explicit and, thus, maybe the less dangerous of the two, so it might be warranted to support it for the cases where you understand what's going on and want to have a bit of an extra performance.

elizarov avatar Jul 15 '19 08:07 elizarov

Hi @elizarov, my use case is slightly different by @LouisCAD one, I made a little demo above.

I use BroadcastChannel as event bus, I made a subscription before any event was sent but, in the above example, all events were losts.

Frankly it is not really clear for my if the asFlow fits my use case.

fvasco avatar Jul 15 '19 09:07 fvasco

#1340 can fix my use case

fvasco avatar Jul 15 '19 17:07 fvasco

@qwwdfsad Is the behaviour for

A:

someFlow.onEach {
  updateValue()
}.launchIn(scope + Dispatchers.Unconfined)

equivalent to

B:

scope.launch(start = CoroutineStart.UNDISPATCHED, context = Dispatchers.Unconfined) {
   someFlow.collect {
     updateValue()
}

I would have thought so, given the intention of Dispatchers.Unconfined. In this case, updateValue() is thread safe but we want it to happen synchronously as this collection is setup.

I have a bug in a test (only on the test environment) where A does not happen synchronously but B does.

There are other complications involved (updateValue() is setting a Compose State value) but I'm not sure that's the culprit here yet.

steve-the-edwards avatar Feb 28 '23 21:02 steve-the-edwards

See also #3679 and #3681

qwwdfsad avatar Mar 27 '23 12:03 qwwdfsad