RxPY
Proposal for a subscribe scheduler
When I started with RxPY, I often ran into the problem of missing a message when subscribing to an observable. This is especially true when working with the share operator. This can be demonstrated by the following example ...
import rx
from rx import operators as rx_op
from rx.concurrency import CurrentThreadScheduler
my_scheduler = CurrentThreadScheduler()
# create a shared observable emitting a single element "3"
shared_obs = rx.just(3, scheduler=my_scheduler) \
    .pipe(rx_op.share())
# take the first element of the shared observable
left_obs = shared_obs.pipe(rx_op.first())
right_obs = shared_obs.pipe(rx_op.first())
# zip the two single element observables
out_obs = rx.zip(left_obs, right_obs)
out_obs.subscribe()
... which raises the following exception.
File "/home/mike/workspace/python/RxPY/rx/core/observer/autodetachobserver.py", line 45, in on_completed
self._on_completed()
File "/home/mike/workspace/python/RxPY/rx/core/operators/firstordefault.py", line 18, in on_completed
observer.on_error(SequenceContainsNoElementsError())
File "/home/mike/workspace/python/RxPY/rx/core/observer/autodetachobserver.py", line 35, in on_error
self._on_error(error)
File "/home/mike/workspace/python/RxPY/rx/core/observer/autodetachobserver.py", line 35, in on_error
self._on_error(error)
File "/home/mike/workspace/python/RxPY/rx/internal/basic.py", line 34, in default_error
raise err
rx.internal.exceptions.SequenceContainsNoElementsError: Sequence contains no elements
What happens?
- the zip operator first subscribes to the left_obs observable
- the left_obs observable subscribes to the shared_obs observable
- the shared_obs observable subscribes to the just observable, which schedules the element "3" on my_scheduler. Note that my_scheduler could also be a scheduler running on another thread. We then cannot predict exactly when the element "3" is sent. In case of an "inactive" CurrentThreadScheduler, however, the element "3" is sent before the subscribe method of the just observable returns.
- Element "3" is sent to the left_obs observer and then to the zip observer
- the just observable completes
- the right_obs observable subscribes to the shared_obs observable, but does not receive any elements, and therefore raises a SequenceContainsNoElementsError exception
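The sequence above can be reproduced without RxPY. The following is a minimal, library-free sketch: RecordingObserver, just and share are hypothetical stand-ins written for illustration only, not the RxPY implementations. It assumes a source that emits synchronously inside subscribe, which is how an idle CurrentThreadScheduler behaves.

```python
from typing import Callable, List


class RecordingObserver:
    """Hypothetical observer that records every notification it receives."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_next(self, value: object) -> None:
        self.events.append(f"next({value})")

    def on_completed(self) -> None:
        self.events.append("completed")


Subscribe = Callable[[RecordingObserver], None]


def just(value: object) -> Subscribe:
    """Stand-in source: emits synchronously inside subscribe."""

    def subscribe(observer: RecordingObserver) -> None:
        observer.on_next(value)
        observer.on_completed()

    return subscribe


def share(source: Subscribe) -> Subscribe:
    """Rough stand-in for share(): the first subscription connects to the
    source; events go to whoever is attached at that moment."""
    attached: List[RecordingObserver] = []
    state = {"connected": False, "completed": False}

    class Hub(RecordingObserver):
        def on_next(self, value: object) -> None:
            for observer in list(attached):
                observer.on_next(value)

        def on_completed(self) -> None:
            state["completed"] = True
            for observer in list(attached):
                observer.on_completed()

    def subscribe(observer: RecordingObserver) -> None:
        if state["completed"]:
            observer.on_completed()  # late subscriber only sees completion
            return
        attached.append(observer)
        if not state["connected"]:
            state["connected"] = True
            source(Hub())

    return subscribe


shared = share(just(3))
left, right = RecordingObserver(), RecordingObserver()
shared(left)   # connects the source, which emits "3" right away
shared(right)  # too late: the source has already completed
print(left.events)   # ['next(3)', 'completed']
print(right.events)  # ['completed']
```

The second observer only sees the completion, which is the situation in which first() reports that the sequence contains no elements.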
~~The problem is that two different kind of actions (subscribing and emitting elements) are getting scheduled on the same scheduler. This leads to situations where it becomes almost impossible to determine, in which order actions are scheduled. And anyway, it should not be the responsibility of the RxPY user to keep track in which order subscriptions and emissions of elements happen.~~
Update: The problem is that the emission of elements is scheduled during the "subscribe" process. That way, the source observable might send (depending on the scheduler that is used) an element via on_next before all downstream observables have subscribed to the source observable. Hence, some downstream observables will miss one or more elements.
My proposal is to extend the subscribe_ method with a third argument that represents the subscribe scheduler. The subscribe_ method of an Observable would then look as follows.
Update: Changed subscribe to subscribe_
def subscribe_(self, observer: Observer, scheduler: Scheduler, subscribe_scheduler: Scheduler):
    """
    :param subscribe_scheduler: used to schedule "upstream" observables to start emitting their elements
    :param scheduler: used to schedule the elements emitted by a source observable
    """
With the subscribe scheduler, subscribe_ makes sure that all "upstream" observables (i.e. observables connected to the observable that is being subscribed to) are subscribed before the first element is emitted.
Another effect is that the global variable current_thread_scheduler would become obsolete, because it is replaced by the "subscribe" scheduler.
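To make the idea concrete, here is a minimal sketch of how a subscribe scheduler could defer emission until the whole chain is attached. It is library-free and everything in it (SubscribeScheduler, subscribe_all and the simplified just and share) is a hypothetical illustration, not RxPY code. For brevity it leaves out the "normal" scheduler argument and only passes an on_next callback.

```python
from collections import deque
from typing import Callable, Deque, List, Tuple

OnNext = Callable[[int], None]


class SubscribeScheduler:
    """Hypothetical queue: sources park their 'start emitting' action here,
    and the root subscribe call drains it once everything is attached."""

    def __init__(self) -> None:
        self._queue: Deque[Callable[[], None]] = deque()

    def schedule(self, action: Callable[[], None]) -> None:
        self._queue.append(action)

    def run(self) -> None:
        while self._queue:
            self._queue.popleft()()


Observable = Callable[[OnNext, SubscribeScheduler], None]


def just(value: int) -> Observable:
    def subscribe_(on_next: OnNext, subscribe_scheduler: SubscribeScheduler) -> None:
        # Do not emit here; only register the emission for later.
        subscribe_scheduler.schedule(lambda: on_next(value))

    return subscribe_


def share(source: Observable) -> Observable:
    listeners: List[OnNext] = []

    def emit(value: int) -> None:
        for listener in list(listeners):
            listener(value)

    def subscribe_(on_next: OnNext, subscribe_scheduler: SubscribeScheduler) -> None:
        listeners.append(on_next)
        if len(listeners) == 1:  # the first subscriber connects the source
            source(emit, subscribe_scheduler)

    return subscribe_


def subscribe_all(branches: List[Tuple[Observable, OnNext]]) -> None:
    """Root subscribe: attach every branch first, then let the sources run."""
    subscribe_scheduler = SubscribeScheduler()
    for observable, on_next in branches:
        observable(on_next, subscribe_scheduler)
    subscribe_scheduler.run()


shared = share(just(3))
left: List[int] = []
right: List[int] = []
subscribe_all([(shared, left.append), (shared, right.append)])
print(left, right)  # [3] [3]
```

Because the source only registers its emission, both branches are attached by the time the queue is drained, so neither misses the element.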
Hi,
Maybe I'm missing something, but why not use the publish operator, which returns a connectable Observable, for this particular case?
shared_obs = rx.just(3, scheduler=my_scheduler).pipe(rx_op.publish())
left_obs = shared_obs.pipe(rx_op.first())
right_obs = shared_obs.pipe(rx_op.first())
out_obs = rx.zip(left_obs, right_obs)
out_obs.subscribe(print)
shared_obs.connect()
The behavior you expect requires either the usage of the publish/connect operators (as explained by @jcafhe), or the usage of a scheduler that will delay the emission of shared_obs until after all subscriptions have occurred.
As an example, the second solution can be used with asyncio: If you use an asyncio scheduler instead of the CurrentThreadScheduler, and start the asyncio event loop after out_obs.subscribe, then left_obs and right_obs will be subscribed before shared_obs emits its first item. This however requires that your application is based on asyncio.
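The effect of starting the event loop only after subscribing can be sketched with the standard library alone. The just and share functions below are hypothetical stand-ins (not the RxPY operators or its asyncio scheduler); the only assumption is that the source schedules its emission with loop.call_soon, so nothing is emitted until the loop runs.

```python
import asyncio
from typing import Callable, List

OnNext = Callable[[int], None]
Subscribe = Callable[[OnNext], None]


def just(value: int, loop: asyncio.AbstractEventLoop) -> Subscribe:
    def subscribe(on_next: OnNext) -> None:
        # The emission is queued on the loop and runs only once it is started.
        loop.call_soon(on_next, value)

    return subscribe


def share(source: Subscribe) -> Subscribe:
    listeners: List[OnNext] = []

    def emit(value: int) -> None:
        for listener in list(listeners):
            listener(value)

    def subscribe(on_next: OnNext) -> None:
        listeners.append(on_next)
        if len(listeners) == 1:  # the first subscriber connects the source
            source(emit)

    return subscribe


loop = asyncio.new_event_loop()
shared = share(just(3, loop))
left: List[int] = []
right: List[int] = []
shared(left.append)   # queues the emission, nothing is sent yet
shared(right.append)  # attached before the loop has started

loop.call_soon(loop.stop)  # stop after the queued emission has run
loop.run_forever()
loop.close()
print(left, right)  # [3] [3]
```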
By the way, since RxPY v3, the subscribe operator has a scheduler parameter. However, it does not and cannot behave the way you would like: the behavior of the share operator is to subscribe to the source observable as soon as a first observer subscribes to it. The scheduler parameter can be used to define a default scheduler that will be used by the whole operator chain.
> Maybe I'm missing something, but why not use the publish operator, which returns a connectable Observable, for this particular case?
Yes, this would solve the problem for this particular case. My intention with the "subscribe" scheduler, however, is to solve a more general problem. Using the publish operator in place of the share operator requires calling the connect method each time the publish operator is used. The following example uses the publish operator three times. The corresponding three ConnectableObservables need to be connected in the right order after calling subscribe.
# create a shared observable emitting a single element "3"
conn_obs = rx.just(3, scheduler=scheduler) \
.pipe(rx_op.publish())
# take the first element of the shared observable
conn_left = conn_obs.pipe(rx_op.first()).pipe(rx_op.publish())
conn_right = conn_obs.pipe(rx_op.first()).pipe(rx_op.publish())
left_left = conn_left.pipe(rx_op.first())
left_right = conn_left.pipe(rx_op.first())
right_left = conn_right.pipe(rx_op.first())
right_right = conn_right.pipe(rx_op.first())
out_obs = rx.zip(rx.zip(left_left, left_right), rx.zip(right_left, right_right))
out_obs.subscribe()
# connect all ConnectableObservables in the right order!
conn_obs.connect()
conn_left.connect()
conn_right.connect()
This doesn't scale. The more the publish operator is used, the more ConnectableObservables need to be connected somewhere. Furthermore, returning out_obs to a client would require returning the ConnectableObservables as well, which is definitely not the intention of observables.
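Why the connection order matters can be seen in a small library-free sketch. Connectable and just below are hypothetical stand-ins for illustration, not RxPY's ConnectableObservable, and the sketch assumes a source that emits synchronously as soon as it is connected.

```python
from typing import Callable, List

OnNext = Callable[[int], None]
Subscribe = Callable[[OnNext], None]


class Connectable:
    """Hypothetical connectable: collects listeners and only subscribes to
    its source when connect() is called."""

    def __init__(self, source: Subscribe) -> None:
        self._source = source
        self._listeners: List[OnNext] = []

    def subscribe(self, on_next: OnNext) -> None:
        self._listeners.append(on_next)

    def connect(self) -> None:
        self._source(self._emit)

    def _emit(self, value: int) -> None:
        for listener in list(self._listeners):
            listener(value)


def just(value: int) -> Subscribe:
    # Stand-in source that emits synchronously when subscribed to.
    return lambda on_next: on_next(value)


def run(order: List[str]) -> List[int]:
    inner = Connectable(just(3))
    outer = Connectable(inner.subscribe)  # outer is fed by inner
    received: List[int] = []
    outer.subscribe(received.append)
    connectables = {"inner": inner, "outer": outer}
    for name in order:
        connectables[name].connect()
    return received


downstream_first = run(["outer", "inner"])
upstream_first = run(["inner", "outer"])
print(downstream_first)  # [3]
print(upstream_first)    # []
```

Under this assumption, the element only arrives if the downstream connectable is connected before the upstream one, so every additional publish adds another ordering constraint the caller has to get right.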
> By the way, since RxPY v3, the subscribe operator has a scheduler parameter. However, it does not and cannot behave the way you would like: the behavior of the share operator is to subscribe to the source observable as soon as a first observer subscribes to it.
My proposal is to let the subscribe_ method have three arguments: observer, scheduler and subscribe_scheduler (see description above). The "subscribe" scheduler is used to first subscribe all observables and then start up all source observables. Each source observable may then use the "normal" scheduler (2nd argument of the subscribe_ method) to schedule its elements. This means even with the "subscribe" scheduler the subscribe_ method call should (as you say) happen immediately without delaying it by a scheduler.
Update: The difference between the "subscribe" scheduler and the "normal" scheduler is that the "subscribe" scheduler is bound to the chain of subscribe_ calls going from the root observable to its upstream observables. This means that calling the root subscribe_ method will select some "subscribe" scheduler, which is then used for each subsequent call of the "subscribe" method (for the upstream observables). The "normal" scheduler, on the other hand, may change somewhere in the chain of subscribe_ method calls.
We adapted the subscribe method in our local copy of RxPY a while ago, and have never had an issue with missing an element since. The adapted project can be found here:
https://github.com/MichaelSchneeberger/RxPY/tree/release/v1.6.x
Here is a short summary of how we implemented this:
- Rename subscribe to unsafe_subscribe (in RxPY v3 it would be subscribe_)
- Add method subscribe to Observable that schedules the subscribe action on the "subscribe" scheduler, as shown here:
def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None, scheduler=None):
    single_assignment_disposable = SingleAssignmentDisposable()

    # schedule subscribing on "subscribe" scheduler
    def subscribe_action(_, __):
        disposable = self.unsafe_subscribe(on_next=on_next, on_error=on_error, on_completed=on_completed,
                                           observer=observer, scheduler=scheduler)
        single_assignment_disposable.disposable = disposable

    # select the "subscribe" scheduler
    scheduler = scheduler or CurrentThreadScheduler()
    scheduler.schedule(subscribe_action)
    return single_assignment_disposable
- Adapt the source observables to delay the emission of elements by the "subscribe" scheduler to make sure that all observables are subscribed. Here is an implementation of the just operator (in v1.6 style):
@extensionclassmethod(Observable)
def just(cls, value, scheduler=None):
    outer_scheduler = scheduler
    def subscribe(observer, scheduler, subscribe_scheduler):
        scheduler_ = scheduler or outer_scheduler
        # delay the emission of elements by the subscribe scheduler to make
        # sure that all observables are subscribed.
        def schedule_emit_elem_action(_, __):
            def emit_elem_action(_, __):
                observer.on_next(value)
                observer.on_completed()
            # emit on the "normal" scheduler if there is one, otherwise emit directly
            if scheduler_:
                return scheduler_.schedule(emit_elem_action)
            else:
                emit_elem_action(None, None)
        return subscribe_scheduler.schedule(schedule_emit_elem_action)
    return AnonymousObservable(subscribe)
Yes, it's a problem that subscribe means both "attach" and "run" at the same time. Turning cold observables hot and then trying to make them feel cold again will give edge cases when combined with CurrentThreadScheduler. Thus you could instead use NewThreadScheduler so zip will finish the subscription of all sources before the item is sent down the pipeline. It could be that zip should have scheduled its subscribe action to force CurrentThreadScheduler to actually queue the scheduling of the item in just. We can check that using subscribe_on, so the subscribe gets scheduled, e.g.
obs_out = rx.zip(left_obs, right_obs).pipe(ops.subscribe_on(my_scheduler))
I would not want to add another arg to subscribe as it would make things more confusing for users. It would be better to have the .attach()/.run() combo, but I don't think we should diverge more from rx at this point, especially when using observables to implement iterables.
But I also don't see the observable problem we are trying to solve? I.e. why not just implement this using iterables instead?
shared = [3]
first = shared[:1]
second = shared[:1]
iter_out = zip(first, second)
> Thus you could instead use NewThreadScheduler so zip will finish the subscription of all sources before the item is sent down the pipeline.
You can reproduce the same effect with schedulers running on another thread like NewThreadScheduler or EventLoopScheduler. It is just a bit harder to show. The following example ...
import rx
from rx import operators as rx_op
from rx.concurrency import EventLoopScheduler
scheduler1 = EventLoopScheduler()
scheduler2 = EventLoopScheduler()
source = rx.from_(range(10), scheduler=scheduler1).pipe(rx_op.share())
def gen_subscribers():
    for i in range(100):
        yield source.pipe(rx_op.observe_on(scheduler2), rx_op.to_iterable(), rx_op.map(lambda v: list(v)))
obs_to_be_zipped = list(gen_subscribers())
rx.merge(*obs_to_be_zipped).pipe(rx_op.do_action(print)).run()
... will start printing ...
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
... then it will eventually print ...
[1, 2, 3, 4, 5, 6, 7, 8, 9]
... because at some point, the downstream observables will miss an element.
> I would not want to add another arg to subscribe as it would make things more confusing for users.
No, you don't have to add another argument to the subscribe method. I know that was my initial suggestion, but I should have called it subscribe_ or unsafe_subscribe (I updated it accordingly, see above).
The implementation of the subscribe method in the current RxPY might then look like this ...
def subscribe(self,  # pylint: disable=too-many-arguments,arguments-differ
              observer: Union[typing.Observer, typing.OnNext] = None,
              on_error: typing.OnError = None,
              on_completed: typing.OnCompleted = None,
              on_next: typing.OnNext = None,
              *,
              scheduler: typing.Scheduler = None,
              ) -> typing.Disposable:
    if observer:
        if isinstance(observer, typing.Observer) or hasattr(observer, "on_next"):
            on_next = cast(typing.Observer, observer).on_next
            on_error = cast(typing.Observer, observer).on_error
            on_completed = cast(typing.Observer, observer).on_completed
        else:
            on_next = observer
    subscribe_scheduler = CurrentThreadScheduler()
    return self.subscribe_(on_next, on_error, on_completed, scheduler, subscribe_scheduler)
> But I also don't see the observable problem we are trying to solve? I.e. why not just implement this using iterables instead?
We want to use the elements emitted by an observable for different kinds of computations executed on multiple threads, as shown in the example of this post.
In RxJS, missing an element is somewhat less of a problem, because a JavaScript application runs on a single thread.
In RxJava, they have an onSubscribe method defined in the Observer class that is called prior to scheduling elements on a scheduler. See ObservableObserveOn (line 92) ...
downstream.onSubscribe(this);
schedule();
That way all subscribe methods are called in one go, so that when the last subscribe method is called, all subscribe (and onSubscribe) methods can be found in the traceback (i.e. on the stack). The subscribe and onSubscribe calls of my first example in this issue are depicted here ...
call subscribe out_obs
call subscribe left_obs
call subscribe shared_obs
call subscribe just
call onSubscribe shared_obs
call onSubscribe left_obs
call onSubscribe out_obs
call subscribe right_obs
call subscribe shared_obs
return subscribe shared_obs
return subscribe right_obs
return onSubscribe out_obs
return onSubscribe left_obs
return onSubscribe shared_obs
start scheduling element in just
etc.
As you can see, the scheduling happens only after all subscribe methods have been called.
A CurrentThreadScheduler (or trampoline) is useful to reduce the function call depth. A "subscribe" scheduler would do something similar to the onSubscribe method they use in RxJava, and would have the advantage of reducing the function call depth, i.e. reducing the risk of a stack overflow.
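For readers unfamiliar with the trampoline idea, here is a minimal sketch of how it keeps the call depth flat. The Trampoline class and count_down helper are hypothetical and written only for illustration; this is not the RxPY CurrentThreadScheduler, which also handles time and disposal.

```python
from collections import deque
from typing import Callable, Deque, List


class Trampoline:
    """Hypothetical trampoline: actions scheduled while another action is
    running are queued instead of being called recursively."""

    def __init__(self) -> None:
        self._queue: Deque[Callable[[], None]] = deque()
        self._running = False

    def schedule(self, action: Callable[[], None]) -> None:
        self._queue.append(action)
        if self._running:
            return  # the outer loop will pick it up; no nested call
        self._running = True
        try:
            while self._queue:
                self._queue.popleft()()
        finally:
            self._running = False


def count_down(trampoline: Trampoline, n: int, log: List[str]) -> None:
    def step(k: int) -> None:
        if k == 0:
            log.append("done")
            return
        # Re-scheduling from inside an action only enqueues the next step.
        trampoline.schedule(lambda: step(k - 1))

    trampoline.schedule(lambda: step(n))


log: List[str] = []
count_down(Trampoline(), 100_000, log)  # far deeper than the recursion limit
print(log)  # ['done']
```

Each step returns before the next one starts, so the stack never grows with the number of scheduled actions.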
> We want to use the elements emitted by an observable for different kinds of computations executed on multiple threads, as shown in the example of this post.
In such situations, I always use asyncio in combination with ThreadPool schedulers (even when no asynchronous IO is used). In the following example, asyncio is used as the main loop, where files are read. Then computation is done with ThreadPoolScheduler, and finally the result of the computation is processed back on the asyncio event loop.
https://github.com/MainRo/deep-speaker/blob/master/deep_speaker/feature/process_path.py
Some rewriting would improve readability, but in this example no items are missed. The event loop is started after all subscriptions are done, so the first item is emitted once the whole chain is built.
> In such situations, I always use asyncio in combination with ThreadPool schedulers (even when no asynchronous IO is used).
Yes, this would solve the problem. I actually like this pattern and have used it as well. But not every application fits this pattern, for instance if a specific scheduler needs to be running on the main thread, or if blocking work should be scheduled on a dedicated scheduler. In that case, you need something like an EventLoopScheduler, and will therefore face the problem of missing an element in an observable.