kotlinx.coroutines
Proposal: Flow pause cooperatively
Basic idea
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.flow.StateFlow

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}
- Downstream may provide this element to signal whether it is currently interested in emissions. If CoroutinePausing is not in the context, downstream is considered Unpaused.
- Upstream may respect that signal and observe it to avoid unnecessary work.
The value comes from libraries adopting it, but it is not imposed on anyone.
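A minimal sketch of the downstream side, assuming the interface above. `SimplePausing`, `isCurrentlyPaused` and `demo` are hypothetical names used only for illustration; they are not part of kotlinx.coroutines or of the proposal itself.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.withContext

// The element from the proposal, repeated so the sketch is self-contained.
interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical helper: the simplest possible implementation of the element.
class SimplePausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

// Hypothetical helper: reads the signal the way an upstream would.
// A missing element means Unpaused, as described above.
suspend fun isCurrentlyPaused(): Boolean =
    currentCoroutineContext()[CoroutinePausing]?.isPaused?.value ?: false

// Downstream owns the state, installs the element, and flips it as its interest changes.
suspend fun demo(): List<Boolean> {
    val paused = MutableStateFlow(false)
    return withContext(SimplePausing(paused)) {
        val before = isCurrentlyPaused()
        paused.value = true
        val after = isCurrentlyPaused()
        listOf(before, after)
    }
}
```

Anything launched or collected inside the `withContext` block inherits the element, so an upstream that knows about the key can find it without any extra parameters being passed around.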
Motivation 1: SharedFlow
- Upstream is Unpaused when* at least one collector is Unpaused (equivalently, it is Paused when there are no collectors or all collectors are Paused).
- It is mostly orthogonal to other strategies, and the key can always be provided to the upstream flow regardless of them. (Some pausing strategies may, however, be equivalent to wrapping the upstream flow in a pausing-aware operator.)
- (* It should also respect the Key from the context in which it runs, probably as 'if the context is paused, pause upstream even if there are Unpaused collectors'.)
- Makes it possible to pause upstream without 'recollecting' it, which might be expensive or impossible.
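The rule above can be written down as a small pure function. This is only a sketch of the decision logic; `upstreamPaused` and its parameters are hypothetical and say nothing about how SharedFlow would track its collectors internally.

```kotlin
// Hypothetical helper, not part of kotlinx.coroutines.
// contextPaused: paused state of the context the SharedFlow itself runs in.
// collectorsPaused: one entry per current collector; true means that collector is Paused.
fun upstreamPaused(contextPaused: Boolean, collectorsPaused: List<Boolean>): Boolean =
    // all() on an empty list is true, so "no collectors" counts as Paused.
    contextPaused || collectorsPaused.all { it }
```

With this shape, a single Unpaused collector is enough to keep upstream running, unless the surrounding context is itself paused.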
Motivation 2: Lifecycle (Android)
- Key can be provided in LifecycleCoroutineScope.
- Libraries (e.g. Room) would use this just like they do with LiveData now.
- This was the (I think) last case where LiveData was needed over just using Flow everywhere.
- (Obviously, not only Lifecycle. Can be tied to, say, a view being visible or not. It's very flexible and StateFlow<Boolean> is easy to provide.)
Benefits
- Simple.
- Lots of value for little work.
- No change to most of the codebase (library or clients).
- Direct communication between both ends of flow by default. (only 'intercepted' when needed: i.e. SharedFlow, or operators that add pausing, or even prevent pausing...)
- No need to affect the 'processing' part of a flow: Directly pauses source, and thus nothing is emitted through the flow.
- Significantly simplifies usage of SharedFlow, and potentially even internals.
- Can be easily extended, if it turns out isPaused: StateFlow<Boolean> is not enough to communicate all 'intents' from downstream to upstream.
- Not limited to Flow: Can be a general pausing mechanism for all coroutines. (hence my initial name above not being DownstreamPausing or HotFlowPausing)
Downsides
- Requires cooperation. (but that is already the case for coroutine cancellation, etc.)
- Possibly even a good thing, as 'pausing' at an arbitrary point of execution is probably a horrible idea.
Conceptual usage
(names are poor, ignore those)
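suspend fun awaitUnpaused() {
    // No element in the context: downstream is considered Unpaused.
    val p = coroutineContext[CoroutinePausing] ?: return
    // Fast path: already Unpaused.
    if (!p.isPaused.value) return
    // Suspend while paused; takeWhile completes on the first 'false'.
    p.isPaused.takeWhile { it }.collect()
}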
// to wrap a flow that does not support this mechanism:
fun <T> Flow<T>.awaitingUnpaused(): Flow<T> = flow {
    awaitUnpaused()
    collect {
        emit(it)
        // awaited after emit, so an old value is not emitted later, once unpaused;
        // instead, unpausing immediately 'resumes' upstream
        awaitUnpaused()
    }
}
//to wrap a flow that does not support this mechanism:
fun <T> Flow<T>.recollectWhenUnpaused(timeoutMillis: Long = 0) = flow<T> {
    // this is, I believe, the currently intended way for SharedFlow to handle upstream 'pausing'
    // it could be separated from SharedFlow
    TODO("stop collecting when paused; start collecting when unpaused")
}
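One possible way to fill in that TODO, as a rough sketch rather than a proposed implementation. It assumes the `CoroutinePausing` interface from above (repeated here so the block stands alone) and uses the experimental `mapLatest` / `transformLatest` operators; the name `recollectWhenUnpausedSketch` is hypothetical. `timeoutMillis` is interpreted here as a grace period before upstream collection is stopped, similar in spirit to the stop timeout of `SharingStarted.WhileSubscribed`, which is my assumption about the intent.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.transformLatest

// The element from the proposal, repeated so the sketch is self-contained.
interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.recollectWhenUnpausedSketch(timeoutMillis: Long = 0): Flow<T> {
    val upstream = this
    return flow {
        val pausing = currentCoroutineContext()[CoroutinePausing]
        if (pausing == null) {
            // No element in the context: behave like the plain upstream flow.
            emitAll(upstream)
            return@flow
        }
        emitAll(
            pausing.isPaused
                // Delay the 'paused' signal; an unpause arriving in time cancels the delay.
                .mapLatest { paused ->
                    if (paused) delay(timeoutMillis)
                    paused
                }
                // A pause that was cancelled in time must not restart the collection.
                .distinctUntilChanged()
                // A new state cancels the running collection; unpaused starts a fresh one.
                .transformLatest<Boolean, T> { paused ->
                    if (!paused) emitAll(upstream)
                }
        )
    }
}
```

Note that the resulting flow does not complete when a finite upstream completes, because it keeps waiting on `isPaused`; that seems acceptable for hot sources, but it is a design choice of this sketch.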