turbine
How can I test back-pressure?
Hi, I was wondering if there is any mechanism to test the same back-pressure behavior that collect() has.
Here is an example of the different behaviors I encounter.
import app.cash.turbine.test
import kotlin.test.Test
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.TestResult
import kotlinx.coroutines.test.runTest

@Test
public fun `when I test - flow should backpressure`(): TestResult = runTest {
    val myFlow: Flow<Int> = flow {
        println("emitting 1")
        emit(1)
        println("emitted 1")
        delay(10)
        println("emitting 2")
        emit(2)
        println("emitted 2")
        delay(10)
        println("emitting 3")
        emit(3)
        println("emitted 3")
    }

    println("COLLECT")
    myFlow.collect {
        println("collected $it")
        delay(50)
    }

    println()
    println("TEST")
    // Test
    myFlow.test {
        val item1 = awaitItem()
        println("collected $item1")
        delay(50)
        val item2 = awaitItem()
        println("collected $item2")
        delay(50)
        val item3 = awaitItem()
        println("collected $item3")
        awaitComplete()
    }
}
Prints
COLLECT
emitting 1
collected 1
emitted 1
emitting 2
collected 2
emitted 2
emitting 3
collected 3
emitted 3
TEST
emitting 1
emitted 1
emitting 2
emitted 2
emitting 3
emitted 3
collected 1
collected 2
collected 3
Turbine uses an unconfined collector which runs concurrently with your test lambda to ensure that we see all values. We do not have a mechanism for applying backpressure to the upstream source.
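To make that concrete, here is a rough, stripped-down sketch of the idea (not Turbine's actual implementation): the flow is drained on an unconfined dispatcher into an unlimited channel, and because send to an unlimited channel never suspends, the reader's delays cannot slow the upstream flow down.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val myFlow = flow {
        println("emitting 1"); emit(1); println("emitted 1"); delay(10)
        println("emitting 2"); emit(2); println("emitted 2"); delay(10)
        println("emitting 3"); emit(3); println("emitted 3")
    }

    // Drain the flow into an unlimited buffer on an unconfined dispatcher,
    // roughly what "an unconfined collector that records every event" means.
    val buffer = Channel<Int>(Channel.UNLIMITED)
    launch(Dispatchers.Unconfined) {
        myFlow.collect { buffer.send(it) } // send never suspends on an UNLIMITED channel
        buffer.close()
    }

    // The reader's delay only slows this loop down; it never reaches the emitter.
    for (item in buffer) {
        println("collected $item")
        delay(50)
    }
}

Timing will differ slightly because this runs in real time rather than on the virtual-time test scheduler, but the key point is the same: the consumer's delay(50) never suspends the emitter, which is why no backpressure reaches the upstream source.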
Can you describe what you're actually testing?
This is probably not a good implementation, but I'm trying to see if I could have a flow that exposes an editable queue. Handling each collected item suspends and takes some time, and while that happens the rest of the queue could asynchronously be cleared or have new items added to it.
So if the queue is cleared while the collector is still suspended, the next item should not be emitted.
Any suggestions on how to achieve this would be appreciated :)
Here is a bit of a naive implementation.
val eventQueueList = mutableListOf<Event>()

val eventQueueFlow = flow {
    while (true) {
        val event = eventQueueList.removeFirstOrNull()
        if (event != null) {
            emit(event)
        }
        delay(10)
    }
}
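For example (with a made-up Event type, since the real one isn't shown here, and a timeout just to stop the endless polling loop), the behavior I'm after looks roughly like this: the queue is cleared while the collector is still busy with the first event, so the second event is never emitted.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

data class Event(val id: Int) // placeholder event type for illustration

fun main() = runBlocking {
    val eventQueueList = mutableListOf(Event(1), Event(2))
    val eventQueueFlow = flow {
        while (true) {
            val event = eventQueueList.removeFirstOrNull()
            if (event != null) {
                emit(event)
            }
            delay(10)
        }
    }

    // Clear the queue while the collector is still handling Event(1),
    // so Event(2) is removed before the polling loop can emit it.
    launch {
        delay(20)
        eventQueueList.clear()
    }

    withTimeoutOrNull(200) {          // the polling loop never completes on its own
        eventQueueFlow.collect {
            println("handling $it")   // prints only "handling Event(id=1)"
            delay(50)
        }
    }
    println("done")
}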
We can possibly design a variation of the API that allows applying backpressure by using a channel with a smaller buffer and potentially an intermediate coroutine which controls the number of allowed items before it blocks the collector.
Okay my thought here is that we basically let you control the number of buffered items by blocking the collector. By default it's infinite (the current behavior). You can explicitly increase the count which will unblock the collector for N items. Or if the collector is blocked and you call awaitItem we implicitly increase the count by 1 (effectively creating rendezvous behavior).
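Very roughly, and purely as a sketch of the idea rather than anything resembling Turbine's internals (all names below are invented), the gating could look something like this: the intermediate collector has to acquire a permit per item, so once the allowed count is spent the upstream flow suspends inside emit.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Semaphore

// Illustration only; not Turbine code.
class GatedBuffer<T>(initialCapacity: Int) {
    init {
        require(initialCapacity >= 1) // at least one item must be allowed so collection can start
    }

    // A large permit pool with most permits pre-acquired, so capacity can be
    // raised later simply by releasing permits.
    private val gate = Semaphore(
        permits = Int.MAX_VALUE,
        acquiredPermits = Int.MAX_VALUE - initialCapacity,
    )
    private val items = Channel<T>(Channel.UNLIMITED)

    // Run by the intermediate collector coroutine: each item needs a permit,
    // so once the allowed count is used up, the upstream flow is suspended here.
    suspend fun collectFrom(upstream: Flow<T>) {
        upstream.collect { value ->
            gate.acquire()
            items.send(value) // never suspends; the gate is what enforces capacity
        }
        items.close()
    }

    suspend fun awaitItem(): T {
        // If an item is already buffered, just hand it over.
        items.tryReceive().getOrNull()?.let { return it }
        // Otherwise grant one more permit before suspending: a crude stand-in for
        // "implicitly increase the count by 1 when the collector is blocked".
        gate.release()
        return items.receive()
    }

    // Explicitly let n more items through without consuming any.
    fun increaseCapacity(n: Int) = repeat(n) { gate.release() }
}

You would launch collectFrom(flow) in its own coroutine and call awaitItem()/increaseCapacity(n) from the test body. A real version would also need to know whether the collector is actually suspended on the gate (rather than "nothing buffered yet") and to carry completion and error events, but the gating idea is the same.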
Ok, I think I got it: the default behavior would stay the same, but I could somehow make this a blocking test. There I would be able to call awaitItem() to increase the buffer by 1 and get the next item, with the behavior I expected in the description of this issue, or call something like awaitItems(3) to increase the buffer by N and collect the next N values, effectively suspending the producer until await* is called.
Is my understanding correct? Do you think this could be implemented in the same flow.test {} API without breaking other behaviors, or should it be something like flow.blockingTest {}? I think with the blocking one you would lose the ability to detect a timeout on a hanging await*, right?
I think it would be the same API.
You would call .test(initialCapacity = 1) { .. } (or whatever you want) which would allow at most 1 item into the channel and then block the collector. We probably have to force you to at least allow 1 from the start because we need to start collecting from the Flow. But then after that, because the collector is blocked, backpressure is applied. So if your Flow is conflated and you trigger two operations they should result in only a single notification when you eventually unblock the collector with a second awaitItem call or a call to a new function such as increaseCapacity(1) (but with a better name).
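As a purely hypothetical example of that shape (the initialCapacity parameter does not exist in Turbine today; this is just the proposed API), a test against a conflated flow could then look like:

import app.cash.turbine.test
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.TestResult
import kotlinx.coroutines.test.runTest

@Test
fun `conflated flow sees only the latest value under backpressure`(): TestResult = runTest {
    val state = MutableStateFlow(0) // conflated: intermediate values can be dropped

    state.test(initialCapacity = 1) { // proposed parameter, not part of today's API
        assertEquals(0, awaitItem())  // the single item allowed through up front

        // Both updates happen while the collector is blocked...
        state.value = 1
        state.value = 2

        // ...so unblocking it with the next awaitItem yields only the latest value.
        assertEquals(2, awaitItem())
        cancelAndIgnoreRemainingEvents()
    }
}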
I agree with enforcing at least 1.
But I can't think of a use case where I would want to call increaseCapacity(N) without an associated awaitItem. It would be something like unblocking the Flow's production in a controlled manner without collecting it; I guess there might be some convoluted use case for that, since the buffer already exists.
I mean, we can omit it and start with only two modes: rendezvous or unlimited buffer. If someone complains, at least we have an idea of how to build support for arbitrary buffer capacities, but maybe no one actually needs it.