How can I test back-pressure?

Open caiofaustino opened this issue 3 years ago • 8 comments

Hi, I was wondering whether there is any mechanism to test the same back-pressure behavior that collect() has. Here is an example of the different behaviors I encounter.

@Test
public fun `when I test - flow should backpressure`(): TestResult = runTest {
    val myFlow: Flow<Int> = flow {
        println("emitting 1")
        emit(1)
        println("emitted 1")
        delay(10)
        println("emitting 2")
        emit(2)
        println("emitted 2")
        delay(10)
        println("emitting 3")
        emit(3)
        println("emitted 3")
    }


    println("COLLECT")
    myFlow.collect {
        println("collected $it")
        delay(50)
    }

    println()
    println("TEST")
    // Test
    myFlow.test {
        val item1 = awaitItem()
        println("collected $item1")
        delay(50)
        val item2 = awaitItem()
        println("collected $item2")
        delay(50)
        val item3 = awaitItem()
        println("collected $item3")
        awaitComplete()
    }
}

Prints

COLLECT
emitting 1
collected 1
emitted 1
emitting 2
collected 2
emitted 2
emitting 3
collected 3
emitted 3

TEST
emitting 1
emitted 1
emitting 2
emitted 2
emitting 3
emitted 3
collected 1
collected 2
collected 3

caiofaustino, Sep 29 '22 12:09

Turbine uses an unconfined collector which runs concurrently with your test lambda to ensure that we see all values. We do not have a mechanism for applying backpressure to the upstream source.
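To make the effect concrete, here is a minimal sketch of the general idea, not Turbine's actual implementation. It assumes a plain `launch` inside `runBlocking` instead of an unconfined collector, and the helper name `bufferedCollectionLog` is made up for this illustration. The collector forwards every item into a channel with an unlimited buffer, so `send` never suspends and the producer is never slowed down by the consumer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration only; not part of Turbine.
fun bufferedCollectionLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val source = flow {
        for (i in 1..3) {
            log += "emitting $i"
            emit(i)
            log += "emitted $i"
        }
    }

    // Unlimited buffer: send() never suspends, so no backpressure reaches the flow.
    val events = Channel<Int>(Channel.UNLIMITED)

    // The collector runs concurrently with the consuming loop below.
    launch {
        source.collect { events.send(it) }
        events.close()
    }

    // Slow consumer: it only drains what the collector already buffered.
    for (item in events) {
        log += "collected $item"
        delay(50)
    }
    log
}

fun main() {
    bufferedCollectionLog().forEach { println(it) }
}
```

On the single-threaded `runBlocking` event loop, all three emissions should finish before the first item is handled, which is the same shape as the TEST output in the issue description.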

Can you describe what you're actually testing?

JakeWharton, Sep 29 '22 14:09

This is probably not a good implementation, but I'm trying to see if I could have a flow that exposes an editable queue. Each collect suspends and takes some time to execute, while asynchronously the rest of the queue could be cleared or have new items added to it.

So if the queue is cleared while collect is still suspended, the next item would not be emitted.

Any suggestions on how to achieve this would be appreciated :)

Here is a bit of a naive implementation.

val eventQueueList = mutableListOf<Event>()
val eventQueueFlow = flow {
    while (true) {
        val event = eventQueueList.removeFirstOrNull()
        if (event != null) {
            emit(event)
        }
        delay(10)
    }
}
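For comparison, here is a rough sketch of the behavior described above when the queue flow is consumed with plain collect(). It assumes `String` events instead of the `Event` type, real-time delays under `runBlocking`, and a made-up helper name `collectedAfterClear`. Because collect() suspends the producer while the collector is busy, items removed from the queue in the meantime are never emitted.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration; String stands in for the Event type.
fun collectedAfterClear(): List<String> = runBlocking {
    val queue = mutableListOf("a", "b", "c")
    val queueFlow = flow {
        while (true) {
            val event = queue.removeFirstOrNull()
            if (event != null) emit(event)
            delay(10)
        }
    }

    val seen = mutableListOf<String>()
    val job = launch {
        queueFlow.collect { event ->
            seen += event
            delay(200) // slow collector: the producer stays suspended in emit()
        }
    }

    delay(50)      // the collector is still busy with "a" at this point
    queue.clear()  // "b" and "c" are dropped before they were ever emitted
    queue += "d"
    delay(500)     // give the collector time to pick up "d"
    job.cancel()   // the flow is infinite, so stop it explicitly
    seen
}

fun main() {
    println(collectedAfterClear())
}
```

A test built on a concurrent, unbounded collector would instead see "b" and "c", since they would already have been pulled out of the queue and buffered.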

caiofaustino, Sep 29 '22 15:09

We can possibly design a variation of the API that allows applying backpressure by using a channel with a smaller buffer and potentially an intermediate coroutine which controls the number of allowed items before it blocks the collector.

JakeWharton, Sep 29 '22 15:09

Okay, my thought here is that we basically let you control the number of buffered items by blocking the collector. By default it's infinite (the current behavior). You can explicitly increase the count, which will unblock the collector for N items. Or, if the collector is blocked and you call awaitItem, we implicitly increase the count by 1 (effectively creating rendezvous behavior).

JakeWharton, Nov 18 '22 17:11

OK, I think I got it. The default behavior would stay the same, but I could somehow make this a blocking test. There I could call awaitItem() to increase the buffer by 1 and get the next item, which gives the behavior I expected in the issue description, or call something like awaitItems(3) to increase the buffer by N and collect the next N values. Either way, the producer is effectively suspended until await* is called.

Is my understanding correct? Do you think this could be implemented in the same flow.test{} API without breaking other behaviors, or should it be something like flow.blockingTest{}? I think with the blocking one you would lose the ability to detect a timeout on a hanging await*, right?

caiofaustino, Nov 18 '22 20:11

I think it would be the same API.

You would call .test(initialCapacity = 1) { .. } (or whatever you want), which would allow at most 1 item into the channel and then block the collector. We probably have to force you to allow at least 1 from the start because we need to start collecting from the Flow. After that, because the collector is blocked, backpressure is applied. So if your Flow is conflated and you trigger two operations, they should result in only a single notification when you eventually unblock the collector with a second awaitItem call or a call to a new function such as increaseCapacity(1) (but with a better name).
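One way to picture this proposal is a permit gate in front of the buffer. The sketch below is only a guess at the shape of the idea, built on plain kotlinx.coroutines channels. Nothing here is Turbine API: `gatedCollectionLog`, `initialCapacity` and `increaseCapacity` are hypothetical names borrowed from the discussion, and the consumer grants one permit after handling each item as a simplified stand-in for the implicit increase on awaitItem.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical sketch of a permit-gated collector; not Turbine's API.
fun gatedCollectionLog(initialCapacity: Int): List<String> = runBlocking {
    require(initialCapacity >= 1) { "at least one item must be allowed" }
    val log = mutableListOf<String>()
    val source = flow {
        for (i in 1..3) {
            log += "emitting $i"
            emit(i)
            log += "emitted $i"
        }
    }

    val events = Channel<Int>(Channel.UNLIMITED)
    // Each Unit is one permit that lets one more item into the buffer.
    val permits = Channel<Unit>(Channel.UNLIMITED)

    // Hypothetical stand-in for the increaseCapacity(n) call from the discussion.
    fun increaseCapacity(n: Int) = repeat(n) { permits.trySend(Unit) }
    increaseCapacity(initialCapacity)

    launch {
        source.collect { item ->
            permits.receive() // suspends when no permit is left: backpressure
            events.send(item)
        }
        events.close()
    }

    for (item in events) {
        log += "collected $item"
        increaseCapacity(1) // simplified version of awaitItem's implicit +1
    }
    log
}

fun main() {
    gatedCollectionLog(initialCapacity = 1).forEach { println(it) }
}
```

With an initial capacity of 1, the producer can start on the second item but stays suspended inside `emit` until the consumer has handled the first one.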

JakeWharton, Nov 18 '22 20:11

I agree with enforcing at least 1.

But I can't think of a use case where I would want to call increaseCapacity(N) without an associated awaitItem. That would amount to unblocking the Flow production in a controlled manner without collecting it. I guess there might be some convoluted use case for it, since the buffer already exists.

caiofaustino, Nov 18 '22 20:11

I mean, we can omit it and start with only two modes: rendezvous or unlimited buffer. If someone complains, at least we have an idea of how to build support for arbitrary buffer capacities, but maybe no one actually needs it.
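The two modes can be approximated with the channel capacity alone. This is a sketch under the assumption that the collector simply forwards items into a `Channel`; the helper name `collectionLog` is made up, and this is not how Turbine is necessarily implemented. `Channel.RENDEZVOUS` suspends the collector in `send` until the consumer is ready, while `Channel.UNLIMITED` never suspends it.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the channel capacity selects the mode.
fun collectionLog(capacity: Int): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val source = flow {
        for (i in 1..3) {
            log += "emitting $i"
            emit(i)
            log += "emitted $i"
        }
    }

    val events = Channel<Int>(capacity)
    launch {
        // With RENDEZVOUS, send() suspends until the consumer receives the item.
        source.collect { events.send(it) }
        events.close()
    }

    for (item in events) {
        log += "collected $item"
        delay(50) // slow consumer
    }
    log
}

fun main() {
    println("RENDEZVOUS")
    collectionLog(Channel.RENDEZVOUS).forEach { println(it) }
    println()
    println("UNLIMITED")
    collectionLog(Channel.UNLIMITED).forEach { println(it) }
}
```

In the rendezvous run the third emission cannot start until the consumer has taken the second item, whereas in the unlimited run the producer finishes before the first item is handled.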

JakeWharton, Nov 18 '22 20:11