kotlinx.coroutines
kotlinx.coroutines copied to clipboard
Introduce Flow.all/any/none operators
Use case
I have a complex Flow that possesses many elements. I have a business rule that is, literally, to do something if any of them satisfies a condition.
If I were using List or Set, I would use any { theCondition(it) }. However, Flow doesn't seem to have any.
There is an old issue (#2239) that asks for this feature, and is closed because of alternative implementations:
suspend fun <T : Any> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean { return this.firstOrNull { predicate(it) } != null } suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean { return this.count { !predicate(it) } == 0 }
I dislike this solution because:
- The first one doesn't work with a
Flow<Foo?>becausefirstOrNullis only available on non-nullable types. - Both options obfuscate the operation I am actually attempting to perform.
I believe it is worth having any/all/none directly in the library because the proposed implementation have downsides.
The Shape of the API
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean
suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean
suspend fun <T> Flow<T>.none(predicate: suspend (T) -> Boolean): Boolean
Prior Art
These three functions are already available on Iterable and Sequence. The behavior should be the same.
all and none can use the count operator under the hood, since it already shortcuts. I believe this is a possible implementation of any, though I haven't tested it yet:
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean) = this
.transformWhile {
if (predicate(it)) {
emit(true)
false // ONE element MATCHES, no need to go further
} else {
true // continue
}
}
.onEmpty { emit(false) }
.first()
I can submit a PR if you think this is worth pursuing.
Simplified implementation:
suspend fun <T> Flow<T>.any(predicate: (T) -> Boolean) = this
.filter { predicate(it) }
.map { true }
.onEmpty { emit(false) }
.first()
We had a Slack discussion about the use case which prompted this (https://kotlinlang.slack.com/archives/C1CFAFJSK/p1723480063142319).
What I took away from that discussion surprised me, but I now believe that Flow is applicable everywhere Sequence is applicable, meaning that we should add Sequence API to Flow on demand without asking for use cases.
The use case is completely linear, with few hints at asynchronous behavior. A list is taken, converted to a Flow, then map { aSuspendingFunction() } is performed, and last, any { } collects the result.
Conceptually, Sequence is a good choice here: the use case is not to create cold streams of values, it's to enable short-circuiting behavior while traversing the list as a sequence. Calling a suspending function in map, however, breaks this nice concept and forces Flow into the code, even if we attempt to utilize the sequence function that allows running suspend code:
- Structured concurrency is broken if we call
suspend fun→fun→suspend fun. The middlefuncan berunBlocking, or it can besequence { }.any { }—in any case, cancellation stops working unless you write bespoke code to preserve it. - The thread is hogged by
funif we callsuspend fun→fun→suspend fun, so functions that callsuspend functions are a bit safer to use if the middlefunbecomes asuspend funas well. - If we have
suspend funrunning in aSequence, the fairly natural desire to actually make the code run asynchronously will not work, asSequencelacks the necessary facilities. The edit distance for adding buffering to aFlowis tiny, but adding buffering to aSequencemeans rewriting everything toFlow.
It seems to me like kotlinx.coroutines users should be able to use Flow whenever a Sequence is idiomatic, if only to break the suspend fun → fun → suspend fun call chain.
That's a really nice angle!
Originally, we thought of a Flow as an asynchronous cold (push-based) stream. It was never supposed to be a short-circuit analogy of channels or a suspendable sequence. Thus, not only was operator parity not a concern, but we also were extremely cautious about adding new ones as we made this mistake with channel operators. That's why we asked for use cases even for the trivial operators (also, the idea of "Flow operators are trivial, you can write your own in a few lines of code" is still here). We wanted to minimize API surface, minimize the potential of misuse and nudge people to a sequence where they needed a sequence.
Yet the reality begs to differ -- among other things, Flow is indeed used as a suspendable sequence, people expect all the batteries to be included and, if somethings looks like a sequence, quacks like a sequence and walks like a sequence [in trivial scenarios] it's better to have operators like a sequence.
There are not really many upsides of keeping the status quo, I think