turbine icon indicating copy to clipboard operation
turbine copied to clipboard

Testing flows combined from other flows is difficult

Open cbruegg opened this issue 1 year ago • 4 comments

Turbine offers a nice API for awaiting individual items of flows, which then allows for running assertions on them. This requires the test code to have knowledge about the sequence of emissions from the flow.

Unfortunately, this does not work well for flows combined from other flows. As an example, consider the following flows:

val flowA = MutableStateFlow("a1")
val flowB = MutableStateFlow("b1")
val flowAB = combine(flowA, flowB) { a, b -> Pair(a, b) }

The flowAB will emit an updated pair whenever flowA or flowB emit an element. Consider this function that emits to flowA and flowB:

fun update() {
  flowA.value = "a2"
  flowB.value = "b2"
}

Calling this method causes two emissions on flowAB. Theoretically a test for the emissions of flowAB when calling update() could anticipate this and ignore the first emission and run its assertions on the second emission. However, in my opinion, tests should not be aware of such implementation details. What if we swap the order of the value assignments in the update method? What if we need to add another flow to the combined flow? In both cases, we would need to update the test as well.

One way around this issue is to await items of the flow until one matches assertions. This way we don't care about the precise sequence of emissions and instead wait for a valid emission to appear. The downside of this would be that such a function would need to have a timeout mechanism for the case where a valid element is not emitted.

I propose an alternative overload of awaitItem that implements this:

suspend fun awaitItem(assertions: suspend (T) -> Unit): T // assertions could also be non-suspending instead

It would run val item = awaitItem() in a loop, and run assertions(item) on each. If assertions(...) completes normally, the item is immediately returned. If it throws an AssertionError, that gets rethrown if the Turbine timeout has been reached. If it has not been reached, the next item is awaited and the cycle repeats.

An alternative to accepting an assertions lambda would be to accept a predicate lambda instead, but this does not work well for validating multiple conditions, since a predicate lambda would not return a message describing the failure. It would also encourage long, unreadable && chains.

With the assertions lambda, test code would look like this:

someCombinedFlow.test {
  awaitItem { item ->
    assertEquals("item.first should be a2", "a2", item.first)
    assertEquals("item.second should be b2", "b2", item.second)
  }
}

cbruegg avatar Jun 22 '23 08:06 cbruegg

I do not like the idea of an assertions lambda. Conflating exception usage for logic and test failure within a single test function is not something that I think is intuitive or has precedent in other libraries. A predicate lambda has equivalents in Flow operators, other stream libraries, and the standard library's collection operators. A predicate-based function would also allow you to build an exception-based one, if you so desire.

JakeWharton avatar Jun 22 '23 13:06 JakeWharton

Valid concerns and good workaround - thanks! A predicate-based function would actually work well then.

cbruegg avatar Jun 22 '23 14:06 cbruegg

I had a different use case, but similarly I wanted to await for "right" value to show up in the flow, so I wrote this extension

suspend fun <T> ReceiveTurbine<T>.awaitItemUntilAssertionPasses(
    timeout: Duration = DEFAULT_TURBINE_TIMEOUT,
    assertion: (T) -> Unit
): T {
    var error: AssertionError? = null
    try {
        return withTimeout(timeout) {
            var t: T
            while (true) {
                t = awaitItem()
                try {
                    assertion(t)
                    break
                } catch (e: AssertionError) {
                    error = e
                }
            }
            return@withTimeout t
        }
    } catch (e: TimeoutCancellationException) {
        if (error != null) {
            e.addSuppressed(error!!)
        }
        throw e
    }
}

(there's a bug in kotlin compiler so this var t looks a bit weird, but it has to be like that) Might help

NemanjaBozovic-TomTom avatar Jun 30 '23 14:06 NemanjaBozovic-TomTom

What we do to get around is call runCurrent() first (to make sure all async coroutine code gets executed) and then call expectMostRecentItem() to get the item instead of awaitFirst().

matejdro avatar Aug 11 '23 06:08 matejdro