RxCpp
RxCpp copied to clipboard
identity_current_thread is bad "default" scheduler for operators
Hi everyone and hi @kirkshoop!
Most of the operators in rxcpp which requires schedulers has such an fallback scheduler:
If scheduler is omitted, identity_current_thread is used.
For example, merge operator also has it as default. BUT it doesn't provide ANY synchronization/serialization in case of multithreaded application.
Example:
rxcpp::observable<>::just(1, rxcpp::observe_on_new_thread())
.repeat()
.merge(rxcpp::observable<>::just(2, rxcpp::observe_on_new_thread())
.repeat())
.take(10).as_blocking().subscribe([](int v){
std::cout << "==================\n" << std::this_thread::get_id() << " START " << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
std::cout << v << std::endl;
std::cout << std::this_thread::get_id() << " END " << std::endl << "==================\n\n";
});
Possible output:
==================
0x70000ee59000 START
==================
0x70000eedc000 START
1
0x70000ee59000 END
==================
2
0x70000eedc000 END
==================
==================
0x70000ef5f000 START
==================
0x70000efe2000 START
1
0x70000efe2000 END
==================
2
0x70000ef5f000 END
==================
==================
0x70000ee59000 START
==================
0x70000eedc000 START
1
0x70000ee59000 END
==================
2
0x70000eedc000 END
==================
==================
0x70000ef5f000 START
==================
0x70000efe2000 START
1
0x70000ef5f000 END
==================
2
0x70000efe2000 END
==================
==================
0x70000ee59000 START
==================
0x70000eedc000 START
1
0x70000ee59000 END
==================
2
0x70000eedc000 END
As you can see, it is mixed, but ReactiveX requires that any observable should be serialized.
In case of using any valid scheduler in merge like observe_on_new_thread
output is valid:
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
==================
0x700001ce1000 START
1
0x700001ce1000 END
==================
==================
0x700001ce1000 START
2
0x700001ce1000 END
==================
Expected: at least default behavior of any operator should be thread-safe...
In my understanding, best default scheduler for such an "multhithreaded" opertators can be "serialize_immediate" (not exist, but actually just emit emissions under mutex to provide exclusive access to subscriber and guarantee that only one observable pushes item at the same time). Tested locally: also provides valid output
BTW: it is how i've implemented merge in ReactivePlusPlus: each callback to subscriber of merge just called under mutex. As a result there is no way to obtain "mixed" log. @kirkshoop, what do you think ?