
Segfault when combining publish/repeat/zip

Open bornovalov opened this issue 6 years ago • 4 comments

auto s = rxcpp::observable<>::just(1).publish();

rxcpp::observable<>::from(1,2,3,4)
	.zip(s.repeat())
	.subscribe([](std::tuple<int, int> const&){});

s.connect();

or

auto s = rxcpp::observable<>::just(1).publish();

rxcpp::observable<>::from(1,2,3,4)
	.merge(s.repeat().take(4))
	.subscribe([](int){});

s.connect();

RxCpp v4.1.0

Without publish/connect this code works as expected.

In real life I have one shared resource s (as a shared_ptr), and I zip it with several task contexts. Maybe I am doing something wrong, and this is the expected behavior?

My environment:

▶ uname -a
Darwin macbook-4106 19.3.0 Darwin Kernel Version 19.3.0: Thu Jan  9 20:58:23 PST 2020; root:xnu-6153.81.5~1/RELEASE_X86_64 x86_64
▶ /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/clang++ --version
Apple clang version 11.0.0 (clang-1100.0.33.12)

bornovalov avatar Mar 31 '20 15:03 bornovalov

Do you have full code example that can be compiled?

daixtrose avatar Apr 01 '20 08:04 daixtrose

#include <rxcpp/rx.hpp>

int main()
{
	auto s = rxcpp::observable<>::just(1).publish();

	rxcpp::observable<>::from(1,2,3,4)
		.zip(s.repeat())
		.subscribe([](std::tuple<int, int> const&){});

	s.connect();
	return 0;
}

and

#include <rxcpp/rx.hpp>

int main()
{
	auto s = rxcpp::observable<>::just(1).publish();

	rxcpp::observable<>::from(1,2,3,4)
		.merge(s.repeat().take(4))
		.subscribe([](int){});

	s.connect();
	return 0;
}

bornovalov avatar Apr 02 '20 01:04 bornovalov

Have you tried adding a coordination to your merge-operator? From the info you give I think that might be it.

(I didn’t test your code samples btw.. just chipping in)

It bit me once, see https://github.com/ReactiveX/RxCpp/issues/459.

thorstink avatar Apr 05 '20 18:04 thorstink

Hi,

This is causing a stack overflow in subscribe(). While this should not happen, there is a workaround.

Make sure that the repeated subscription is subscribed on a coordination: s.subscribe_on(so).repeat()

Here are the examples with the changes needed to work.

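#include <rxcpp/rx.hpp>
#include <chrono>
#include <thread>

int main()
{
	// One worker on a new thread; every stage below is coordinated through it.
	auto sc = rxcpp::rxsc::make_new_thread();
	auto so = rxcpp::synchronize_in_one_worker(sc);

	auto s = rxcpp::observable<>::just(1).publish();

	rxcpp::observable<>::from(so, 1,2,3,4)
		// subscribe_on(so) moves each repeated subscription onto the worker.
		.zip(so, s.subscribe_on(so).repeat())
		.subscribe([](std::tuple<int, int> const&){});

	s.connect();
	// Keep main alive while the worker thread runs.
	std::this_thread::sleep_for(std::chrono::seconds(2));
	return 0;
}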

and

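#include <rxcpp/rx.hpp>
#include <chrono>
#include <thread>

int main()
{
	// One worker on a new thread; every stage below is coordinated through it.
	auto sc = rxcpp::rxsc::make_new_thread();
	auto so = rxcpp::synchronize_in_one_worker(sc);

	auto s = rxcpp::observable<>::just(1).publish();

	rxcpp::observable<>::from(so, 1,2,3,4)
		// subscribe_on(so) moves each repeated subscription onto the worker.
		.merge(so, s.subscribe_on(so).repeat().take(4))
		.subscribe([](int){});

	s.connect();
	// Keep main alive while the worker thread runs.
	std::this_thread::sleep_for(std::chrono::seconds(2));
	return 0;
}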
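
For anyone wondering why moving the resubscription onto a coordination might help, here is a toy model that does not use RxCpp at all. It assumes (this is a guess at the mechanism, not something confirmed from the library source) that once the published source has completed, every new subscription receives on_completed synchronously, so a repeat that resubscribes from inside on_completed nests one call deeper each round. All names here (completed_source, nested_repeat_depth, queued_repeat_depth) are hypothetical and exist only for this sketch.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Toy stand-in for an already-completed subject (an assumption, not RxCpp code):
// subscribing delivers on_completed synchronously, inside the subscribe call.
struct completed_source {
    void subscribe(const std::function<void()>& on_completed) const {
        on_completed();
    }
};

// Naive repeat: resubscribe directly from inside on_completed.
// Every round adds a nested call; `limit` caps the rounds so the demo ends.
std::size_t nested_repeat_depth(std::size_t limit) {
    completed_source src;
    std::size_t depth = 0;
    std::size_t max_depth = 0;
    std::function<void()> on_completed = [&] {
        ++depth;
        if (depth > max_depth) max_depth = depth;
        if (depth < limit) src.subscribe(on_completed);  // nested resubscribe
        --depth;
    };
    src.subscribe(on_completed);
    return max_depth;
}

// Queued repeat: each resubscription is pushed onto a work queue and run
// later from a flat loop, loosely like scheduling it on a worker.
std::size_t queued_repeat_depth(std::size_t limit) {
    completed_source src;
    std::queue<std::function<void()>> work;
    std::size_t depth = 0;
    std::size_t max_depth = 0;
    std::size_t rounds = 0;
    std::function<void()> on_completed = [&] {
        ++depth;
        if (depth > max_depth) max_depth = depth;
        ++rounds;
        if (rounds < limit) work.push([&] { src.subscribe(on_completed); });
        --depth;
    };
    work.push([&] { src.subscribe(on_completed); });
    while (!work.empty()) {
        std::function<void()> job = std::move(work.front());
        work.pop();
        job();  // runs at the top level, so calls never nest
    }
    return max_depth;
}
```

In this model the nested version's call depth grows with the number of rounds, and without the cap it would grow until the stack is exhausted, while the queued version stays at depth one no matter how many rounds run.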

kirkshoop avatar Apr 05 '20 23:04 kirkshoop