Need static version of combine latest that accepts vector

Open tonicsoft opened this issue 4 years ago • 8 comments

I would like to be able to do the following:

    auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
    auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
    auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
    auto inputs = std::vector{o1, o2, o3};

    auto values_varargs_api = rxcpp::observable<>::combine_latest(o1, o2, o3);
    auto values = rxcpp::observable<>::combine_latest(inputs); // vector API
    values.
        take(5).
        subscribe(
            [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
            [](){printf("OnCompleted\n");});

The reason is to be able to handle the case where the number of elements to combine is not known at compile time.

Furthermore, the non-static version of the function is unpleasant to use in some situations because it makes one of the inputs special (the one you have to call the function on). It doesn't really belong as an instance function and should be static so that all the inputs are treated the same.

See rx-java for example.

Note: the example given has 3 hard-coded observables, but my real use cases require a genuinely dynamic number of observables.

tonicsoft avatar Apr 03 '20 10:04 tonicsoft

Hi,

This is commonly solved with iterate()

    iterate(inputs).combine_latest();

kirkshoop avatar Apr 05 '20 23:04 kirkshoop

Thanks for the reply. I'm having a similar issue with the lack of an observable-based debounce operator, but you've prompted me to think about how it could be implemented in terms of concat and switch_on_next.

Back to combine_latest though, I'd really appreciate some help breaking down your suggestion. It's very difficult to work out parameter and return types from the source because of all the templates, and I can't find any occurrences of .combine_latest() (the parameterless call) in any of the RxCpp examples or tests.

Here is the function signature I'm interested in (with using namespace std and rxcpp):

    template <class T>
    observable<vector<T>> combine_latest(vector<observable<T>> inputs) {
        ...
    }

The returned observable should emit each time any of the input observables emits. I already have two working implementations: one made by copying the library's version of combine_latest and replacing std::tuple with std::vector, and one that recursively combines the elements pairwise using the library's combine_latest. So I'm quite sure this signature is the one I need.

Taking your code and extracting variables so we can see the types, I get the following:

    observable<observable<T>> iterated = observable<>::iterate(inputs);
    observable<tuple<observable<T, dynamic_observable<T>>>> combined = iterated.combine_latest();

What I can't see is how to convert this into an observable of vectors. In fact, it's not even clear to me what iterate does in this context, or what combine_latest does when called with no parameters. The appearance of dynamic_observable is also confusing me.

As I said, any help you can provide will be much appreciated.

tonicsoft avatar Apr 08 '20 13:04 tonicsoft

You are correct. I forgot that combine_latest, with_latest_from and zip require all the sources up-front so that they can compute the tuple type.

Any solution to this will require a fixed size for the vector. std::array is the most accurate type for this.

Example:

    something
      .map([](std::array<observable<int>, 4> arr){
        return combine_latest(arr[0], arr[1], arr[2], arr[3]);
      })
      .concat()
      //..

kirkshoop avatar Apr 08 '20 14:04 kirkshoop

std::array must have its size known at compile time, and unfortunately in my case the size is known only at runtime (although it does indeed have to be fixed, as you say).

Looking back at my original example I realise that wasn't really clear, sorry. The 3 hard-coded streams were a placeholder for a genuinely dynamic number of input streams.

tonicsoft avatar Apr 08 '20 16:04 tonicsoft

Yes, this use case will require algorithms that use a collection, like vector, in place of these algorithms, which use tuple. Building those from scratch would be best. The alternative is a bit of a hack.

The observables in the input vector could each be materialized, then merged, then scanned into vectors of results, which could be filtered through distinct_until_changed.

kirkshoop avatar Apr 08 '20 21:04 kirkshoop

Doesn't merging the materialized observables have the same issue (only accepting variadic arguments)? Or are you suggesting to apply the recursive merge trick here?

tonicsoft avatar Apr 08 '20 22:04 tonicsoft

Materialize is not strictly required. It can be useful when you need to interact with the notifications for all three signals from each input observable.

The actual requirement is to associate the index of the observable with each value it produces.

This way each value produced can be used in the scan to replace the previous value at that index and emit the new vector of values.
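
As a rough illustration of that last step, here is a minimal sketch of the state such a scan would need, written with the standard library only so it does not depend on any particular RxCpp signature. LatestByIndex and replay are hypothetical names, not RxCpp API. The sketch assumes each input has already been tagged with its index and that the tagged values arrive as one merged sequence of (index, value) pairs; how to build that merged sequence from a vector of observables is not shown here.

```cpp
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical helper (not part of RxCpp): remembers the most recent value
// seen for each input index, the way a scan accumulator would.
template <class T>
class LatestByIndex {
public:
    explicit LatestByIndex(std::size_t count) : slots_(count) {}

    // Store `value` as the latest for input `index`. Returns a snapshot of
    // all latest values once every input has produced at least one value,
    // and std::nullopt before that. Throws std::out_of_range on a bad index.
    std::optional<std::vector<T>> update(std::size_t index, T value) {
        if (!slots_.at(index).has_value()) {
            ++filled_;
        }
        slots_[index] = std::move(value);
        if (filled_ < slots_.size()) {
            return std::nullopt;
        }
        std::vector<T> snapshot;
        snapshot.reserve(slots_.size());
        for (const auto& slot : slots_) {
            snapshot.push_back(*slot);
        }
        return snapshot;
    }

private:
    std::vector<std::optional<T>> slots_;
    std::size_t filled_ = 0;  // number of inputs that have emitted so far
};

// Hypothetical driver: feeds an already-merged list of (index, value) events
// through the accumulator and collects every snapshot it emits.
template <class T>
std::vector<std::vector<T>> replay(
    std::size_t count,
    const std::vector<std::pair<std::size_t, T>>& events) {
    LatestByIndex<T> latest(count);
    std::vector<std::vector<T>> emitted;
    for (const auto& [index, value] : events) {
        if (auto snapshot = latest.update(index, value)) {
            emitted.push_back(std::move(*snapshot));
        }
    }
    return emitted;
}
```

With three inputs, nothing is emitted until all three indices have been seen; after that, every incoming pair replaces one slot and yields a fresh vector. The number of inputs is a runtime value here, which is the point of using a vector rather than a tuple.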

kirkshoop avatar Apr 09 '20 00:04 kirkshoop

But is there any way to merge/scan/combine the N observables without the pairwise recursion hack (which, in Java at least, performs quite poorly compared to a custom operator)? It seems that all the features offered by the library act on variadic arguments only and none of them operate on a vector.

tonicsoft avatar Apr 10 '20 13:04 tonicsoft