kotlinx.coroutines

Map StateFlows to StateFlow

Open qwwdfsad opened this issue 3 years ago • 17 comments

Duplicating the content of https://youtrack.jetbrains.com/issue/KT-45502 for further discussion and use-cases.

=== I have a pattern that I often use while developing a UI (in Compose). The current Flow API lacks methods for mapping StateFlows to a StateFlow, which leads to increased code complexity.

Imagine you have a ViewModel that has some data fields and one method that can be invoked only if the fields have correct values.

If we implemented it with LiveData it would look like this:

class MyLiveDataViewModel {

    private val _firstField = MutableLiveData("value1")
    val firstField: LiveData<String> = _firstField

    private val _secondField = MutableLiveData(300)
    val secondField: LiveData<Int> = _secondField

    val canDoSomethingScary = MediatorLiveData<Boolean>()
        .apply {
            fun update() {
                val first = firstField.value
                val second = secondField.value

                value = !first.isNullOrBlank() && second != null && second > 0 && second < 10
            }

            addSource(firstField) { update() }
            addSource(secondField) { update() }

            update()
        }

    fun doSomethingScary() {
        if (canDoSomethingScary.value != true) return

        // do something scary
    }
}

Nothing special, let's try to implement it with the current Flow API:

class MyFlowViewModel(
    // We have to have coroutineScope
    val coroutineScope: CoroutineScope
) {

    private val _firstField = MutableStateFlow("value1")
    val firstField: StateFlow<String> = _firstField

    private val _secondField = MutableStateFlow(300)
    val secondField: StateFlow<Int> = _secondField

    val canDoSomethingScary: Flow<Boolean> = combine(firstField, secondField) { first, second ->
        first.isNotBlank() && second > 0 && second < 10
    }

    // method became suspended
    suspend fun doSomethingScary1() {
        if (!canDoSomethingScary.stateIn(coroutineScope).value) return

        // do something scary
    }

    // or code duplication
    fun doSomethingScary2() {
        if (!(firstField.value.isNotBlank() && secondField.value > 0 && secondField.value < 10)) return

        // do something scary
    }
}

Both methods look overcomplicated. We don't like them. We need either a coroutineScope or code duplication for such a simple task (imagine how real-world examples look).

But if we added some quite simple primitives that allow us to map StateFlows to StateFlow, the code would look fine:

class LastStateFlow<T>(
    private val getValue: () -> T,
    private val flow: Flow<T>
) : StateFlow<T> {

    override val replayCache: List<T>
        get () = listOf(value)

    override val value: T
        get () = getValue()

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        flow.collect(collector)
    }
}

fun <T1, T2, R> combineStates(flow: StateFlow<T1>, flow2: StateFlow<T2>, transform: (a: T1, b: T2) -> R): StateFlow<R> {
    return LastStateFlow(
        getValue = { transform(flow.value, flow2.value) },
        flow = combine(flow, flow2) { a, b -> transform(a, b) }
    )
}

class MyStateFlowViewModel {

    private val _firstField = MutableStateFlow("value1")
    val firstField: StateFlow<String> = _firstField

    private val _secondField = MutableStateFlow(300)
    val secondField: StateFlow<Int> = _secondField

    val canDoSomethingScary: StateFlow<Boolean> = combineStates(firstField, secondField) { first, second ->
        first.isNotBlank() && second > 0 && second < 10
    }

    fun doSomethingScary() {
        if (!canDoSomethingScary.value) return

        // do something scary
    }
}

It seems that it would be really useful to have such primitives in the standard coroutines library. I don't insist on the implementation (it is obviously naive).

===

qwwdfsad avatar Apr 06 '21 11:04 qwwdfsad

Some additions on it.

Change name

It seems that the more appropriate name is DerivedStateFlow (instead of LastStateFlow).

Performance notes

  1. The transform lambda will be invoked for each collector of the flow on each change.
  2. Moreover, getValue() invokes transform on each invocation too.

These aren't problems if one uses it in appropriate use cases (no heavy calculations). Nevertheless, invoking transform every time looks excessive.
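
The second point can be seen with a minimal sketch. It assumes the same getValue shape as in LastStateFlow above; transformCalls and the standalone getValue lambda are hypothetical names added only to count invocations.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical counter, added only to make the cost visible.
var transformCalls = 0

val firstField = MutableStateFlow("value1")
val secondField = MutableStateFlow(300)

// Same predicate as in the view model examples above, instrumented with the counter.
val transform: (String, Int) -> Boolean = { first, second ->
    transformCalls++
    first.isNotBlank() && second > 0 && second < 10
}

// Same shape as the getValue lambda that combineStates passes to LastStateFlow.
val getValue: () -> Boolean = { transform(firstField.value, secondField.value) }

fun main() {
    repeat(3) { getValue() }
    println(transformCalls) // 3: nothing is cached between reads
}
```

Each read of the derived value pays for one full transform call, which is fine for a cheap predicate like this one and wasteful for anything heavier.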

Performance improvements proposals

As far as I can see, we cannot easily solve this without patching the original StateFlowImpl (or implementing a new one). It should be able to notify derived states synchronously when its value is updated. If it can, we can subscribe to it in the combineStates method and update derived states synchronously. In this case, derived states won't run transform on every getValue() and for each collector. The main drawback that I see is the need to unsubscribe from this notification when it is no longer needed.

For subscribing to value changes, we can use an external object, something like TransactionManager, and use it for convenient unsubscribing and for avoiding excessive recalculation. Modern state management frameworks offer features like this (MobX is a good example). For example:

  • We have 3 initial states and 2 derived states from the first 3
  • We want to update the first 3, but we don't want to notify derived states on each update, because derived states can be inconsistent until all 3 initial states are updated.

So, we can write something like this

val transactionManager = TransactionManager()

val state1 = MutableStateFlow("oldValue1")
val state2 = MutableStateFlow("oldValue2")
val state3 = MutableStateFlow("oldValue3")

val derivedState1 = combineStates(transactionManager, state1, state2) { /* do transformation */ }
val derivedState2 = combineStates(transactionManager, state2, state3) { /* do transformation */ }

transactionManager.transaction {
    state1.value = "newValue1"
    state2.value = "newValue2"
    state3.value = "newValue3"
} // the derived states are recalculated here, and only once

What do you think about it?

kirillvasilenko avatar May 26 '21 09:05 kirillvasilenko

This will be a really nice addition :)

teodorpenkov avatar Jun 11 '21 10:06 teodorpenkov

Since this mentions Compose, there's already an idiomatic solution in that domain using Compose's snapshots that has a number of desirable properties. Translating the above:

class MySnapshotViewModel {

    var first: String by mutableStateOf("value1")
        private set

    var second: Int by mutableStateOf(300)
        private set

    val canDoSomethingScary: Boolean
        get() = first.isNotBlank() && second > 0 && second < 10

    fun doSomethingScary() {
        Snapshot.withMutableSnapshot {
            if (!canDoSomethingScary) return

            // do something scary
        }
    }
}

Snapshot state is transitively observable and transaction-consistent. Accessing viewModel.canDoSomethingScary is an observed read of both first and second when performed in a snapshot monitoring reads. (Compose and Compose UI do this automatically on your behalf; it is already the primary means of invalidating composition, layout, or drawing.) This enables the following Compose code:

if (viewModel.canDoSomethingScary) {
    Button(onClick = { viewModel.doSomethingScary() }) {
        Text("Do something scary")
    }
} else {
    Text("You can't do something scary")
}

Changing either first or second such that the result of canDoSomethingScary changes will cause the Button to appear or disappear from the UI accordingly.

If a derived calculation is particularly expensive and should be cached rather than recomputed at each read, derivedStateOf {} acts as a form of lazy {} that will invalidate if its inputs change, causing the next access to evaluate, cache, and yield the appropriate result for the current snapshot:

val expensiveDerivedValue by derivedStateOf { calculateExpensiveResult(first, second) }

As implied by the name, snapshots are consistent records across multiple state objects. The body of doSomethingScary above, by using the Snapshot.withMutableSnapshot {} API, will always observe transactionally consistent values of all snapshot state objects. As composition is performed with a consistent snapshot, Compose code is protected against snapshot state values being changed by another thread and becoming inconsistent with one another across a composition while composition is in progress. As we continue to explore moving Compose to a multithreaded model this will become increasingly important.

The following code ensures that first and second can never be independently observed to be out of sync with one another:

Snapshot.withMutableSnapshot {
    second++
    // second is only changed in this pending snapshot,
    // the change is not yet visible to other threads.
    first = "value$second"
}
// Changes to both second and first are now visible to other threads

As snapshots do not require explicit links across objects to remain consistent, several changes across graphs of independent snapshot-powered objects can be performed this way using the standard public API of those objects:

Snapshot.withMutableSnapshot {
    viewModel1.doSomethingScary()
    viewModel2.doSomethingElseScary()
}

If other code in an app wishes to observe and consume snapshot data, the low-level Snapshot.take[Mutable]Snapshot APIs are available as well as the snapshotFlow {} API, which can be used like this:

val flow = snapshotFlow { viewModel.canDoSomethingScary }

Which bridges over to the cold Flow world. The block is evaluated with a consistent snapshot and will run and emit a new result from the block whenever the snapshot inputs have changed.

This post is a good resource for more information on snapshots: https://dev.to/zachklipp/introduction-to-the-compose-snapshot-system-19cn

Consuming Flows from Compose code is a subscription to each Flow that publishes emitted values into snapshot state objects that Compose consumes, so the choice of tool is not "snapshots or MutableStateFlow" but rather "snapshots" or "MutableStateFlow and snapshots". Even with the proposed feature above, snapshots still provide consistency across independent state objects that app code has not explicitly linked via operators, which seems likely to be out of scope for Flow to implement.

This requested feature for StateFlows may be generally useful and desirable, but it would not be a replacement for the MySnapshotViewModel approach above in Compose contexts. It is not a requirement for better supporting Compose users. :)

adamp avatar Jun 11 '21 15:06 adamp

Accomplishing common things using StateFlows in a viewModel feels a little clunky and combineStates seems very useful for reducing some of that awkwardness.

Also, that's a fantastic write up @adamp! It really highlights a lot of the parallel work that's happening, which those who haven't dove into Compose might be missing (personally, I haven't picked up Compose yet and snapshots were NOT on my radar. Others are probably in the same boat).

Whatever the outcome here, it would be nice if this page is updated with the best-practices for tying it all together. A section on "combining StateFlows with snapshots" or a link to content similar to your post under Flows in Jetpack libraries would do a lot of good but it would probably require someone like you who is in position to see the whole picture.

gmale avatar Jun 15 '21 16:06 gmale

@adamp, thank you very much for such a detailed explanation and useful links!

It is off topic, but we investigated and played around with the Compose management system a little bit and noticed some problems there. So we prepared the proposal with improvements. Please, take a look and vote if you are interested in it.

kirillvasilenko avatar Jun 28 '21 11:06 kirillvasilenko

FYI: The most fashionable implementation that one can use in their codebase until official implementation comes is the following:

/**
 * Does not emit the same value twice in a row, so it respects "distinct until changed" emissions
 * */
class DerivedStateFlow<T>(
    private val getValue: () -> T,
    private val flow: Flow<T>
) : StateFlow<T> {

    override val replayCache: List<T>
        get () = listOf(value)

    override val value: T
        get () = getValue()

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        coroutineScope { flow.distinctUntilChanged().stateIn(this).collect(collector) }
    }
}

fun <T1, R> StateFlow<T1>.mapState(transform: (a: T1) -> R): StateFlow<R> {
    return DerivedStateFlow(
        getValue = { transform(this.value) },
        flow = this.map { a -> transform(a) }
    )
}

fun <T1, T2, R> combineStates(flow: StateFlow<T1>, flow2: StateFlow<T2>, transform: (a: T1, b: T2) -> R): StateFlow<R> {
    return DerivedStateFlow(
        getValue = { transform(flow.value, flow2.value) },
        flow = combine(flow, flow2) { a, b -> transform(a, b) }
    )
}

// and so on

kirillvasilenko avatar Jun 29 '21 12:06 kirillvasilenko

The combine operation is actually not the best abstraction. Take a look at Jetpack Compose's derivedStateOf which is much more powerful and more (easily) optimal. My open-source library (ReactiveState) provides a similar abstraction with derived, though it currently depends on a CoroutineScope which I'd like to get rid of (edit: new release published which works without CoroutineScope).

This is what the ideal API would look like:

val a = MutableStateFlow(0)
val b = MutableStateFlow(0)

val computed = derived {
    if (a.value > 5) 2 * a.value else b.value + 10
}

// since that doesn't seem to be possible, with ReactiveState's derived you call `get()` to get the value
val computed = derived {
    if (get(a) > 5) 2 * get(a) else get(b) + 10
}

// there's also a coroutine-based derived which can be used with SharingStarted
// (or use derivedWhileSubscribed if you don't have a CoroutineScope and only need WhileSubscribed semantics)
val computed = scope.derived(initial = 0, started = WhileSubscribed()) {
    val data = someSuspendFun()
    get(a) + data
}

In our quite large code-base (almost 1M LOC) we have quite a few such usages where we access different StateFlows depending on if and when expressions and it's important to only observe the StateFlows that we really depend on in each situation instead of using combine to observe all of them even if only a subset is needed at any point in time.
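
For comparison, conditional observation can be approximated with stock operators. The sketch below is not ReactiveState's derived; it assumes flatMapLatest (an experimental API in kotlinx.coroutines) is acceptable and it yields a cold Flow rather than a StateFlow, so there is no synchronous .value. The function name computed is illustrative.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// b is collected only while a <= 5; once a > 5 the inner collection of b is cancelled.
@OptIn(ExperimentalCoroutinesApi::class)
fun computed(a: StateFlow<Int>, b: StateFlow<Int>): Flow<Int> =
    a.flatMapLatest { av ->
        if (av > 5) flowOf(2 * av) else b.map { bv -> bv + 10 }
    }

fun main() = runBlocking {
    val a = MutableStateFlow(0)
    val b = MutableStateFlow(0)

    println(computed(a, b).first()) // 10: a <= 5, so the result comes from b + 10
    a.value = 6
    println(computed(a, b).first()) // 12: a > 5, so b is not observed at all
}
```

This gets the "observe only what you need" behaviour, but every additional branch adds another layer of operators, which is the complexity the derived { } style is meant to avoid.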

The derived helper has become such an essential tool in our whole codebase and it has made our code significantly simpler and less bug-prone. After having used this kind of architecture successfully in multiple projects I'd highly recommend having it integrated in the official coroutines library. It's a much more solid abstraction than working with Flow.map and combine and other Flow-based operator sequences which in my experience lead even very experienced developers into traps and nasty bugs and overly complicated code.

wkornewald avatar Dec 30 '21 09:12 wkornewald

The overridden collect() method must have the return type Nothing, so we need to delegate to the collect of a StateFlow, which never returns. I tried this solution and it worked well for me.

Inside collect, I use stateIn with the current coroutine scope. We know it won't hold up the collector, since stateIn returns as soon as the first value is available, and with StateFlows as the source that happens right away.

private class CombinedStateFlow<T>(
    private val getValue: () -> T,
    private val flow: Flow<T>
) : StateFlow<T> {

    override val replayCache: List<T> get() = listOf(value)

    override val value: T get() = getValue()

    override suspend fun collect(collector: FlowCollector<T>): Nothing =
        coroutineScope { flow.stateIn(this).collect(collector) }
}

/**
 * Returns [StateFlow] from [flow] having initial value from calculation of [getValue]
 */
fun <T> combineStates(
    getValue: () -> T,
    flow: Flow<T>
): StateFlow<T> = CombinedStateFlow(getValue, flow)

/**
 * Combines all [stateFlows] and transforms them into another [StateFlow] with [transform]
 */
inline fun <reified T, R> combineStates(
    vararg stateFlows: StateFlow<T>,
    crossinline transform: (Array<T>) -> R
): StateFlow<R> = combineStates(
    getValue = { transform(stateFlows.map { it.value }.toTypedArray()) },
    flow = combine(*stateFlows) { transform(it) }
)

/**
 * Variant of [combineStates] for combining 3 state flows
 */
inline fun <reified T1, reified T2, reified T3, R> combineStates(
    flow1: StateFlow<T1>,
    flow2: StateFlow<T2>,
    flow3: StateFlow<T3>,
    crossinline transform: (T1, T2, T3) -> R
) = combineStates(flow1, flow2, flow3) { (t1, t2, t3) ->
    transform(
        t1 as T1,
        t2 as T2,
        t3 as T3
    )
}

Any issue with this approach? cc: @qwwdfsad

PatilShreyas avatar Jun 22 '22 05:06 PatilShreyas

Currently we could map a StateFlow like this, to have a view only of the needed "portion" of the initial elements:

val main = MutableStateFlow<List<Int>>(emptyList())
val onlyFirst = main.mapLatest { it.firstOrNull() }.stateIn(this, SharingStarted.Eagerly, null)

However, one of the problems here is that the second flow doesn't have instant updates of the value in the main flow. So assigning main.value = listOf(something) and then immediately checking onlyFirst.value can return one of the previous "portions".

Doing val onlyFirst = main.mapState { it.firstOrNull() } solves this problem!
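
The staleness described above can be reproduced with a small sketch. It assumes a single-threaded runBlocking setup and uses map instead of mapLatest to avoid an experimental opt-in; readAfterWrite is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Returns (value read immediately after the write, value read after waiting).
fun readAfterWrite(): Pair<Int?, Int?> = runBlocking {
    val main = MutableStateFlow<List<Int>>(emptyList())
    // Separate Job so the never-ending sharing coroutine is not a child of runBlocking.
    val scope = CoroutineScope(coroutineContext + Job())
    val onlyFirst = main.map { it.firstOrNull() }
        .stateIn(scope, SharingStarted.Eagerly, null)

    main.value = listOf(42)
    val immediate = onlyFirst.value                // the sharing coroutine has not run yet
    val eventual = onlyFirst.first { it != null }  // suspends until the update propagates
    scope.cancel()
    immediate to eventual
}

fun main() {
    println(readAfterWrite()) // (null, 42)
}
```

The derived flow only catches up once the sharing coroutine gets a chance to run, whereas a mapState-style wrapper computes its value from main.value at read time.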

SerVB avatar Jul 05 '22 16:07 SerVB

@PatilShreyas, thank you very much for the update.

One of the recent updates of the coroutines library broke the original proposal by changing the return type of the collect function from Unit to Nothing. It wasn't obvious how to solve it, but your update did.

I updated the most fashionable implementation here accordingly.

kirillvasilenko avatar Jul 17 '22 08:07 kirillvasilenko

fun <T, R> StateFlow<T>.stateMap(transform: (T) -> R): StateFlow<R> {
    return object : StateFlow<R> {

        override val replayCache: List<R>
            get() = this@stateMap.replayCache.map { transform(it) }

        override val value: R
            get() = transform(this@stateMap.value)

        override suspend fun collect(collector: FlowCollector<R>): Nothing {
            this@stateMap.map { transform(it) }.collect(collector)
            error("StateFlow collection never ends.")
        }
    }
}

another variation of the same

firmeldir avatar Jul 24 '23 11:07 firmeldir

I have a use case that I wanted to see if this would solve.

I am currently consuming a StateFlow of type Person from a 3rd party library. What I really care about is Person.name, and I need to pass a StateFlow into another library (that I'm the author of). I can send it the StateFlow<Person>, but that means my other library needs to depend on the 3rd party library because it needs to resolve the Person type. Is there a way to map a StateFlow<Person> to a StateFlow<String> so my other library doesn't have to depend on the 3rd party library just for the type?

ColtonIdle avatar Aug 31 '23 13:08 ColtonIdle

Yes. You can do that via sourceFlow.map { .... }.stateIn(scope) to map the original StateFlow to one with a new type.

elizarov avatar Aug 31 '23 14:08 elizarov

So I have two questions about that though.

  1. What scope should I use? I want the original scope etc. of the existing StateFlow. Is there a way to tell it to reuse that scope?
  2. I can't just use stateIn(), because then I have to be in a coroutine/suspend function. So I have to use the overload that takes scope, started, and initialValue. So now, even though I'm just trying to pass along this StateFlow that's not in my control, I need to give it a scope, started, and initialValue (even though the initial value should be grabbed from the mapper I just gave it).

ColtonIdle avatar Aug 31 '23 15:08 ColtonIdle

You have to provide some scope in which you'll be doing the computation. If the source of the state flow does not expose the scope in which its flow is active, then you'll have to provide the scope of your own component. For example, if you are doing it as a part of your ViewModel, then you'll provide your view model's scope.

For initial value, if you know that your mapping function (say f) is simple and fast, and given that you want to start computing it immediately, you can use it for initial value, too. E.g. sourceFlow.map { f(it) }.stateIn(scope, SharingStarted.Eagerly, f(sourceFlow.value)) will do the trick in this case.

elizarov avatar Sep 01 '23 08:09 elizarov

"You have to provide some scope in which you'll be doing the computation"

That makes sense when put that way. Thanks.

"SharingStarted.Eagerly"

Great. I wasn't sure which one to use here.

Thank you so much @elizarov . This makes much more sense. I appreciate you teaching me. 🙏

ColtonIdle avatar Sep 01 '23 17:09 ColtonIdle

For anyone wondering, I'm going this route:

// The scope used here is the scope that is used for the mapping work
fun <T, M> StateFlow<T>.map(
    coroutineScope : CoroutineScope,
    mapper : (value : T) -> M
) : StateFlow<M> = map { mapper(it) }.stateIn(
    coroutineScope,
    SharingStarted.Eagerly,
    mapper(value)
)
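
A usage sketch for the Person case discussed above. Person here is a hypothetical stand-in for the third-party type, and the extension is repeated under the illustrative name mapIn so it cannot be confused with Flow.map; namesAfterUpdate is likewise a made-up helper.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the third-party type.
data class Person(val name: String)

// Same body as the extension above, renamed for clarity in this sketch.
fun <T, M> StateFlow<T>.mapIn(
    coroutineScope: CoroutineScope,
    mapper: (value: T) -> M
): StateFlow<M> = map { mapper(it) }.stateIn(
    coroutineScope,
    SharingStarted.Eagerly,
    mapper(value)
)

// Returns (initial mapped value, mapped value after the source changes).
fun namesAfterUpdate(): Pair<String, String> = runBlocking {
    val people = MutableStateFlow(Person("Ann"))
    // Separate Job so the sharing coroutine is not a child of runBlocking.
    val scope = CoroutineScope(coroutineContext + Job())
    val names: StateFlow<String> = people.mapIn(scope) { it.name }

    val initial = names.value                  // computed synchronously via mapper(value)
    people.value = Person("Bob")
    val updated = names.first { it == "Bob" }  // suspends until the mapping coroutine catches up
    scope.cancel()
    initial to updated
}

fun main() {
    println(namesAfterUpdate()) // (Ann, Bob)
}
```

The consumer only ever sees StateFlow<String>, so it needs no dependency on the library that defines Person. Later updates arrive asynchronously through the supplied scope.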

ColtonIdle avatar Sep 01 '23 17:09 ColtonIdle