rxdart

Add "combineLatest" operator

Open shinayser opened this issue 5 years ago • 5 comments

As illustrated at https://rxmarbles.com/#combineLatest

Today we have Rx.combineLatest2, but it is a static method rather than an operator, so it cannot be chained. Example:

import 'package:rxdart/rxdart.dart';

void main(List<String> arguments) async {
  var streamName = BehaviorSubject.seeded("Daniel");
  var streamSurname = BehaviorSubject.seeded("Oliveira");

  var fullName =
      Rx.combineLatest2(streamName, streamSurname, (n1, n2) => "$n1 $n2");

  fullName.listen(print);
}

It prints Daniel Oliveira

With a combineLatest operator we could chain calls in this nice, idiomatic way:

void main(List<String> arguments) async {
  var streamName = BehaviorSubject.seeded("Daniel");
  var streamSurname = BehaviorSubject.seeded("Oliveira");

  var fullName = streamName
      // _splitCharacters is a hypothetical helper that emits one character
      // at a time; this is just a crazy example.
      .flatMap(_splitCharacters)
      .combineLatest(streamSurname, (n1, n2) => "$n1 $n2");

  fullName.listen(print);
}

It prints:

D Oliveira
a Oliveira
n Oliveira
i Oliveira
e Oliveira
l Oliveira

This is a snippet of what this operator could be:

extension CombineLatest<A> on Stream<A> {
  // B and T are declared on the method so they are inferred at each call site.
  Stream<T> combineLatest<B, T>(
          Stream<B> otherStream, T Function(A, B) combiner) =>
      Rx.combineLatest2(this, otherStream, combiner);
}
  • This method's behavior is very different from withLatestFrom: withLatestFrom emits only when the source stream emits, so a new value on the other stream does not trigger an emission, and it doesn't work like the example above.
  • I don't see a need to implement combineLatest3, combineLatest4, combineLatest5. A single combineLatest covers most needs, and it can be chained any number of times.
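To make the difference in the first bullet concrete, here is a minimal sketch comparing Rx.combineLatest2 with the existing withLatestFrom operator. The function name compareOperators and the sample values are made up for illustration; only Rx.combineLatest2, withLatestFrom and PublishSubject come from rxdart.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical helper: feeds the same events to both operators and
/// returns what each one emitted.
Future<Map<String, List<String>>> compareOperators() async {
  final name = PublishSubject<String>();
  final surname = PublishSubject<String>();
  final combined = <String>[];
  final sampled = <String>[];

  // Emits whenever either side changes, once both have a value.
  Rx.combineLatest2(name, surname, (String n, String s) => '$n $s')
      .listen(combined.add);
  // Emits only when `name` emits, using the latest `surname`.
  name
      .withLatestFrom(surname, (String n, String s) => '$n $s')
      .listen(sampled.add);

  // Let pending events propagate before sending the next one.
  Future<void> tick() => Future<void>.delayed(Duration.zero);

  await tick();
  name.add('Daniel');
  await tick();
  surname.add('Oliveira');
  await tick();
  surname.add('Silva');
  await tick();
  name.add('Maria');
  await tick();

  await name.close();
  await surname.close();
  return {'combineLatest2': combined, 'withLatestFrom': sampled};
}

Future<void> main() async {
  final result = await compareOperators();
  print(result['combineLatest2']); // [Daniel Oliveira, Daniel Silva, Maria Silva]
  print(result['withLatestFrom']); // [Maria Silva]
}
```

The surname changes alone produce output only on the combineLatest2 side, which is the behavior this request is after.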

shinayser avatar Jan 07 '20 14:01 shinayser

@shinayser I believe what you're looking for can be accomplished with the withLatestFrom operator. In your example, replace combineLatest with withLatestFrom and you will get the desired behavior.

EricHurt avatar Apr 07 '20 01:04 EricHurt

There is indeed a difference between combineLatest and withLatestFrom. It is easiest to explain via marbles:

  • https://rxmarbles.com/#combineLatest
  • https://rxmarbles.com/#withLatestFrom

...but it's a bit more work than the example extension

extension CombineLatest<A> on Stream<A> {
  // B and T are declared on the method so they are inferred at each call site.
  Stream<T> combineLatest<B, T>(
          Stream<B> otherStream, T Function(A, B) combiner) =>
      Rx.combineLatest2(this, otherStream, combiner);
}

Rx.combineLatest2 returns a single-subscription Stream, which would break the source Stream if that were a broadcast one. So we'd need to implement this as a proper transformer instead.
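The concern can be sketched as follows. CombineLatestSketch is a hypothetical extension, not part of rxdart, and wrapping the result in asBroadcastStream() is only an assumption used to illustrate the problem; it is a stopgap, not the proper transformer mentioned above.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical extension, not part of rxdart.
extension CombineLatestSketch<A> on Stream<A> {
  Stream<T> combineLatest<B, T>(Stream<B> other, T Function(A, B) combiner) {
    final combined = Rx.combineLatest2(this, other, combiner);
    // Assumption: asBroadcastStream() is good enough for illustration.
    // It keeps the result broadcast when the source is broadcast, but its
    // subscription lifecycle differs from a purpose-built transformer.
    return isBroadcast ? combined.asBroadcastStream() : combined;
  }
}

void main() {
  // Subjects are broadcast streams.
  final name = PublishSubject<String>();
  final surname = PublishSubject<String>();

  final naive =
      Rx.combineLatest2(name, surname, (String n, String s) => '$n $s');
  final chained = name.combineLatest(surname, (String n, String s) => '$n $s');

  print('source isBroadcast: ${name.isBroadcast}');
  print('Rx.combineLatest2 isBroadcast: ${naive.isBroadcast}');
  print('sketch isBroadcast: ${chained.isBroadcast}');
}
```

If the naive result reports a non-broadcast stream, a second listen call on it would throw, which is what a chained operator on a broadcast source should avoid.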

And if we do this one, then we should probably provide zip as an operator as well...

frankpepermans avatar Apr 08 '20 11:04 frankpepermans

I did not realize that the original request was about adding additional streams to the source using the combineLatest operator. The example showed only a single source pulling the latest value from the second stream.

I still think there is value in a combineLatest transformer. I'll take a look into creating one.

EricHurt avatar Apr 08 '20 13:04 EricHurt

I'm a bit reluctant to also include it as an operator.

I think using Rx.combineLatest is actually clearer: all streams are treated equally, the API is easy to understand, you get the combined values, and it closes only once all of the provided streams have closed.

With combineLatest as an operator, you're more inclined to assume that the source Stream is in control, which is not the case. (withLatestFrom, for example, actually closes when the source Stream closes; it does not wait for the combined one.)
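The closing behavior described above can be observed with a small sketch. The function name recordCompletionOrder and the log strings are made up for illustration, and the expected order in the comment assumes the behavior stated in this comment holds for the rxdart version in use.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical helper: records when each combined stream reports done.
Future<List<String>> recordCompletionOrder() async {
  final source = PublishSubject<int>();
  final other = PublishSubject<int>();
  final log = <String>[];

  Rx.combineLatest2(source, other, (int a, int b) => a + b)
      .listen(null, onDone: () => log.add('combineLatest2 done'));
  source
      .withLatestFrom(other, (int a, int b) => a + b)
      .listen(null, onDone: () => log.add('withLatestFrom done'));

  // Let pending events propagate before the next step.
  Future<void> tick() => Future<void>.delayed(Duration.zero);

  await tick();
  // Give both sides a value first so neither closes while still empty.
  source.add(1);
  other.add(2);
  await tick();

  log.add('closing source');
  await source.close();
  await tick();

  log.add('closing other');
  await other.close();
  await tick();

  return log;
}

Future<void> main() async {
  // Expected, per the comment above:
  // [closing source, withLatestFrom done, closing other, combineLatest2 done]
  print(await recordCompletionOrder());
}
```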

Aren't we making things more complex by trying to make things... less complex?

frankpepermans avatar Apr 08 '20 13:04 frankpepermans

The latest version still doesn't include this extension method. I hope they can implement it soon.

Velkamhell97 avatar Nov 15 '23 18:11 Velkamhell97