
How to implement your own operators for observables efficiently in terms of compile time

Open maiermic opened this issue 8 years ago • 2 comments

I wrote my own operator subscribe_with_latest_from as a function that composes existing operators. However, it takes a long time to compile. This example takes almost 6 seconds to compile on my system (with Clang):

#include <rxcpp/rx.hpp>

template<typename Fn, typename... Observables>
auto subscribe_with_latest_from(Fn f, Observables... observables) {
    return [=](auto &&source) {
        return source
                .with_latest_from(
                        [=](auto &&...args) {
                            f(args...);
                            return 0; // dummy value
                        },
                        observables...
                )
                .subscribe([](auto _) {});
    };
}

int identity(int value) { return value; }

auto process(rxcpp::observable<int> source) {
    // do some operations
    return source
            .map(identity)
            .map(identity)
            .map(identity)
            .map(identity)
            .map(identity);
}

int main() {
    const rxcpp::rxsub::subject<int> s1;
    const rxcpp::rxsub::subject<int> s2;
    const rxcpp::rxsub::subject<int> s3;

    process(s1.get_observable()) |
        subscribe_with_latest_from(
                [&](int v1, int v2, int v3) {
                    // do something
                },
                process(s2.get_observable()),
                process(s3.get_observable())
        );
    
    s1.get_subscriber().on_next(1);
    s2.get_subscriber().on_next(2);
    s3.get_subscriber().on_next(3);
    s1.get_subscriber().on_next(11);

}

It compiles in nearly 3 seconds if I replace the usage of my operator with

process(s1.get_observable())
        .with_latest_from(
                [&](int v1, int v2, int v3) {
                    // do something
                    return 0;
                },
                process(s2.get_observable()),
                process(s3.get_observable())
        )
        .subscribe([](int _) {});

How can I reduce or eliminate the compile-time overhead of my custom operator subscribe_with_latest_from? Is there a recommended way to implement custom operators?

maiermic, Oct 20 '17 16:10

Perhaps if the auto and variadic nested lambdas were replaced with manually built templates with operator()() members for the function calls, compile times might improve.

I do not consider this a good approach, though. I think compile-time performance is becoming a focus, with talks at C++Now and CppCon. I hope to be able to leverage some of the things shared there in the future.

kirkshoop, Nov 16 '17 05:11

I tried, but had some issues (see TODO comments):

#include <functional>
#include <tuple>
#include <rxcpp/rx.hpp>

namespace detail {

    template<typename Fn, typename... Args>
    class always_return_zero {
        Fn f;
    public:
        explicit always_return_zero(const Fn &f) : f(f) {}

        int operator()(Args... args) {
            f(args...);
            return 0; // dummy value
        }
    };

    template<typename Fn, typename... Observables>
    struct subscribe_with_latest_from {
        Fn f;
        std::tuple<Observables...> observables;

        explicit subscribe_with_latest_from(
                const Fn &f,
                Observables... observables
            ) : f(f)
              , observables(observables...)
        {}

        // TODO: accept a source observable whose value type is not int
        auto operator()(rxcpp::observable<int> &&source) {
            using source_value_type = typename rxcpp::observable<int>::value_type;
            return source
                    .with_latest_from(
                            always_return_zero<Fn, source_value_type, typename Observables::value_type...>(f),
                            // TODO pass observables of different lengths (not only two)
                            std::get<0>(observables),
                            std::get<1>(observables)
                    )
                    .subscribe([](int _) {});
        }
    };

}

template<typename Fn, typename... Observables>
detail::subscribe_with_latest_from<Fn, Observables...>
subscribe_with_latest_from (const Fn &f, Observables... observables) {
    return detail::subscribe_with_latest_from<Fn, Observables...>(f, observables...);
}

int identity(int value) { return value; }

auto process(rxcpp::observable<int> source) {
    // do some operations
    return source
            .map(identity)
            .map(identity)
            .map(identity)
            .map(identity)
            .map(identity);
}

int main() {
    using Source = rxcpp::observable<int>;

    const rxcpp::rxsub::subject<int> s1;
    const rxcpp::rxsub::subject<int> s2;
    const rxcpp::rxsub::subject<int> s3;

    process(s1.get_observable()) |
        // subscribe_with_latest_from<Source, Source>(
        subscribe_with_latest_from(
                [&](int v1, int v2, int v3) {
                    // do something
                },
                process(s2.get_observable()),
                process(s3.get_observable())
        );
    
    s1.get_subscriber().on_next(1);
    s2.get_subscriber().on_next(2);
    s3.get_subscriber().on_next(3);
    s1.get_subscriber().on_next(11);

}

If I add a template parameter Source to detail::subscribe_with_latest_from::operator() like this

template <typename Source>
auto operator()(Source &&source) {
    using source_value_type = typename Source::value_type;
    return source
            .with_latest_from(
                    always_return_zero<Fn, source_value_type, typename Observables::value_type...>(f),
                    // TODO pass observables of different lengths (not only two)
                    std::get<0>(observables),
                    std::get<1>(observables)
            )
            .subscribe([](int _) {});
}

I get a compile error:

error: type 'const rxcpp::observable<int, rxcpp::operators::detail::lift_operator<int, rxcpp::operators::detail::lift_operator<int,
      rxcpp::operators::detail::lift_operator<int, rxcpp::operators::detail::lift_operator<int, rxcpp::operators::detail::lift_operator<int, rxcpp::dynamic_observable<int>,
      rxcpp::operators::detail::map<int, int (*)(int)> >, rxcpp::operators::detail::map<int, int (*)(int)> >, rxcpp::operators::detail::map<int, int (*)(int)> >,
      rxcpp::operators::detail::map<int, int (*)(int)> >, rxcpp::operators::detail::map<int, int (*)(int)> > > &' cannot be used prior to '::' because it has no members
            using source_value_type = typename Source::value_type;

How can I get the value type of any observable?

maiermic, Nov 19 '17 11:11