RxCpp
RxCpp copied to clipboard
how to implement your own operators for observables efficiently in terms of compile time
I wrote my own operator subscribe_with_latest_from as a function that composes existing operators. However, it takes much long to compile. This example takes almost 6 seconds to compile on my system (with Clang):
#include <rxcpp/rx.hpp>
template<typename Fn, typename... Observables>
auto subscribe_with_latest_from(Fn f, Observables... observables) {
return [=](auto &&source) {
return source
.with_latest_from(
[=](auto &&...args) {
f(args...);
return 0; // dummy value
},
observables...
)
.subscribe([](auto _) {});
};
}
int identity(int value) { return value; }
auto process(rxcpp::observable<int> source) {
// do some operations
return source
.map(identity)
.map(identity)
.map(identity)
.map(identity)
.map(identity);
};
int main() {
const rxcpp::rxsub::subject<int> s1;
const rxcpp::rxsub::subject<int> s2;
const rxcpp::rxsub::subject<int> s3;
process(s1.get_observable()) |
subscribe_with_latest_from(
[&](int v1, int v2, int v3) {
// do something
},
process(s2.get_observable()),
process(s3.get_observable())
);
s1.get_subscriber().on_next(1);
s2.get_subscriber().on_next(2);
s3.get_subscriber().on_next(3);
s1.get_subscriber().on_next(11);
}
It compiles in nearly 3 seconds if I replace the usage of my operator with
process(s1.get_observable())
.with_latest_from(
[&](int v1, int v2, int v3) {
// do something
return 0;
},
process(s2.get_observable()),
process(s3.get_observable())
)
.subscribe([](int _) {});
How can I reduce/eliminate the compile time overhead of my custom operator subscribe_with_latest_from? Are there any recommended ways how to implement custom operators?
Perhaps if the auto & variadic nested lambdas were replace with manually built templates with operator()() members for the function calls, the compilation might be better.
I do not consider this a good approach, I think that this topic of compile-time performance is becoming a focus with talks at cppnow and cppcon. I hope to be able to leverage some of the things shared there in the future.
I tried, but had some issues (see TODO comments):
#include <functional>
#include <tuple>
#include <rxcpp/rx.hpp>
namespace detail {
template<typename Fn, typename... Args>
class always_return_zero {
Fn f;
public:
explicit always_return_zero(const Fn &f) : f(f) {}
int operator()(Args... args) {
f(args...);
return 0; // dummy value
}
};
template<typename Fn, typename... Observables>
struct subscribe_with_latest_from {
Fn f;
std::tuple<Observables...> observables;
explicit subscribe_with_latest_from(
const Fn &f,
Observables... observables
) : f(f)
, observables(observables...)
{}
// TODO take source observable of differen value type than int
auto operator()(rxcpp::observable<int> &&source) {
using source_value_type = typename rxcpp::observable<int>::value_type;
return source
.with_latest_from(
always_return_zero<Fn, source_value_type, typename Observables::value_type...>(f),
// TODO pass observables of different lengths (not only two)
std::get<0>(observables),
std::get<1>(observables)
)
.subscribe([](int _) {});
}
};
}
template<typename Fn, typename... Observables>
detail::subscribe_with_latest_from<Fn, Observables...>
subscribe_with_latest_from (const Fn &f, Observables... observables) {
return detail::subscribe_with_latest_from<Fn, Observables...>(f, observables...);
}
int identity(int value) { return value; }
auto process(rxcpp::observable<int> source) {
// do some operations
return source
.map(identity)
.map(identity)
.map(identity)
.map(identity)
.map(identity);
};
int main() {
using Source = rxcpp::observable<int>;
const rxcpp::rxsub::subject<int> s1;
const rxcpp::rxsub::subject<int> s2;
const rxcpp::rxsub::subject<int> s3;
process(s1.get_observable()) |
// subscribe_with_latest_from<Source, Source>(
subscribe_with_latest_from(
[&](int v1, int v2, int v3) {
// do something
},
process(s2.get_observable()),
process(s3.get_observable())
);
s1.get_subscriber().on_next(1);
s2.get_subscriber().on_next(2);
s3.get_subscriber().on_next(3);
s1.get_subscriber().on_next(11);
}
If I add a template parameter Source to detail::subscribe_with_latest_from::operator() like this
template <typename Source>
auto operator()(Source &&source) {
using source_value_type = typename Source::value_type;
return source
.with_latest_from(
always_return_zero<Fn, source_value_type, typename Observables::value_type...>(f),
// TODO pass observables of different lengths (not only two)
std::get<0>(observables),
std::get<1>(observables)
)
.subscribe([](int _) {});
}
I get an compile error
error: type 'const rxcpp::observable<int, rxcpp::operators::detail::lift_operator<int, rxcpp::operators::detail::lift_operator<int,
rxcpp::operators::detail::lift_operator<int, rxcpp::operators::detail::lift_operator<int, rxcpp::operators::detail::lift_operator<int, rxcpp::dynamic_observable<int>,
rxcpp::operators::detail::map<int, int (*)(int)> >, rxcpp::operators::detail::map<int, int (*)(int)> >, rxcpp::operators::detail::map<int, int (*)(int)> >,
rxcpp::operators::detail::map<int, int (*)(int)> >, rxcpp::operators::detail::map<int, int (*)(int)> > > &' cannot be used prior to '::' because it has no members
using source_value_type = typename Source::value_type;
How can I get the value type of any observable?