rxdart
rxdart copied to clipboard
Add "combineLatest" operator
As of https://rxmarbles.com/#combineLatest
Today we have the Rx.combineLatest2
but this is not an operator itself, it is a static method, thus cannot be chained. Example:
void main(List<String> arguments) async {
var streamName = BehaviorSubject.seeded("Daniel");
var streamSurname = BehaviorSubject.seeded("Oliveira");
var fullName =
Rx.combineLatest2(streamName, streamSurname, (n1, n2) => "$n1 $n2");
fullName.listen(print);
}
It prints Daniel Oliveira
With a combine latest operator we could chain calls in this nice idiomatic way:
void main(List<String> arguments) async {
var streamName = BehaviorSubject.seeded("Daniel");
var streamSurname = BehaviorSubject.seeded("Oliveira");
var fullName = streamName
.flatMap(_splitCharacters) //this is just a crazy example
.combineLatest(streamSurname, (n1, n2) => "$n1 $n2");
fullName.listen(print);
}
It prints:
D Oliveira
a Oliveira
n Oliveira
i Oliveira
e Oliveira
l Oliveira
This is a snippet of what this operator could be:
extension CombineLatest<A, B, T> on Stream<A> {
Stream<T> combineLatest(Stream<B> otherStream, T Function(A, B) combiner) =>
Rx.combineLatest2(this, otherStream, combiner);
}
- This method's behavior is very different from
withLatestFrom
, because withLatestFrom doesn't cache the latest result from the input streams, so, it doesn't work like above. - I don't see a need of implementing
combineLatest3
,combineLatest4
,combineLatest5
. Just combineLatest could be used to fill most of the needs. Also, it can be chained infinite times, so...
@shinayser I believe what you're looking for can be accomplished with the withLatestFrom operator. Given your example replace combineLatest with withLatestFrom and you will achieve the desired behavior.
There's a difference indeed between combineLatest and withLatestForm, easiest to explain is via marbles:
- https://rxmarbles.com/#combineLatest
- https://rxmarbles.com/#withLatestFrom
...but it's a bit more work than the example extension
extension CombineLatest<A, B, T> on Stream<A> {
Stream<T> combineLatest(Stream<B> otherStream, T Function(A, B) combiner) =>
Rx.combineLatest2(this, otherStream, combiner);
}
Rx.combineLatest2 returns a single subscription Stream, which would break the source Stream, if that were a broadcast one. So we'd need to implement this as a proper transformer instead.
And if we do this one, then we should probably provide zip as an operator as well...
I did not realize that the original request was looking for adding the additional streams to the source using the combineLatest operator. The example showed only a single source pull latest from the second stream.
I still think there is value in a combineLatest transformer. I'll take a look into creating one.
I'm a bit reluctant to just include it also as an operator,
I think using Rx.combineLatest
is actually clearer: all streams are treated equal, the api is easy to understand, you get the combined values and it closes when all streams provided close as well.
With combineLatest
as an operator, you're more inclined to assume that the source Stream
is in control, which is not the case. (withLatestFrom
actually closes when the source Stream closes for example, it does not await the combined one).
Aren't we making thing more complex by trying to make things... less complex?
Still without the extension method in the last version, i hope they can implement it soon