Segfault when combine publish/repeat/zip
auto s = rxcpp::observable<>::just(1).publish();
rxcpp::observable<>::from(1,2,3,4)
.zip(s.repeat())
.subscribe([](std::tuple<int, int> const&){});
s.connect();
or
auto s = rxcpp::observable<>::just(1).publish();
rxcpp::observable<>::from(1,2,3,4)
.merge(s.repeat().take(4))
.subscribe([](int){});
s.connect();
v4.1.0
Without publish/connect this code works as expected.
In real life i have one shared resource s (as shared_ptr), and zip it with several task contexts. Maybe i doing something wrong, and this is expected behavior?
My environment:
▶ uname -a
Darwin macbook-4106 19.3.0 Darwin Kernel Version 19.3.0: Thu Jan 9 20:58:23 PST 2020; root:xnu-6153.81.5~1/RELEASE_X86_64 x86_64
▶ /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/clang++ --version
Apple clang version 11.0.0 (clang-1100.0.33.12)
Do you have full code example that can be compiled?
#include <rxcpp/rx.hpp>
int main()
{
auto s = rxcpp::observable<>::just(1).publish();
rxcpp::observable<>::from(1,2,3,4)
.zip(s.repeat())
.subscribe([](std::tuple<int, int> const&){});
s.connect();
return 0;
}
and
#include <rxcpp/rx.hpp>
int main()
{
auto s = rxcpp::observable<>::just(1).publish();
rxcpp::observable<>::from(1,2,3,4)
.merge(s.repeat().take(4))
.subscribe([](int){});
s.connect();
return 0;
}
Have you tried adding a coordination to your merge-operator? From the info you give I think that might be it.
(I didn’t test your code samples btw.. just chipping in)
It bit me once, see https://github.com/ReactiveX/RxCpp/issues/459.
Hi,
This is causing a stack overflow in subscribe(). While this should not happen, there is a workaround.
make sure that the repeated subscription is subscribed on a coordination.
s.subscribe_on(so).repeat()
Here are the examples with the changes needed to work.
int main()
{
auto sc = rxcpp::rxsc::make_new_thread();
auto so = rxcpp::synchronize_in_one_worker(sc);
auto s = rxcpp::observable<>::just(1).publish();
rxcpp::observable<>::from(so, 1,2,3,4)
.zip(so, s.subscribe_on(so).repeat())
.subscribe([](std::tuple<int, int> const&){});
s.connect();
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
and
int main()
{
auto sc = rxcpp::rxsc::make_new_thread();
auto so = rxcpp::synchronize_in_one_worker(sc);
auto s = rxcpp::observable<>::just(1).publish();
rxcpp::observable<>::from(so, 1,2,3,4)
.merge(so, s.subscribe_on(so).repeat().take(4))
.subscribe([](int){});
s.connect();
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}