RxCpp
Implementing logical OR between multiple operations
Hi all! We are currently developing a stream processing program using RxCpp, but we are having trouble working out how to approach the following situation correctly.
Let's say we have multiple operations, where each operation can succeed or fail. We want to build a pipeline of observables that tries each operation in sequence for as long as they fail, and returns as soon as one operation succeeds.
The main question is:
- What would be the correct way of implementing a bifurcation that depends on the result of the current operation?
One possible implementation we came up with is the following:
We define the OperationResult class, which is just a wrapper around the event being processed plus a boolean indicating whether the operation succeeded or failed:
template <class T> class OperationResult{
private:
    T m_event;
    bool m_success;
public:
    // Members are initialized in declaration order (m_event, then m_success)
    OperationResult(bool succeeded, T event) : m_event{event}, m_success{succeeded}{}
    bool success() const{
        return this->m_success;
    }
    T event() const{
        return this->m_event;
    }
};
And the Operation class, which sets up the rxcpp pipeline. Each operation exposes two observables: one that emits the successfully processed items and one that emits the failed ones.
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include "rxcpp/rx.hpp"

template <class T> class Operation
{
    using function_t = std::function<OperationResult<T>(OperationResult<T>)>;
    using observable_t = rxcpp::observable<OperationResult<T>>;
private:
    std::string m_name;
    function_t m_fn;
    bool m_inputConnected;
public:
    Operation(const std::string & name, function_t fn) : m_name{name}, m_fn{fn}, m_inputConnected{false}{}
    // Sets up the rxcpp pipeline and splits it according to OperationResult, in such a way
    // that no new observables or subscriptions are created once it is connected.
    // Returns the pair {failure, success}.
    std::pair<observable_t, observable_t> connect(const observable_t & input){
        if (this->m_inputConnected){
            throw std::runtime_error("Error, operation " + this->m_name + " is already connected");
        }
        // publish() shares one upstream subscription between both filters
        auto result = input.map(this->m_fn).publish();
        auto failure = result.filter([](OperationResult<T> r) { return !r.success(); });
        auto success = result.filter([](OperationResult<T> r) { return r.success(); });
        // Connecting right away relies on `input` being hot (e.g. backed by a subject);
        // a cold source could emit before the outputs are subscribed.
        result.connect();
        this->m_inputConnected = true;
        return std::pair(failure, success);
    }
};
This allows for setting up a logical OR between multiple Operations like this:
// Assumes `using namespace std;` and `using namespace rxcpp;`, plus C++17
// (class template argument deduction for OperationResult and std::pair)

// Operations
Operation<int> op0("is pair",
                   [](OperationResult<int> res)
                   {
                       cout << "pair got " << res.event() << endl;
                       return OperationResult<int>(res.event() % 2 == 0, res.event());
                   });
Operation<int> op1("is greater than 3",
                   [](OperationResult<int> res)
                   {
                       cout << "greater than 3 got " << res.event() << endl;
                       return OperationResult<int>(res.event() > 3, res.event());
                   });
Operation<int> op2("equals 1",
                   [](OperationResult<int> res)
                   {
                       cout << "equals 1 got " << res.event() << endl;
                       return OperationResult<int>(res.event() == 1, res.event());
                   });

// Input to the observable chain
auto input_sbj = subjects::subject<OperationResult<int>>();
auto input = input_sbj.get_observable();

// Subscriber to be called if any of the 3 operations succeeds
auto success_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
    { cout << "[Success] got " << res.event() << endl; });
// Subscriber to be called if all operations fail
auto error_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
    { cout << "[Error] got " << res.event() << endl; });

// Logical OR: outs.first is the failure stream, outs.second the success stream
auto outs = op0.connect(input);
outs.second.subscribe(success_subscriber);
outs = op1.connect(outs.first);
outs.second.subscribe(success_subscriber);
outs = op2.connect(outs.first);
outs.second.subscribe(success_subscriber);
outs.first.subscribe(error_subscriber);

// Emit input
auto s = input_sbj.get_subscriber();
s.on_next(OperationResult(false, 0)); // Success
s.on_next(OperationResult(false, 1)); // Success
s.on_next(OperationResult(false, 3)); // Error
s.on_next(OperationResult(false, 5)); // Success
s.on_completed();
Output:
pair got 0
[Success] got 0
pair got 1
greater than 3 got 1
equals 1 got 1
[Success] got 1
pair got 3
greater than 3 got 3
equals 1 got 3
[Error] got 3
pair got 5
greater than 3 got 5
[Success] got 5
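For comparison, the trace above is the same one that plain short-circuit evaluation produces. Below is a minimal sketch without RxCpp, assuming the same three checks and inputs; `NamedCheck`, `trace_first_success` and `question_checks` are hypothetical names used only for illustration, not part of RxCpp or of the code above.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical helper type: a label plus a predicate, mirroring Operation's name and function.
struct NamedCheck {
    std::string name;
    std::function<bool(int)> test;
};

// Tries each check in order for every input and stops at the first success.
// Returns the log lines instead of printing them so the result can be inspected.
std::vector<std::string> trace_first_success(const std::vector<NamedCheck>& checks,
                                             const std::vector<int>& inputs) {
    std::vector<std::string> log;
    for (int v : inputs) {
        bool succeeded = false;
        for (const auto& check : checks) {
            log.push_back(check.name + " got " + std::to_string(v));
            if (check.test(v)) {
                succeeded = true;
                break;  // logical OR: the remaining checks are skipped
            }
        }
        log.push_back(std::string(succeeded ? "[Success]" : "[Error]") +
                      " got " + std::to_string(v));
    }
    return log;
}

// The three checks from the question, labelled as in its output.
std::vector<NamedCheck> question_checks() {
    return {
        {"pair", [](int v) { return v % 2 == 0; }},
        {"greater than 3", [](int v) { return v > 3; }},
        {"equals 1", [](int v) { return v == 1; }},
    };
}
```

Calling `trace_first_success(question_checks(), {0, 1, 3, 5})` yields the same 13 lines as the output above, which may be a handy reference when checking that a reactive version keeps the intended ordering.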
Any help or insight you can give me is much appreciated!
What about the most straightforward way: passing all the comparisons to a single lambda, something like this?
observable.filter([](int v){return v % 2 == 0 || v > 3 || v ==1;})
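If the list of checks is only known at run time, the same idea can be sketched by folding the predicates into one callable and handing that to a single filter. `Check` and `any_of_checks` are hypothetical names; the sketch uses only the standard library, so it shows the combined predicate and not the RxCpp wiring.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

using Check = std::function<bool(int)>;

// Hypothetical helper: combines several checks into one predicate that is
// true as soon as any check passes. Checks after the first passing one are
// not evaluated, which matches the "return on first success" requirement.
Check any_of_checks(std::vector<Check> checks) {
    return [checks = std::move(checks)](int v) {
        for (const auto& check : checks) {
            if (check(v)) {
                return true;
            }
        }
        return false;
    };
}
```

The resulting callable should be usable wherever a predicate over `int` is expected, for example as the argument of `filter` in the line above, assuming the stream carries plain `int` values.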
Another way I can imagine is to use the amb operator, but you would need to create it for each value received and run it immediately. Like...
observable.flat_map([](int v)
{
    return rxcpp::observable<>::just(v)
        .filter([](int v){return v % 2 == 0;})
        .amb(rxcpp::observable<>::just(v).filter([](int v){return v > 3;}),
             rxcpp::observable<>::just(v).filter([](int v){return v == 1;}));
});
But I'm not sure it is really useful, since for every value you create a new observable, configure it, and run it immediately.
Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially try each operation while they fail, returning as soon as one operation succeed.
That sounds like Amb(op0.retry(), op1.retry(), ..)