turbine
Testing flows combined from other flows is difficult
Turbine offers a nice API for awaiting individual items of flows, which then allows for running assertions on them. This requires the test code to have knowledge about the sequence of emissions from the flow.
Unfortunately, this does not work well for flows combined from other flows. As an example, consider the following flows:
val flowA = MutableStateFlow("a1")
val flowB = MutableStateFlow("b1")
val flowAB = combine(flowA, flowB) { a, b -> Pair(a, b) }
flowAB will emit an updated pair whenever flowA or flowB emits an element. Consider this function that emits to flowA and flowB:
fun update() {
    flowA.value = "a2"
    flowB.value = "b2"
}
Calling this method causes two emissions on flowAB. Theoretically, a test for the emissions of flowAB when calling update() could anticipate this, ignore the first emission, and run its assertions on the second emission. However, in my opinion, tests should not be aware of such implementation details. What if we swap the order of the value assignments in the update method? What if we need to add another flow to the combined flow? In both cases, we would need to update the test as well.
One way around this issue is to await items of the flow until one matches assertions. This way we don't care about the precise sequence of emissions and instead wait for a valid emission to appear. The downside of this would be that such a function would need to have a timeout mechanism for the case where a valid element is not emitted.
I propose an alternative overload of awaitItem that implements this:
suspend fun awaitItem(assertions: suspend (T) -> Unit): T // assertions could also be non-suspending instead
It would run val item = awaitItem() in a loop, and run assertions(item) on each item. If assertions(...) completes normally, the item is immediately returned. If it throws an AssertionError, that gets rethrown if the Turbine timeout has been reached. If it has not been reached, the next item is awaited and the cycle repeats.
An alternative to accepting an assertions lambda would be to accept a predicate lambda instead, but this does not work well for validating multiple conditions, since a predicate lambda would not return a message describing the failure. It would also encourage long, unreadable && chains.
With the assertions lambda, test code would look like this:
someCombinedFlow.test {
    awaitItem { item ->
        assertEquals("item.first should be a2", "a2", item.first)
        assertEquals("item.second should be b2", "b2", item.second)
    }
}
I do not like the idea of an assertions lambda. Conflating exception usage for logic and test failure within a single test function is not something that I think is intuitive or has precedent in other libraries. A predicate lambda has equivalents in Flow operators, other stream libraries, and the standard library's collection operators. A predicate-based function would also allow you to build an exception-based one, if you so desire.
Valid concerns and good workaround - thanks! A predicate-based function would actually work well then.
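For illustration, here is a minimal sketch of what such a predicate-based helper could look like as an extension, with an assertion-based variant layered on top of it. The names awaitItemMatching and awaitItemAsserting are hypothetical and not part of Turbine. The sketch assumes that awaitItem() itself fails with an AssertionError when its timeout elapses or the flow terminates, which is what ends the loop when no matching item ever arrives.

```kotlin
import app.cash.turbine.ReceiveTurbine

// Hypothetical helper, not part of Turbine: skips items until one satisfies
// the predicate. There is no timeout logic here; the loop ends only because
// awaitItem() is assumed to fail on timeout or on a terminal event.
suspend fun <T> ReceiveTurbine<T>.awaitItemMatching(predicate: (T) -> Boolean): T {
    var item = awaitItem()
    while (!predicate(item)) {
        item = awaitItem()
    }
    return item
}

// Hypothetical assertion-based variant built on the predicate version.
// The most recent assertion failure is attached as a suppressed exception
// so the final error still explains why earlier items were rejected.
suspend fun <T> ReceiveTurbine<T>.awaitItemAsserting(assertions: (T) -> Unit): T {
    var lastFailure: AssertionError? = null
    try {
        return awaitItemMatching { item ->
            try {
                assertions(item)
                true
            } catch (e: AssertionError) {
                lastFailure = e
                false
            }
        }
    } catch (e: AssertionError) {
        lastFailure?.let { e.addSuppressed(it) }
        throw e
    }
}
```

Inside a test block, the predicate version would read something like awaitItemMatching { it.first == "a2" && it.second == "b2" }, with regular assertions run on the returned item afterwards if a descriptive failure message is needed.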
I had a different use case, but similarly I wanted to wait for the "right" value to show up in the flow, so I wrote this extension:
suspend fun <T> ReceiveTurbine<T>.awaitItemUntilAssertionPasses(
    timeout: Duration = DEFAULT_TURBINE_TIMEOUT,
    assertion: (T) -> Unit
): T {
    // Remember the most recent assertion failure so it can be reported on timeout.
    var error: AssertionError? = null
    try {
        return withTimeout(timeout) {
            var t: T
            while (true) {
                t = awaitItem()
                try {
                    assertion(t)
                    break
                } catch (e: AssertionError) {
                    error = e
                }
            }
            return@withTimeout t
        }
    } catch (e: TimeoutCancellationException) {
        // Attach the last failed assertion, if any, to the timeout exception.
        error?.let { e.addSuppressed(it) }
        throw e
    }
}
(There's a bug in the Kotlin compiler, so this var t looks a bit weird, but it has to be like that.)
Might help.
What we do to get around this is call runCurrent() first (to make sure all async coroutine code gets executed) and then call expectMostRecentItem() to get the item instead of awaitFirst().
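As a rough sketch of that workaround applied to the flows from the original post: the test class and method names below are made up for illustration, and the sketch assumes the test runs inside runTest from kotlinx-coroutines-test so that runCurrent() is available. Whether runCurrent() alone is enough to settle all pending work depends on the dispatchers the code under test uses.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical test class wrapping the flows from the original post.
class CombinedFlowTest {
    private val flowA = MutableStateFlow("a1")
    private val flowB = MutableStateFlow("b1")
    private val flowAB = combine(flowA, flowB) { a, b -> Pair(a, b) }

    private fun update() {
        flowA.value = "a2"
        flowB.value = "b2"
    }

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun combinedFlowSettlesOnLatestPair() = runTest {
        flowAB.test {
            update()
            // Run any tasks still pending on the test scheduler.
            runCurrent()
            // Inspect only the latest item and skip any intermediate pairs.
            assertEquals(Pair("a2", "b2"), expectMostRecentItem())
        }
    }
}
```

The trade-off is that intermediate emissions are silently discarded, so this style fits tests that care only about the final state, not the sequence that led to it.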