flyd
flyd copied to clipboard
[Help Needed] CombineLatest in the style of RxJs
Hi, guys
I wonder if someone could help me with this problem? I'm trying to create a static method on flyd that has exactly the same behaviour as RxJs's CombineLatest. Needs to have the following:
- Should allow two or more streams to be passed in as parameters eg
flyd.combineLatest(stream1, stream2)
. - Resulting stream emits an array of values, eg
flyd.combineLatest(stream1, stream2).map(([v1, v2]) => {...});
. - The resulting stream should only emit a value when all source streams have emitted at least one value.
- The resulting stream should end when any of the source streams end.
I've looked at the existing lift
module, but this does not seem to handle 3 & 4.
Here's my attempt so far:
flyd.combineLatest = curryN_1(2, function (...streams) {
const result$ = flyd.immediate(
combine(function (...finalStreams) {
const res = [];
const changed = finalStreams[finalStreams.length - 1];
const self = finalStreams[finalStreams.length - 2];
for (let i = 0; i < streams.length; ++i) {
if (!streams[i].hasVal) {
return;
} else {
res.push(streams[i]());
}
}
self(res);
}, streams),
);
flyd.endsOn(
combine(
function () {
return true;
},
streams.map(item => item.end),
),
result$,
);
return result$;
});
This seems to work, except for a very odd problem when combineLatest
is used inside another stream.
Happy to elaborate on this, but really would appreciate if anyone can cast an eye over my implementation and spot anything that is obviously wrong.
I should add that my attempt is a bit of guesswork at the moment copied blatantly from the merge
implemenation — in case you couldn't tell that already!
Thanks Noel