kotlinx.coroutines
StateFlow-like primitive for big states with delta updates
A common problem arises when writing applications with a state flow whose state is a large object, for example a list of events or other entities, a map, or something similar. Such states are usually transferred over the network as a one-time snapshot followed by incremental updates, and there is no ready-to-use primitive to represent them as a StateFlow.
A solution shall cleanly address two sides of the problem:
Ingress: turning a snapshot of the current state plus a sequence of updates into some kind of Flow representation.
Egress: turning a Flow representation into a snapshot of a current state, plus a sequence of updates.
A trivial solution is to use StateFlow<State> as the flow representation, which might be acceptable for moderately sized state objects. With this representation, the ingress problem is solved like this:
val flow: StateFlow<State> = flow<State> {
    var currentState = getCurrentStateSnapshot()
    emit(currentState)
    forEachIncomingUpdate { update ->
        currentState = currentState.apply(update) // compute updated state
        emit(currentState) // emit updated state
    }
}.stateIn(scope) // convert into StateFlow<State>; suspends until the first value is emitted
And the egress problem can be solved like this:
var lastState = emptyState() // domain-specific empty state
flow.collect { state ->
    val update = state.diffFrom(lastState) // compute delta between states
    processUpdate(update)
    lastState = state
}
This implementation assumes that we have fun State.diffFrom(other: State): Update that computes the difference between two states and represents it as an update. This is usually possible and works fine as long as the states are not very big. However, it does not scale: if the state is big (for example, a list of hundreds of thousands of items), then the typically quadratic implementations of the apply and diffFrom functions make it too slow to be used in practice.
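To make the trivial approach and its cost concrete, here is a minimal sketch assuming the state is a set of strings and an update lists what was added and removed. SetUpdate, applyUpdate, and diffFrom are hypothetical names (applyUpdate is used instead of apply to avoid clashing with Kotlin's stdlib scope function). Both functions touch every element of the state on every update, which is where the cost comes from.

```kotlin
// Hypothetical domain types for the trivial StateFlow<State> approach:
// the state is an immutable Set<String>, an update lists added and removed items.
data class SetUpdate(val added: Set<String>, val removed: Set<String>)

// Ingress: each update builds a brand-new set, so every call costs O(n) in the state size.
fun Set<String>.applyUpdate(update: SetUpdate): Set<String> =
    (this - update.removed) + update.added

// Egress: recovering the delta compares both full snapshots, again O(n) per emission.
fun Set<String>.diffFrom(other: Set<String>): SetUpdate =
    SetUpdate(added = this - other, removed = other - this)
```

For example, applying SetUpdate(added = {c}, removed = {a}) to {a, b} yields {b, c}, and diffFrom recovers exactly that update from the two snapshots, but only by scanning both of them.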
An efficient design would work somewhat like a combination of StateFlow<State> and SharedFlow<Update>. It would rely on the apply function only and would internally maintain a snapshot of the current state to be delivered to new subscribers, followed by the corresponding updates in sync. To make this kind of design representable as a Flow, I suggest limiting ourselves to the case where State <: Update (the value space of states is a subset of the value space of updates), or, put another way, to cases where sending a State snapshot is just one kind of Update operation. We will represent the result as a Flow<Update> and will need a new name for this primitive. Let's tentatively call it UpdateFlow for now.
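A minimal sketch of what State <: Update could look like, assuming a set-of-strings state. SetDelta, Snapshot, Change, merge, and applyTo are hypothetical names. The merge function plays the role of the (Update, Update) -> Update operation; merging a snapshot with anything yields a snapshot again, which is what lets every new subscriber start from a snapshot.

```kotlin
// Hypothetical encoding of "State <: Update": a snapshot is just one kind of update.
sealed interface SetDelta {
    // A full snapshot replaces whatever the receiver had before.
    data class Snapshot(val items: Set<String>) : SetDelta
    // An incremental change relative to the receiver's current state.
    data class Change(val added: Set<String>, val removed: Set<String>) : SetDelta
}

// Applies an update to a receiver's state: removals first, then additions.
fun SetDelta.applyTo(state: Set<String>): Set<String> = when (this) {
    is SetDelta.Snapshot -> items
    is SetDelta.Change -> (state - removed) + added
}

// Merges an earlier update with a later one into a single equivalent update.
fun merge(first: SetDelta, second: SetDelta): SetDelta = when (second) {
    is SetDelta.Snapshot -> second // a later snapshot supersedes everything before it
    is SetDelta.Change -> when (first) {
        // snapshot + change is still a snapshot
        is SetDelta.Snapshot -> SetDelta.Snapshot(second.applyTo(first.items))
        // change + change: a later add cancels an earlier remove and vice versa
        is SetDelta.Change -> SetDelta.Change(
            added = (first.added - second.removed) + second.added,
            removed = (first.removed - second.added) + second.removed,
        )
    }
}
```

The property to preserve is that applying merge(a, b) has the same effect as applying a and then b.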
A sketch of the corresponding API might look like this. On ingress side one writes:
val flow: UpdateFlow<Update> = flow<Update> {
    emit(getCurrentStateSnapshot()) // Note: State <: Update
    forEachIncomingUpdate { update -> emit(update) }
}.updateIn(scope, ::apply) // convert into UpdateFlow<Update>
The apply function will need to have the type (Update, Update) -> Update, that is, a function capable of merging both a snapshot with an update and an arbitrary pair of updates into a single update. It will be used internally to conflate updates for slow subscribers.
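The conflation behaviour can be modelled without any coroutine machinery. The sketch below is a hypothetical, single-threaded, non-suspending ConflatingUpdateHub (not a kotlinx.coroutines API): it keeps the fold of all updates as the current snapshot and, per subscriber, folds everything not yet consumed into one pending update using the supplied merge function.

```kotlin
// Hypothetical single-threaded model of how an UpdateFlow could use
// merge: (Update, Update) -> Update to serve subscribers. Not thread-safe.
class ConflatingUpdateHub<U : Any>(private val merge: (U, U) -> U) {
    // Fold of every update emitted so far; once the first snapshot has been
    // emitted, this is itself a snapshot (State <: Update).
    private var snapshot: U? = null
    private val subscribers = mutableListOf<Subscriber>()

    inner class Subscriber {
        // Updates this subscriber has not consumed yet, conflated into one.
        internal var pending: U? = null

        // Returns everything that arrived since the last poll as a single
        // update, or null if nothing is pending.
        fun poll(): U? = pending.also { pending = null }
    }

    fun emit(update: U) {
        snapshot = snapshot?.let { merge(it, update) } ?: update
        for (s in subscribers) {
            s.pending = s.pending?.let { merge(it, update) } ?: update
        }
    }

    // A new subscriber starts with the current snapshot as its first update.
    fun subscribe(): Subscriber = Subscriber().also {
        it.pending = snapshot
        subscribers += it
    }
}
```

A subscriber that polls rarely never accumulates a growing queue, only one merged update. A real UpdateFlow would additionally need thread safety and suspension for collectors.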
On egress side one writes:
flow.collect { update ->
    // Note: the first update is guaranteed to be a snapshot
    processUpdate(update)
}
All names in this "design" are TBD.
We have use cases for this in IJ and would be very glad to see this in the library.
- In IJ we provide extension points, and each plugin/module can register an extension. As plugins can be loaded or unloaded dynamically, it's very convenient to receive a stream (I intentionally omit flow at the moment) of updates. I'm not really fond of State <: Update, as it will require handling the initial State differently, but that will only happen once. Each new subscriber should initially receive a bunch of Added updates, as if it had subscribed from the very beginning before the extensions appeared in the collection, and then continue receiving Added/Removed updates as time goes on.
sealed interface Update<T> {
    class Added<T>(val value: T) : Update<T>
    class Removed<T>(val value: T) : Update<T>
}

val ep: ExtensionPoint = ...
launch {
    ep.extensions.subscribe { update: Update<*> ->
        when (update) {
            is Update.Added -> { /* an extension was registered */ }
            is Update.Removed -> { /* an extension was unregistered */ }
        }
    }
}
It would be great to be able to squash an Added-Removed pair into nothing, e.g. if a new subscriber has not yet received an Added update and a Removed update with the same value is already in the queue.
- In editor tabs we have to compute a diff between two lists to yield an Added update.
- In navigation popups we don't use coroutines yet, but we'd like to have a view model that is updatable from a background thread and rendered on the EDT incrementally, i.e. handles additions/removals.
- In remote dev we have UI models representing lists/tables/etc., and they are expected to be transferred over the network exactly as you suggest.
I won't list every use case here, but we do have a few with the same semantics: a collection plus updates to it.
It may be useful to receive custom data in updates; for example, an Added update over a list should contain an index. So it might be more useful to design a primitive on top of which various custom models can be built.
Ironically, the closest interface we have for list + updates is javax.swing.ListModel with its javax.swing.event.ListDataListener.
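A sketch of such an index-carrying model, loosely mirroring the intervalAdded, intervalRemoved, and contentsChanged events of ListDataListener. ListUpdate and applyAll are hypothetical names, not part of any existing API.

```kotlin
// Hypothetical list updates that carry the affected index.
sealed interface ListUpdate<T> {
    data class Inserted<T>(val index: Int, val value: T) : ListUpdate<T>
    data class Removed<T>(val index: Int) : ListUpdate<T>
    data class Changed<T>(val index: Int, val value: T) : ListUpdate<T>
}

// Applies a batch of updates, in order, to a receiver-side mutable list.
fun <T> MutableList<T>.applyAll(updates: List<ListUpdate<T>>) {
    for (u in updates) when (u) {
        is ListUpdate.Inserted -> add(u.index, u.value)
        is ListUpdate.Removed -> removeAt(u.index)
        is ListUpdate.Changed -> set(u.index, u.value)
    }
}
```

Conflating index-based updates is harder than conflating set-like ones, since every insertion or removal shifts the indices of later entries.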
@dovchinnikov Let me elaborate how I see your use-case 1 encoded in the strawman design I've sketched.
Here I'd actually use List<Update> as the type in the update flow. You will provide a function that takes a pair of such List<Update> objects and combines them into a single List<Update> object. The simplest implementation just concatenates the two lists, but in an optimized version you'll implement squashing of Add-Remove pairs in the combined list. This will ensure that the first list of updates that any subscriber receives consists of Add operations only.
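A minimal sketch of that combining function, assuming Update exposes its value on the interface and that Added and Removed are data classes (the snippet above uses plain classes); combine is a hypothetical name. A Removed cancels the most recent entry for the same value only if that entry is an Added; otherwise it is appended.

```kotlin
// Hypothetical variant of the Update type with the value on the interface.
sealed interface Update<T> {
    val value: T
    data class Added<T>(override val value: T) : Update<T>
    data class Removed<T>(override val value: T) : Update<T>
}

// Combines two batches (earlier first) into one, squashing Added-Removed pairs.
fun <T> combine(first: List<Update<T>>, second: List<Update<T>>): List<Update<T>> {
    val result = ArrayList(first)
    for (u in second) {
        // Most recent entry in the combined batch that mentions the same value.
        val i = result.indexOfLast { it.value == u.value }
        if (u is Update.Removed && i >= 0 && result[i] is Update.Added) {
            result.removeAt(i) // added and removed again: the value never appears
        } else {
            result.add(u)
        }
    }
    return result
}
```

Assuming the stream never removes a value that is not present, folding every batch since the start with combine leaves exactly one Added per present value, which is the all-Add first batch described above.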
In the end, you'll get Flow<List<Update>> that you'll process like this:
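extensions.onEach { updates: List<Update> ->
    for (update in updates) when (update) {
        is Added -> { /* handle an added extension */ }
        is Removed -> { /* handle a removed extension */ }
    }
}.launchIn(scope) // onEach is intermediate; a terminal operator such as launchIn starts collection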
Looks good, I'd like to try this in real life
What about DiffFlow for the name?
I share this use case, but we need three kinds of operations: ADD, REMOVE, and UPDATE.
Obviously, it is possible to model UPDATE as REMOVE+ADD; however, this choice introduces application and performance issues. For example, if I want to deposit money in the bank, I don't want to close my current account and open a new one.
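One way to keep UPDATE first-class while still conflating updates for slow subscribers is to key every operation and merge per key. The sketch below assumes keyed operations; KeyedOp and conflate are hypothetical names. Note the REMOVE-then-ADD case: for a receiver that still holds the old value, the net effect is an UPDATE, so from the receiver's point of view the account is never closed and reopened.

```kotlin
// Hypothetical keyed operations with UPDATE as a first-class case.
sealed interface KeyedOp<K, V> {
    val key: K
    data class Add<K, V>(override val key: K, val value: V) : KeyedOp<K, V>
    data class Update<K, V>(override val key: K, val value: V) : KeyedOp<K, V>
    data class Remove<K, V>(override val key: K) : KeyedOp<K, V>
}

// Conflates a batch of keyed ops so that each key appears at most once.
fun <K, V> conflate(ops: List<KeyedOp<K, V>>): List<KeyedOp<K, V>> {
    val net = LinkedHashMap<K, KeyedOp<K, V>>()
    for (op in ops) {
        val prev = net[op.key]
        val merged: KeyedOp<K, V>? = when {
            prev == null -> op
            // added, then changed: the receiver only needs to see the final value
            prev is KeyedOp.Add && op is KeyedOp.Update -> KeyedOp.Add(op.key, op.value)
            // added, then removed: the receiver never needs to see the key
            prev is KeyedOp.Add && op is KeyedOp.Remove -> null
            // removed, then added back: for the receiver this is an in-place update
            prev is KeyedOp.Remove && op is KeyedOp.Add -> KeyedOp.Update(op.key, op.value)
            // Update+Update -> Update, Update+Remove -> Remove
            else -> op
        }
        if (merged == null) net.remove(op.key) else net[op.key] = merged
    }
    return net.values.toList()
}
```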