stream_transform

Provide an API to combine a stream list of any size

Open IchordeDionysos opened this issue 5 years ago • 3 comments

If one wants to combine a list of streams that may also be empty or contain only one stream, the solution is to repeat this boilerplate every time:

final streams = <Stream<T>>[...];
if (streams.isEmpty) {
  return Stream.value(<T>[]);
}
if (streams.length == 1) {
  return streams[0].map<List<T>>((event) => <T>[event]);
}
return streams[0].combineLatestAll(streams.sublist(1));

This is tedious to write over and over, and it seems to me like a pretty common use case, especially when dealing with collections of server data.

I propose adding a function like this to stream_transform itself:

Stream<List<T>> combineStreams<T>(List<Stream<T>> streams) {
  if (streams.isEmpty) {
    return Stream.value(<T>[]);
  }
  if (streams.length == 1) {
    return streams[0].map<List<T>>((event) => <T>[event]);
  }
  return streams[0].combineLatestAll(streams.sublist(1));
}
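For illustration, here is a minimal sketch of how such a helper could treat every list size uniformly without delegating to `combineLatestAll`. It uses only `dart:async`; the name `combineLatestList` is hypothetical and is not part of stream_transform, and the sketch does not forward pause/resume to the sources.

```dart
import 'dart:async';

/// Hypothetical helper, not part of stream_transform: combines any number of
/// streams (including zero or one) into a stream of "latest values" lists.
Stream<List<T>> combineLatestList<T>(List<Stream<T>> streams) {
  // With no sources there is nothing to wait for: emit one empty list.
  if (streams.isEmpty) return Stream.value(<T>[]);

  final latest = List<T?>.filled(streams.length, null);
  final hasValue = List<bool>.filled(streams.length, false);
  final subscriptions = <StreamSubscription<T>>[];
  var openSources = streams.length;
  late final StreamController<List<T>> controller;

  controller = StreamController<List<T>>(
    onListen: () {
      for (var i = 0; i < streams.length; i++) {
        subscriptions.add(streams[i].listen(
          (event) {
            latest[i] = event;
            hasValue[i] = true;
            // Emit only once every source has produced at least one value.
            if (hasValue.every((seen) => seen)) {
              controller.add([for (final value in latest) value as T]);
            }
          },
          onError: controller.addError,
          onDone: () {
            // Close the output once the last source has closed.
            if (--openSources == 0) controller.close();
          },
        ));
      }
    },
    onCancel: () => Future.wait([for (final s in subscriptions) s.cancel()]),
  );
  return controller.stream;
}

/// Exercises the helper with zero, one, and two source streams.
Future<void> example() async {
  print(await combineLatestList<int>([]).toList()); // [[]]
  print(await combineLatestList([Stream.fromIterable([1, 2])]).toList());
  // [[1], [2]]
  print(await combineLatestList([Stream.value(1), Stream.value(2)]).toList());
  // [[1, 2]]
}
```

Calling `example()` from a `main` function runs the three cases. The empty-list case is the only one that needs special handling in this sketch; a single stream falls out of the general loop.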

IchordeDionysos, Oct 24 '20 11:10

For this use case, the recommended API is StreamGroup.merge from package:async.

https://pub.dev/documentation/async/latest/async/StreamGroup/merge.html

natebosch, Jan 12 '21 00:01

@natebosch well the APIs are different!

StreamGroup.merge:

Stream<T> merge<T>(Iterable<Stream<T>> streams);

combineLatestAll:

Stream<List<T>> combineLatestAll<T>(Iterable<Stream<T>> streams)

Basically, the parameter is the same, but the result is different.


StreamGroup.merge, from what I understand, listens to all streams; whenever any one of them fires, the output stream fires with that same event.

So, let's say:
Input streams: Stream 1, Stream 2, Stream 3
Output stream: Stream 4

Now let's look at what would be firing in each stream:

| Input         | Output        |
|---------------|---------------|
| Stream 1 -> A | Stream 4 -> A |
| Stream 3 -> B | Stream 4 -> B |
| Stream 2 -> C | Stream 4 -> C |
| Stream 1 -> D | Stream 4 -> D |
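To make the table concrete, here is a small sketch that replays the same firing order through `StreamGroup.merge`. It assumes a dependency on `package:async`; the controllers and the helper names `mergeDemo` and `fire` are made up for this example.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Replays the firing order from the table and returns everything the merged
/// stream ("Stream 4") emitted.
Future<List<String>> mergeDemo() async {
  final s1 = StreamController<String>();
  final s2 = StreamController<String>();
  final s3 = StreamController<String>();

  // toList() subscribes right away and completes once all sources close.
  final output =
      StreamGroup.merge([s1.stream, s2.stream, s3.stream]).toList();

  // Fire one event, then yield to the event loop so it is delivered before
  // the next one is added.
  Future<void> fire(StreamController<String> source, String event) async {
    source.add(event);
    await Future<void>.delayed(Duration.zero);
  }

  await fire(s1, 'A');
  await fire(s3, 'B');
  await fire(s2, 'C');
  await fire(s1, 'D');

  await Future.wait([s1.close(), s2.close(), s3.close()]);
  return output; // [A, B, C, D]
}
```

Every input event is passed through one-to-one, so the output carries single values of type `T`, never a list.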

combineLatestAll waits until every input stream has fired at least once. From then on, whenever any input stream fires, it emits a list holding the latest value of each input stream.

Let's take the example from above:

| Input         | Output                |
|---------------|-----------------------|
| Stream 1 -> A | -                     |
| Stream 3 -> B | -                     |
| Stream 2 -> C | Stream 4 -> [A, C, B] |
| Stream 1 -> D | Stream 4 -> [D, C, B] |

The behavior is very different here.
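The same firing order can be replayed through `combineLatestAll` to check the second table. This is a sketch that assumes a dependency on `package:stream_transform`; the controllers and the helper names `combineLatestDemo` and `fire` are made up for this example.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Replays the firing order from the table and returns everything the
/// combined stream ("Stream 4") emitted.
Future<List<List<String>>> combineLatestDemo() async {
  final s1 = StreamController<String>();
  final s2 = StreamController<String>();
  final s3 = StreamController<String>();

  // The list order follows the stream order: [Stream 1, Stream 2, Stream 3].
  final output =
      s1.stream.combineLatestAll([s2.stream, s3.stream]).toList();

  // Fire one event, then yield to the event loop so it is delivered before
  // the next one is added.
  Future<void> fire(StreamController<String> source, String event) async {
    source.add(event);
    await Future<void>.delayed(Duration.zero);
  }

  await fire(s1, 'A'); // nothing emitted yet
  await fire(s3, 'B'); // still waiting for Stream 2
  await fire(s2, 'C'); // first emission
  await fire(s1, 'D'); // second emission

  await Future.wait([s1.close(), s2.close(), s3.close()]);
  return output; // [[A, C, B], [D, C, B]]
}
```

Four input events produce only two output events here, and each output is a `List<String>` rather than a single value.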

IchordeDionysos, Jan 12 '21 01:01

Ah I see, I misread. Sorry about that.

I'll give some thought to whether we should include an API like this. It doesn't fit the existing pattern for this package, but we could also put something in package:async since these two packages are going to be merged at some point.

This would differ from StreamZip in that it would emit when any source stream emits, not when every source stream emits.
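A short sketch of that difference, replaying the A, B, C, D sequence from the earlier comment through `StreamZip`. It assumes a dependency on `package:async`; the controllers and the helper names `zipDemo` and `fire` are made up for this example.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Replays the A, B, C, D firing order and returns everything StreamZip
/// emitted.
Future<List<List<String>>> zipDemo() async {
  final s1 = StreamController<String>();
  final s2 = StreamController<String>();
  final s3 = StreamController<String>();

  final output = StreamZip([s1.stream, s2.stream, s3.stream]).toList();

  // Fire one event, then yield to the event loop so it is delivered before
  // the next one is added.
  Future<void> fire(StreamController<String> source, String event) async {
    source.add(event);
    await Future<void>.delayed(Duration.zero);
  }

  await fire(s1, 'A');
  await fire(s3, 'B');
  await fire(s2, 'C'); // every source has fired: emits [A, C, B]
  await fire(s1, 'D'); // held back until Streams 2 and 3 fire again

  // The zipped stream ends as soon as one source is done, so the close
  // futures are deliberately not awaited.
  for (final controller in [s1, s2, s3]) {
    unawaited(controller.close());
  }
  return output; // [[A, C, B]]
}
```

With the same input, a combine-latest style API would also emit `[D, C, B]`, which is the "emit when any source emits" behavior described above.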

natebosch, Jan 12 '21 01:01