RxCpp
RxCpp copied to clipboard
observe_on usage /w order persistence across different subjects
First off, thank you for your great work bringing Rx to C++!
I assume my problem is due to my lack of understanding of what to use and how. The problem I am trying to solve is related to subscriptions on different subjects utilizing the same worker to observe their subscriptions seemingly not respecting order.
I've found that subscribing to a single subject and utilizing observe_on to move the results to a different thread seems to preserve order... but when two different subjects are subscribed and funnel to the same event worker/coordinator via observe_on, their order relative to each other seems to be lost.
General setup:
- Create multiple subjects
- Subscribe to those subjects, and print when they're fired
- Utilize observe_on funneling to a single worker to ensure the print subscription callbacks fire in the same order they occurred on the background thread, despite being moved to a different thread.
- Fire the subjects on a background thread
More concrete example (using a bit of QT, but the psuedocode gist should hopefully be clear):
class TestThread : public QThread
{
public:
TestThread();
// Starts the thread
void run();
rxcpp::subjects::subject<int> spam_;
rxcpp::subjects::subject<bool> done_;
};
void TestThread::run() {
for (int i = 0; i < 1000; ++i) {
spam_.get_subscriber().on_next(i);
}
LOG("Firing finished");
done_.get_subscriber().on_next(true);
}
/// Main thread
// Create background thread
auto background_thread = new TestThread();
// Set up coordinator to run on one worker
rxcpp::schedulers::other_thread;
rxcpp::composite_subscription sub;
auto worker = other_thread.create_worker(sub);
auto coord = rxcpp::serialize_same_worker(worker );
thread_->spam_.get_observable()
.observe_on(coord)
.subscribe([=](int i)
{
LOG("Spam %i", i);
});
thread_->done_.get_observable()
.observe_on(coord)
.subscribe([=](bool)
{
LOG("DONE");
});
// Fire subjects on background thread
background_thread.start();
The results I'm trying to achieve would look something like this:
Spam 0
Spam 1
...
Firing finished // At some point the background thread is done with the loop, cool
...
Spam 45
...
Spam 999
DONE // After all spam events have fired, we see the DONE subject's event
// End of file
What I see instead is DONE being interleaved and fired early:
Spam 0
Spam 1
...
Firing finished // At some point the background thread is done with the loop, cool
...
Spam 45
DONE // Oh no
Spam 46
...
Spam 999
// End of file
The above results would make sense to me if the spam_
and done_
subjects were fired from different threads, as locking the queue of the worker would not be ensured to be in the proper order; whoever received the mutex first would queue next, which might be out of order.
However, in the above example, since both subjects are fired from the same thread, I would've imagined all the spam events would have queued onto the worker before the done event was queued, as there's no thread racing between the two. As the worker fulfilled the events on the scheduler, I would've then expected the done_
event to always be fulfilled last.
Here is the applicable quote from the developer manual that makes me feel like the worker should be respecting the order:
worker owns a queue of pending schedulables for the timeline and has a lifetime. When the time for an schedulable is reached the schedulable is run. The queue maintains insertion order so that when N schedulables have the same target time they are run in the order that they were inserted into the queue.
Is there something I'm misunderstanding about these overall mechanics? Is there a scheduler setup that exists that I could use to achieve consistent queuing across several subjects subscriptions?
The code I've posted is just one of several other routes I've tried. Haven't gotten any combination to work so far.
Thanks in advance for any help or insight! 8)
So, digging a bit more, it seems the behavior might be due to each observe_on having its own set of fill queues, rather than it queuing up in the worker immediately? The race, then, is the two observables fighting over who will schedule its next queue item to the single worker, it seems?
Is there a way to have observe_on calls immediately schedule its incoming events onto the coordinator to be dispatched, and bypass any internal queues of its own? I guess this would break the promise of each event in a given observable being processed completely before its next, as some user down the line might observe_on the same coord.
Alternatively it possible to set up an observe_on type system that funnels several observables to share same internal fill queue when transferring to the target coordinator? Eg, only one event from any observable could be currently executing on the coord, and their incoming order is preserved when choosing who to fire next. I guess this is also hard, as the observe_on objects are typed, so that'd only be easily doable if the source observables were the same type.
Anyway, this seems to be turning into a more Rx general question, rather than rxcpp specific. Feel free to close if you agree. Otherwise I'd love to hear any feedback and/or ideas for achieving this synchronization.
Thanks! 8)
Hi, is there any following up on this issue?
I know I'm late, but in case anyone is still interested, here is what I would do:
If you just want to know when the spam
observable is done, you can use spam_.get_subscriber().on_completed()
instead of another subject. You can pass a second callback to subscribe
that is called if the observable is completed.
If you have to use two observables, I think you have to use concat
. But as you noted, both observables would have to emit elements of the same type. std::variant
could help with that.
I hope this helps.