Proposal for a subscribe scheduler

Open MichaelSchneeberger opened this issue 5 years ago • 10 comments

When I started with RxPY, I often ran into the problem of missing a message when subscribing to an observable. This is especially true when working with the share operator. This can be demonstrated by the following example, ...

import rx

from rx import operators as rx_op
from rx.concurrency import CurrentThreadScheduler

my_scheduler = CurrentThreadScheduler()

# create a shared observable emitting a single element "3"
shared_obs = rx.just(3, scheduler=my_scheduler) \
    .pipe(rx_op.share())

# take the first element of the shared observable
left_obs = shared_obs.pipe(rx_op.first())
right_obs = shared_obs.pipe(rx_op.first())

# zip the two single element observables
out_obs = rx.zip(left_obs, right_obs)

out_obs.subscribe()

... which raises the following exception.

  File "/home/mike/workspace/python/RxPY/rx/core/observer/autodetachobserver.py", line 45, in on_completed
    self._on_completed()
  File "/home/mike/workspace/python/RxPY/rx/core/operators/firstordefault.py", line 18, in on_completed
    observer.on_error(SequenceContainsNoElementsError())
  File "/home/mike/workspace/python/RxPY/rx/core/observer/autodetachobserver.py", line 35, in on_error
    self._on_error(error)
  File "/home/mike/workspace/python/RxPY/rx/core/observer/autodetachobserver.py", line 35, in on_error
    self._on_error(error)
  File "/home/mike/workspace/python/RxPY/rx/internal/basic.py", line 34, in default_error
    raise err
rx.internal.exceptions.SequenceContainsNoElementsError: Sequence contains no elements

What happens?

  1. the zip operator first subscribes to the left_obs observable
  2. the left_obs observable subscribes to the shared_obs observable
  3. the shared_obs observable subscribes to the just observable, which schedules the element "3" on my_scheduler. Note that my_scheduler could also be a scheduler running on another thread, in which case we cannot predict exactly when the element "3" is sent. With an "inactive" CurrentThreadScheduler, however, the element "3" is sent before the subscribe method of the just observable returns.
  4. the element "3" is sent to the left_obs observer and then to the zip observer
  5. the just observable completes
  6. the right_obs observable subscribes to the shared_obs observable, but does not receive any elements, and therefore raises a SequenceContainsNoElementsError exception
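
The sequence of calls above can be imitated without RxPY. The following is a minimal sketch under a heavily simplified model: ToyShare and toy_just are hypothetical stand-ins (not RxPY classes) that only reproduce the ordering of subscribe and on_next calls, and completion is left out.

```python
# Toy stand-ins, NOT RxPY classes: they only mimic the ordering of calls.

class ToyShare:
    """Subscribes to its source when the first observer attaches."""

    def __init__(self, source_subscribe):
        self._source_subscribe = source_subscribe
        self._observers = []
        self._connected = False

    def subscribe(self, on_next):
        self._observers.append(on_next)
        if not self._connected:
            self._connected = True
            # The source emits synchronously, before this call returns.
            self._source_subscribe(self._broadcast)

    def _broadcast(self, value):
        for on_next in list(self._observers):
            on_next(value)


def toy_just(value):
    """A source that emits its single element immediately on subscribe."""
    def subscribe(on_next):
        on_next(value)
    return subscribe


left_received, right_received = [], []
shared = ToyShare(toy_just(3))

shared.subscribe(left_received.append)   # steps 1-5: "3" is emitted right here
shared.subscribe(right_received.append)  # step 6: too late, nothing arrives

print(left_received)   # [3]
print(right_received)  # []
```

The second observer attaches only after the single element has already been broadcast, which is the situation that makes first() fail in the real example.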

~~The problem is that two different kind of actions (subscribing and emitting elements) are getting scheduled on the same scheduler. This leads to situations where it becomes almost impossible to determine, in which order actions are scheduled. And anyway, it should not be the responsibility of the RxPY user to keep track in which order subscriptions and emissions of elements happen.~~

Update: The problem is that the emission of elements is scheduled during the "subscribe" process. That way, the source observable might send (depending on the scheduler that is used) an element via on_next before all downstream observables have subscribed to the source observable. Hence, some downstream observables will miss one or more elements.

My proposal is to extend the subscribe_ method with a third argument that represents the subscribe scheduler. The subscribe_ method of an Observable would then look as follows.

Update: Changed subscribe to subscribe_

def subscribe_(self, observer: Observer, scheduler: Scheduler, subscribe_scheduler: Scheduler):
    """
    :param subscribe_scheduler: used to schedule "upstream" observables to start emitting their elements
    :param scheduler: used to schedule the elements emitted by a source observable
    """

A "subscribe" scheduler makes sure that all "upstream" observables (i.e. observables connected to the observable that is being subscribed to) are subscribed before the first element is emitted.

Another effect is that the global variable current_thread_scheduler would become obsolete, because it is replaced by the "subscribe" scheduler.
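
To make the idea concrete, here is a minimal sketch, assuming a toy trampoline in place of a real scheduler. ToyTrampoline, ToyShare and toy_just are hypothetical names used only for illustration; none of them is part of RxPY. The source queues its emission on the "subscribe" scheduler, so the element is sent only after the whole subscribe action has returned.

```python
from collections import deque


class ToyTrampoline:
    """Hypothetical stand-in for the proposed "subscribe" scheduler."""

    def __init__(self):
        self._queue = deque()
        self._running = False

    def schedule(self, action):
        # Actions scheduled while another action runs are only queued.
        self._queue.append(action)
        if self._running:
            return
        self._running = True
        try:
            while self._queue:
                self._queue.popleft()()
        finally:
            self._running = False


class ToyShare:
    """Subscribes to its source when the first observer attaches."""

    def __init__(self, source_subscribe):
        self._source_subscribe = source_subscribe
        self._observers = []
        self._connected = False

    def subscribe(self, on_next, subscribe_scheduler):
        self._observers.append(on_next)
        if not self._connected:
            self._connected = True
            self._source_subscribe(self._broadcast, subscribe_scheduler)

    def _broadcast(self, value):
        for on_next in list(self._observers):
            on_next(value)


def toy_just(value):
    def subscribe(on_next, subscribe_scheduler):
        # Emission is queued; it runs after the subscribe action returns.
        subscribe_scheduler.schedule(lambda: on_next(value))
    return subscribe


subscribe_scheduler = ToyTrampoline()
left_received, right_received = [], []
shared = ToyShare(toy_just(3))


def subscribe_all():
    # Plays the role of zip subscribing to both of its sources.
    shared.subscribe(left_received.append, subscribe_scheduler)
    shared.subscribe(right_received.append, subscribe_scheduler)


# The root subscription itself is scheduled on the "subscribe" scheduler.
subscribe_scheduler.schedule(subscribe_all)

print(left_received)   # [3]
print(right_received)  # [3]
```

Because the emission is queued behind the running subscribe action, both observers are attached before the element is broadcast.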

MichaelSchneeberger avatar Feb 19 '19 21:02 MichaelSchneeberger

Hi, Maybe I'm missing something, but why not use the publish operator, which returns a connectable Observable, for this particular case?

shared_obs = rx.just(3, scheduler=my_scheduler).pipe(rx_op.publish())

left_obs = shared_obs.pipe(rx_op.first())
right_obs = shared_obs.pipe(rx_op.first())
out_obs = rx.zip(left_obs, right_obs)

out_obs.subscribe(print)
shared_obs.connect()

jcafhe avatar Feb 20 '19 20:02 jcafhe

The behavior you expect requires either the usage of the publish/connect operators (as explained by @jcafhe) or a scheduler that delays the emission of shared_obs until all subscriptions have occurred.

As an example, the second solution can be used with asyncio: If you use an asyncio scheduler instead of the CurrentThreadScheduler, and start the asyncio event loop after out_obs.subscribe, then left_obs and right_obs will be subscribed before shared_obs emits its first item. This however requires that your application is based on asyncio.
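
The ordering argument can be seen with plain asyncio, without RxPY. This is only a sketch under that simplification: share_subscribe and emit are hypothetical helpers, and loop.call_soon stands in for what an asyncio-based scheduler would do when the source schedules its element.

```python
import asyncio

loop = asyncio.new_event_loop()

observers = []
state = {"connected": False}
left_received, right_received = [], []


def emit(value):
    # Runs only once the event loop is running.
    for on_next in list(observers):
        on_next(value)
    loop.stop()


def share_subscribe(on_next):
    observers.append(on_next)
    if not state["connected"]:
        state["connected"] = True
        # Queue the emission on the (not yet running) event loop.
        loop.call_soon(emit, 3)


share_subscribe(left_received.append)
share_subscribe(right_received.append)

# Start the loop only after all subscriptions are in place.
loop.run_forever()
loop.close()

print(left_received)   # [3]
print(right_received)  # [3]
```

Nothing is emitted until run_forever is called, so every observer attached before that point receives the element.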

By the way, since RxPY v3, the subscribe method has a scheduler parameter. However, it does not and cannot behave the way you would like: the share operator subscribes to the source observable as soon as a first observer subscribes to it. The scheduler parameter can be used to define a default scheduler that will be used by the whole operator chain.

MainRo avatar Feb 20 '19 21:02 MainRo

Maybe I'm missing something, but why not use the publish operator, which returns a connectable Observable, for this particular case?

Yes, this would solve the problem for this particular case. My intention with the "subscribe" scheduler, however, is to solve a more general problem. Using the publish operator in place of the share operator requires calling the connect method each time the publish operator is used. The following example uses the publish operator three times. The corresponding three ConnectableObservables need to be connected in the right order after calling subscribe.

# create a shared observable emitting a single element "3"
conn_obs = rx.just(3, scheduler=scheduler) \
    .pipe(rx_op.publish())

# take the first element of the shared observable
conn_left = conn_obs.pipe(rx_op.first()).pipe(rx_op.publish())
conn_right = conn_obs.pipe(rx_op.first()).pipe(rx_op.publish())

left_left = conn_left.pipe(rx_op.first())
left_right = conn_left.pipe(rx_op.first())
right_left = conn_right.pipe(rx_op.first())
right_right = conn_right.pipe(rx_op.first())

out_obs = rx.zip(rx.zip(left_left, left_right), rx.zip(right_left, right_right))

out_obs.subscribe()

# connect all ConnectableObservables in the right order!
conn_obs.connect()
conn_left.connect()
conn_right.connect()

This doesn't scale. The more the publish operator is used, the more ConnectableObservables need to be connected somewhere. Furthermore, returning out_obs to a client would require returning the ConnectableObservables as well, which is definitely not the intention of observables.

MichaelSchneeberger avatar Feb 22 '19 18:02 MichaelSchneeberger

By the way, since RxPY v3, the subscribe method has a scheduler parameter. However, it does not and cannot behave the way you would like: the share operator subscribes to the source observable as soon as a first observer subscribes to it.

My proposal is to let the subscribe_ method have three arguments: observer, scheduler and subscribe_scheduler (see description above). The "subscribe" scheduler is used to first subscribe all observables and then start up all source observables. Each source observable may then use the "normal" scheduler (2nd argument of the subscribe_ method) to schedule its elements. This means even with the "subscribe" scheduler the subscribe_ method call should (as you say) happen immediately without delaying it by a scheduler.

Update: The difference between the "subscribe" scheduler and "normal" scheduler is that the "subscribe" scheduler is bound to the chain of subscribe_ calls going from root observable to its upstream observables. This means that calling the root subscribe_ method will select some "subscribe" scheduler, which is then used for each subsequent call of the "subscribe" method (for the upstream observables). The "normal" scheduler, on the other hand, may change somewhere in the chain of subscribe_ method calls.

MichaelSchneeberger avatar Feb 22 '19 18:02 MichaelSchneeberger

We adapted the subscribe method in our local copy of RxPY a while ago, and have never had an issue with missing an element since. The adapted project can be found here:

https://github.com/MichaelSchneeberger/RxPY/tree/release/v1.6.x

Here is a short summary of how we implemented this:

  1. Rename subscribe to unsafe_subscribe (in RxPY v3 it would be subscribe_)

  2. Add method subscribe to Observable that schedules the subscribe action on the "subscribe" scheduler, as shown here:

def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None, scheduler=None):
    single_assignment_disposable = SingleAssignmentDisposable()

    # schedule subscribing on "subscribe" scheduler
    def subscribe_action(_, __):
        disposable = self.unsafe_subscribe(on_next=on_next, on_error=on_error, on_completed=on_completed,
                                         observer=observer, scheduler=scheduler)
        single_assignment_disposable.disposable = disposable

    # select the "subscribe" scheduler
    scheduler = scheduler or CurrentThreadScheduler()
    scheduler.schedule(subscribe_action)
    return single_assignment_disposable
  3. Adapt the source observables to delay the emission of elements by the "subscribe" scheduler to make sure that all observables are subscribed. Here is an implementation of the just operator (in v1.6 style):
@extensionclassmethod(Observable)
def just(cls, value, scheduler=None):
    outer_scheduler = scheduler

    def subscribe(observer, scheduler, subscribe_scheduler):
        # prefer the scheduler passed at subscribe time, else the one given to just
        scheduler_ = scheduler or outer_scheduler

        # delay the emission of elements by the subscribe scheduler to make
        #   sure that all observables are subscribed.
        def schedule_emit_elem_action(_, __):
            def emit_elem_action(_, __):
                observer.on_next(value)
                observer.on_completed()

            if scheduler_:
                return scheduler_.schedule(emit_elem_action)
            else:
                # no "normal" scheduler available: emit directly
                emit_elem_action(None, None)
        return subscribe_scheduler.schedule(schedule_emit_elem_action)
    return AnonymousObservable(subscribe)

MichaelSchneeberger avatar Feb 22 '19 18:02 MichaelSchneeberger

Yes, it's a problem that subscribe means both "attach" and "run" at the same time. Turning cold observables hot and then trying to make them feel cold again will give edge cases when combined with CurrentThreadScheduler. Thus you could instead use NewThreadScheduler so zip will finish the subscription of all sources before the item is sent down the pipeline. It could be that zip should have scheduled its subscribe action to force CurrentThreadScheduler to actually queue the scheduling of the item in just. We can check that using subscribe_on, so that the subscribe gets scheduled, e.g.

obs_out = rx.zip(left_obs, right_obs).pipe(ops.subscribe_on(my_scheduler))

I would not want to add another arg to subscribe as it would make things more confusing for users. It would be better to have the .attach()/.run() combo, but I don't think we should diverge more from rx at this point, especially when using observables to implement iterables.

But I also don't see the observable problem we are trying to solve? I.e why not just implement this using iterables instead?

shared = [3]
first = shared[:1]
second = shared[:1]
iter_out = zip(first, second)

dbrattli avatar Feb 24 '19 07:02 dbrattli

Thus you could instead use NewThreadScheduler so zip will finish the subscription of all sources before the item is sent down the pipeline.

You can reproduce the same effect with schedulers running on another thread like NewThreadScheduler or EventLoopScheduler. It is just a bit harder to show. The following example ...

import rx

from rx import operators as rx_op
from rx.concurrency import EventLoopScheduler

scheduler1 = EventLoopScheduler()
scheduler2 = EventLoopScheduler()

source = rx.from_(range(10), scheduler=scheduler1).pipe(rx_op.share())

def gen_subscribers():
    for i in range(100):
        yield source.pipe(rx_op.observe_on(scheduler2), rx_op.to_iterable(), rx_op.map(lambda v: list(v)))

obs_to_be_zipped = list(gen_subscribers())
rx.merge(*obs_to_be_zipped).pipe(rx_op.do_action(print)).run()

... will start printing ...

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

... then it will eventually print ...

[1, 2, 3, 4, 5, 6, 7, 8, 9]

... because at some point, the downstream observables will miss an element.
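
The interleaving behind the second output can be forced deterministically with plain threads. This is a sketch only: emit_all and the two Event objects are hypothetical helpers that pin down a timing which, in the RxPY example above, happens by chance.

```python
import threading

observers = []
observers_lock = threading.Lock()
first_emitted = threading.Event()
late_attached = threading.Event()


def emit_all():
    # Plays the role of the source running on its own scheduler thread.
    for value in range(10):
        with observers_lock:
            current = list(observers)
        for on_next in current:
            on_next(value)
        if value == 0:
            first_emitted.set()
            # Force the unlucky interleaving that otherwise happens by chance.
            late_attached.wait(timeout=5)


early_received, late_received = [], []

with observers_lock:
    observers.append(early_received.append)
worker = threading.Thread(target=emit_all)
worker.start()  # the first subscription starts the source

first_emitted.wait(timeout=5)
with observers_lock:
    observers.append(late_received.append)  # attaches after element 0
late_attached.set()
worker.join()

print(early_received)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(late_received)   # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```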

I would not want to add another arg to subscribe as it would make things more confusing for users.

No, you don't have to add another argument to the subscribe method. I know that was my initial suggestion, but I should have called it subscribe_ or unsafe_subscribe (I updated it accordingly, see above).

The implementation of the subscribe method in the current RxPY might then look like this ...

    def subscribe(self,  # pylint: disable=too-many-arguments,arguments-differ
                  observer: Union[typing.Observer, typing.OnNext] = None,
                  on_error: typing.OnError = None,
                  on_completed: typing.OnCompleted = None,
                  on_next: typing.OnNext = None,
                  *,
                  scheduler: typing.Scheduler = None,
                  ) -> typing.Disposable:

        if observer:
            if isinstance(observer, typing.Observer) or hasattr(observer, "on_next"):
                on_next = cast(typing.Observer, observer).on_next
                on_error = cast(typing.Observer, observer).on_error
                on_completed = cast(typing.Observer, observer).on_completed
            else:
                on_next = observer

        subscribe_scheduler = CurrentThreadScheduler()
        return self.subscribe_(on_next, on_error, on_completed, scheduler, subscribe_scheduler)

But I also don't see the observable problem we are trying to solve? I.e why not just implement this using iterables instead?

We want to use the elements emitted by an observable for different kinds of computations executed on multiple threads as shown in the example of this post.

MichaelSchneeberger avatar Feb 24 '19 15:02 MichaelSchneeberger

In RxJS, missing an element is somewhat less of a problem, because a JavaScript application runs on a single thread.

In RxJava, they have an onSubscribe method defined in the Observer class that is called prior to scheduling elements on a scheduler. See ObservableObserveOn (line 92) ...

downstream.onSubscribe(this);
schedule();

That way all subscribe methods are called in one go, so that when the last subscribe method is called, all subscribe (and onSubscribe) methods can be found in the traceback (i.e. on the stack). The subscribe and onSubscribe calls of the first example in this issue are depicted here ...

call subscribe out_obs
call subscribe left_obs
call subscribe shared_obs
call subscribe just
call onSubscribe shared_obs
call onSubscribe left_obs
call onSubscribe out_obs
call subscribe right_obs
call subscribe shared_obs
return subscribe shared_obs
return subscribe right_obs
return onSubscribe out_obs
return onSubscribe left_obs
return onSubscribe shared_obs
start scheduling element in just
etc.

As you can see the scheduling happens only after all subscribe methods have been called.

A CurrentThreadScheduler (or trampoline) is useful to reduce the function call depth. A "subscribe" scheduler would do something similar to the onSubscribe method used in RxJava, and would have the advantage of reducing the function call depth, i.e. reducing the risk of a stack overflow.
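
The effect of a trampoline on call depth can be sketched in plain Python. ToyImmediate, ToyTrampoline and run_chain are hypothetical names for illustration, not RxPY classes: the first runs each action inside the caller's stack frame, the second queues actions scheduled from within a running action.

```python
import sys
from collections import deque


class ToyImmediate:
    """Runs every action right away, nesting one call inside the other."""

    def schedule(self, action):
        action()


class ToyTrampoline:
    """Queues actions scheduled from within a running action."""

    def __init__(self):
        self._queue = deque()
        self._running = False

    def schedule(self, action):
        self._queue.append(action)
        if self._running:
            return
        self._running = True
        try:
            while self._queue:
                self._queue.popleft()()
        finally:
            self._running = False


def run_chain(scheduler, steps):
    """Each step schedules the next one; returns the number of steps run."""
    executed = []

    def step(remaining):
        if remaining == 0:
            return
        executed.append(remaining)
        scheduler.schedule(lambda: step(remaining - 1))

    scheduler.schedule(lambda: step(steps))
    return len(executed)


# More steps than the interpreter allows as nested calls.
steps = 2 * sys.getrecursionlimit()

trampoline_steps = run_chain(ToyTrampoline(), steps)

try:
    run_chain(ToyImmediate(), steps)
    immediate_overflowed = False
except RecursionError:
    immediate_overflowed = True
```

With the trampoline, each step returns before the next one starts, so the chain completes at a constant stack depth; with immediate execution the same chain exceeds the recursion limit.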

MichaelSchneeberger avatar Feb 24 '19 15:02 MichaelSchneeberger

We want to use the elements emitted by an observable for different kinds of computations executed on multiple threads as shown in the example of this post.

In such situations, I always use asyncio in combination with ThreadPool schedulers (even when no asynchronous IO is used). In the following example, asyncio is used as the main loop, where files are read. The computation is then done with a ThreadPoolScheduler, and finally the result of the computation is processed back on the asyncio event loop.

https://github.com/MainRo/deep-speaker/blob/master/deep_speaker/feature/process_path.py

Some rewriting would improve readability, but in this example no items are missed. The event loop is started after all subscriptions are done, so the first item is emitted once the whole chain is built.

MainRo avatar Feb 25 '19 21:02 MainRo

In such situations, I always use asyncio in combination with ThreadPool schedulers (even when no asynchronous IO is used).

Yes, this would solve the problem. I actually like this pattern and have used it as well. But not every application fits this pattern, for instance if a specific scheduler needs to be running on the main thread, or if blocking work should be scheduled on a dedicated scheduler. In that case, you need something like an EventLoopScheduler, and will therefore face the problem of missing an element in an observable.

MichaelSchneeberger avatar Mar 01 '19 06:03 MichaelSchneeberger