
Add withLatestFrom operator.

Open zach-klippenstein opened this issue 6 years ago • 10 comments

Use case: The processing of one flow (the operator's receiver) requires using the latest value emitted from another flow at the time of emission from the first flow. My specific use case involves combining a flow of events with a flow of event sinks ((Event) -> Unit). Whenever an event occurs (is emitted from the first flow), it should be sent to the most recent sink emitted from the second flow.

The resulting flow is entirely driven by the first flow. This operator conflates and caches values from the second flow and passes them to a lambda to be combined with values from the first flow whenever the first flow emits. The resulting flow will start collecting both flows immediately, but will not call the transform function or emit anything until both flows emit at least one value. The resulting flow will remain active until either flow throws an exception, or the first flow completes. If the first flow completes before the second, the second flow is cancelled. If the second flow completes before the first, the latest value will continue to be cached and passed to the transform function every time the first flow emits.

This operator is similar to combineLatest, but it always emits immediately and only when the first flow emits (values from the first flow are not cached). The implementation in this PR reuses as much of the infrastructure as possible from combineLatest. It collects the second flow using a CONFLATED channel since the operator only cares about caching the most recently emitted value, so the second flow does not need to have any backpressure applied.
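The behavior described above can be sketched roughly as follows. This is not the implementation from this PR: it is a minimal illustration that swaps the CONFLATED channel for an `AtomicReference` as the cache and builds on `channelFlow`. The signature, the non-null bound on `B`, and the demo flows in `main` are assumptions made for the example.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical sketch, not the PR's code. B is restricted to non-null values
// so that null can stand for "other has not emitted yet".
fun <A, B : Any, R> Flow<A>.withLatestFrom(
    other: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = channelFlow {
    val latest = AtomicReference<B?>(null)
    // Collect `other` concurrently and keep only its newest value.
    // If `other` throws, the failure cancels this scope and the resulting flow fails.
    val otherJob = launch { other.collect { latest.set(it) } }
    // The receiver drives emission; its values are dropped until `other` has emitted once.
    this@withLatestFrom.collect { a ->
        val b = latest.get()
        if (b != null) send(transform(a, b))
    }
    // The receiver completed, so stop collecting `other`.
    otherJob.cancel()
}

fun main(): Unit = runBlocking {
    val events = flow { delay(100); emit('a'); delay(100); emit('b') }
    val sinks = flow { emit(1); delay(150); emit(2) }
    // With these delays, 'a' should pair with 1 and 'b' with 2.
    println(events.withLatestFrom(sinks) { e, s -> "$e$s" }.toList())
}
```

Note that `channelFlow` buffers its output by default, so this sketch does not apply strict backpressure to the receiver the way a `flow { }` based version would.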

This PR only includes a single overload of the operator, accepting a single other flow, since that is the only overload I have a concrete use case for.

zach-klippenstein Jul 03 '19 03:07

Gentle ping: Is the use case I've given adequate, or would more information be helpful?

zach-klippenstein Jul 19 '19 18:07

This will not be part of the 1.3.0 release. We'll look at it when planning for the next batch of operations in subsequent versions. To clarify, the main problem we are facing here is that we are trying to keep our API minimal and orthogonal, so we're looking at more primitive operators that would let us express operators like withLatestFrom, and at how to name them consistently, too.

elizarov Jul 22 '19 16:07

Do I correctly understand that this is combineTransform with Latest extension? Actually, I'd love to see this.

My use case is, for example, loading places onto a map: the user moves the map, which cancels the previous fetch, and the loaded data also depends on a filter or other Flows.

hrach Aug 27 '19 10:08

I don't think you can implement this with combineLatest - that operator is triggered whenever any of its upstream flows emits. This operator would cache some upstream values but only trigger on a subset of them.
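To illustrate the difference, here is a small sketch using the stable `combine` operator (the current name for combineLatest). The two demo flows and the `combinedDemo` helper are hypothetical and exist only for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: `combine` re-emits when EITHER upstream produces a value.
fun combinedDemo(): Flow<String> {
    val events = flow { delay(100); emit('a') }
    val sinks = flow { emit(1); delay(200); emit(2) }
    return events.combine(sinks) { e, s -> "$e$s" }
}

fun main(): Unit = runBlocking {
    // With these delays the result should contain "a1" and then "a2":
    // the second element is triggered by `sinks` alone, which is exactly
    // what a withLatestFrom-style operator would not do.
    println(combinedDemo().toList())
}
```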

It's closer to a zip, which requires all upstreams to emit, combined with a (non-existent) operator on some upstreams that would be similar to conflate: A.withLatestFrom(B) means "zip A with B, but if B emits again while waiting for A then use the new value from B instead". Breaking the operator down in this way is more flexible because you could "withLatestFrom" an arbitrary number of streams and have fine-grained control over which ones trigger updates and which ones are just cached, but I'm not sure how this variant of conflate could be implemented, because it would need to affect synchronization logic that currently exists within, and is very specific to, the zip operator. It could be done with custom operator fusion, but that feels hacky and brittle.

zach-klippenstein Aug 27 '19 11:08

A simple implementation using only stable Flow APIs can be found here: https://pl.kotl.in/IYfZx_sKY

elizarov Aug 27 '19 14:08

I'm having trouble writing unit tests for the implementation in https://github.com/Kotlin/kotlinx.coroutines/pull/1315#issuecomment-525338743

Here's what I've tried.

@Test
fun `When receiver flow emits then resulting flow emits`() {

    val firstFlowChannel = ConflatedBroadcastChannel<Char>()
    val otherFlowChannel = ConflatedBroadcastChannel<Int>()

    val resultingFlow =
        firstFlowChannel.asFlow()
            .withLatestFrom(otherFlowChannel.asFlow()) { a, b -> "$a$b" }

    otherFlowChannel.offer(1)
    firstFlowChannel.offer('a')

    runBlocking {
        val receiveChannel = resultingFlow.produceIn(this)
        assertEquals("a1", receiveChannel.poll())
        receiveChannel.cancel()
    }
}

This always fails with

java.lang.AssertionError: Expected :a1 Actual :null

I see receiveChannel is empty. If I try to add breakpoints it looks like the withLatestFrom is not even being called. I suspect I've misconfigured the test but I cannot spot the problem.

curioustechizen Sep 23 '19 13:09

@curioustechizen Don't poll the receive channel. Use receive.

elizarov Sep 23 '19 15:09

@elizarov Thanks. That worked. If I change assertEquals(true, receiveChannel.isEmpty) the test passes.

Do I understand correctly that flow.produceIn() does not actually trigger the processing, but rather it is channel.receive() that triggers it?

curioustechizen Sep 23 '19 16:09

@curioustechizen produceIn (may) launch a coroutine to collect upstream, but that coroutine probably won't actually start executing before produceIn returns and you call poll immediately on the next line (depending on dispatcher and thread scheduling details). If it doesn't, poll will return null.
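A small sketch of that timing issue, assuming a recent kotlinx.coroutines version where `tryReceive()` is the non-suspending replacement for `poll()`. The `pollThenReceive` helper and the one-element `flowOf("a1")` are hypothetical stand-ins for the test above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns what a non-suspending check saw right after
// produceIn, and the value obtained by actually waiting for the element.
fun pollThenReceive(): Pair<String?, String> = runBlocking {
    val channel = flowOf("a1").produceIn(this)
    // Non-suspending: the producer coroutine has most likely not run yet,
    // so this is expected to find nothing.
    val immediate = channel.tryReceive().getOrNull()
    // Suspending: receive() yields to the producer and waits for the element.
    val received = immediate ?: channel.receive()
    channel.cancel()
    immediate to received
}

fun main() {
    val (immediate, received) = pollThenReceive()
    println("immediate = $immediate, received = $received")
}
```

So it is not that `receive()` triggers the processing. `receive()` suspends, which gives the already-launched producer coroutine a chance to run and deliver the value.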

zach-klippenstein Sep 23 '19 17:09

Is there anything blocking this PR?

Dimezis Oct 21 '21 18:10