RxPY
Fix subscribe_on not forwarding scheduler to the source
This PR fixes the subscribe_on operator, which currently does not forward the scheduler to the source.
This is noticeable when implementing an observable with a subscribe function:
import rx
from rx import operators as ops
from rx.scheduler import EventLoopScheduler

my_scheduler = EventLoopScheduler()

def source():
    def subscribe(observer, _scheduler):
        # here, _scheduler is None instead of my_scheduler
        ...
    return rx.create(subscribe)

source().pipe(
    ops.subscribe_on(my_scheduler)
).run()
I believe this is not aligned with the documentation, which states the following:
wrap the source sequence in order to run its subscription and unsubscription logic on the specified scheduler
I think it should fix #541 as well.
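To make the difference concrete, here is a minimal standalone sketch of the two behaviours. It is a toy model only, not RxPY's actual implementation: ToyScheduler, subscribe_on_without_forwarding and subscribe_on_with_forwarding are hypothetical names made up for illustration, and the toy scheduler simply runs its action immediately.

```python
from typing import Callable, List, Optional


class ToyScheduler:
    """Hypothetical stand-in for a scheduler; runs actions immediately."""

    def __init__(self, name: str) -> None:
        self.name = name

    def schedule(self, action: Callable[[], None]) -> None:
        action()


# A "source" here is just a subscribe function: (on_next, scheduler) -> None
Subscribe = Callable[[Callable[[object], None], Optional[ToyScheduler]], None]


def subscribe_on_without_forwarding(source: Subscribe, scheduler: ToyScheduler) -> Subscribe:
    """Models the behaviour described above: the subscription runs on the
    scheduler, but the source itself never receives it."""
    def subscribe(on_next, _scheduler=None):
        scheduler.schedule(lambda: source(on_next, None))
    return subscribe


def subscribe_on_with_forwarding(source: Subscribe, scheduler: ToyScheduler) -> Subscribe:
    """Models the intended behaviour: the scheduler is also passed on to
    the source's subscribe function."""
    def subscribe(on_next, _scheduler=None):
        scheduler.schedule(lambda: source(on_next, scheduler))
    return subscribe


seen: List[Optional[ToyScheduler]] = []   # schedulers observed by the source
received: List[object] = []               # values delivered to the observer


def source(on_next, scheduler):
    seen.append(scheduler)
    on_next("value")


my_scheduler = ToyScheduler("my_scheduler")
subscribe_on_without_forwarding(source, my_scheduler)(received.append)
subscribe_on_with_forwarding(source, my_scheduler)(received.append)
```

In this sketch the first subscription records None for the scheduler and the second records my_scheduler, which is the behaviour this PR aims for in the real operator.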