kotlinx.coroutines icon indicating copy to clipboard operation
kotlinx.coroutines copied to clipboard

zip(Iterable<Flow>) operator ?

Open martinbonnin opened this issue 4 years ago • 13 comments

Is there an operator to zip a list of Flow ? Similar to combine except it will "group" items together instead of emitting the last items as soon as a new one comes in:.

Something similar to http://reactivex.io/documentation/operators/zip.html for Flows

martinbonnin avatar Mar 30 '20 16:03 martinbonnin

Could you please provide a compelling use-case for this operator? E.g. an example of a problem you want to solve with this operator

qwwdfsad avatar Mar 30 '20 20:03 qwwdfsad

One use case I have is I have a backend where I have to wait for 5 http api calls. I would love to be able to zip them all so they run in parallel and then I can make my computation based on those 5 results

mboudraa avatar Mar 30 '20 21:03 mboudraa

@mboudraa How Flow is useful in your case of http api calls? For concurrent calls we have the following idiom:

coroutineScope { 
    // make the calls concurrently
    val d1 = async { doCall1() }
    val d2 = async { doCall2() }
    ...
    val d5 = async { doCall5() }
    // await the call results & process them
    processResults(d1.await(), d2.await(), ...., d5.await())
}

Does it apply to your use-case?

elizarov avatar Mar 31 '20 08:03 elizarov

This came as a discussion on slack and I found it strange that this API wasn't there since the combine equivalent exists and also there's a zip for 2 Flows but not for a List so it feels asymmetrical.

Now that I think about it, I can't think of a valid use case for a zip(Iterable<Flow>). But I can't find one for zip(flow1, flow2) either 😅. I guess you can close the ticket and I'll reopen if I hear/think about a valid use case.

martinbonnin avatar Mar 31 '20 09:03 martinbonnin

@elizarov Yes it does apply to my use case, and to be honest, That's what I've been using so far.

I've been using zip(flow1, flow2) when I wrote a redux implementation using Flow. flow 1 was to reduce the state, flow2 was for side effects.

That's where i felt uncomfortable not seeing a zip(Iterable<Flow>) coming from a Rx World.

mboudraa avatar Mar 31 '20 13:03 mboudraa

I have a usecase. I have a notion of a reactive re-startable task which emits a success/failure result on whenever it is restarted:

interface Task<T> {
  val results: Flow<Either<T, Throwable>>
  fun start()
}

And I have a declarative UI which listens for these results whenever they arrive.

Now I have screen which needs to launch 4 such tasks and when all of them are completed (with either success or error), it transitions from loading state to "content" state.

So naturally I want to write some combinator which combines any number of tasks, zips their result and outputs a single Task abstraction which can be given to the screen:

fun zip(
  vararg tasks: Task<Unit>
) : Task<Unit> {
  return object : Task<Unit> {
    override val results: Flow<Either<T, Throwable>> {
       // can't do this, no multi-arg zip!
       return zip(tasks, ::combineResultOrError)
    }
    
    override val start() { tasks.forEach { it.start() } }
  }
}

fun combineResultOrError(items: Array<Either<T, Throwable>>) { ... }

Having no zip which accepts vararg/list stands in the way of such implementation.

I guess this can be done with continually zipping pairs of flows from the vararg list, but this is hard to write from the top of my head and not sure if it will perform nicely.

dimsuz avatar Feb 03 '22 18:02 dimsuz

I have a use case -- key generation for a database table by parts.

A table key may have N parts.

The key generation API takes a count argument which says how many table keys to generate.

Each key part has an associated generation strategy that also take a count of how many values to generate and returns a Flow<Long>

I need to combine N Flow<Long>s into a Flow<KeyType>, with zip semantics.

dfings avatar Mar 01 '22 01:03 dfings

I think that this issue should be reopened, there is definitely request for this, even if it not planned, at least it should be open for discussion

gildor avatar Mar 01 '22 07:03 gildor

For now I'm creating a List<ReceiveChannel<T>> using produceIn and then calling channels.map { it.receive() } in lockstep.

dfings avatar Mar 01 '22 20:03 dfings

Any update on this, everyone? @elizarov pls check @dfings use case. Besides, I also have another case:

  • I custom a FlowCallAdapterFactory to use it with Retrofit. So all my service apis always return Flow<X>
  • What should I do if need to call 5 or a list of that apis? I think you can supply zip two Flows, why not List<Flow<X>>?

VuHongKy avatar Jul 18 '22 17:07 VuHongKy

@VuHongKy IIRC, retrofit calls are single-shot so using combine should work like any potential .zip()?

In general, I opened this issue because I wanted to "combine" multiple values.

Zip semantics will wait for all Flows to emit their items before combining them:

Screenshot 2022-07-18 at 20 39 17

Combine semantics on the other hand combines them as soon as a new item is emitted:

Screenshot 2022-07-18 at 20 40 25

(marbles diagrams from the RxJava doc)

So combine emits more but also reduces latency, which was better for my original use case. I'm not saying there's no use case for .zip(Iterable<Flow>) but feels like a lot of the use cases in this thread would work the same, if not better using combine semantics?

martinbonnin avatar Jul 18 '22 18:07 martinbonnin

The use case I described requires zip semantics.

dfings avatar Jul 18 '22 19:07 dfings

@dfings Right! Sorry I wasn't sure about this one. If each Flow has a given number of items and the resulting Flow requires the same number of items then indeed seems like a good use case for .zip() 👍

martinbonnin avatar Jul 18 '22 19:07 martinbonnin

Is this issue addressed? I also have another use case where .zip(Iterable<Flow>) is needed. Let's say you have a loading UI and then you call 5 different flows, such as: getEmail(), getLoyaltyID(), etc. And then, you want to wait until all those five calls to be finished before hiding the loading UI.

I'm using combine now but the result is so ugly because it immediately removes the loading UI before calling all 5 APIs.

hanselgunawan avatar Mar 06 '23 21:03 hanselgunawan