RxPY
Q: How to use a scheduler with a Subject for concurrency?
Hello,
I am trying to split a CSV file into multiple CSV files based on a column value with RxPY.
Could you help me understand how to use a scheduler with a Subject?
I tried the simple test case below, but every subscription ran on the MainThread.
== test.py: simple test with a Subject and a scheduler for concurrency
import time
import random
from threading import current_thread

import rx
import rx.operators as ops
from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject


def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))


def intense_calculation(task, value):
    printthread("{} calc {}".format(task, value))
    time.sleep(random.randint(5, 20) * .1)
    return value


if __name__ == "__main__":
    scheduler = ThreadPoolScheduler(2)
    subject = Subject()

    subject.pipe(
        ops.filter(lambda i: i % 2 == 0),
    ).subscribe(
        on_next=lambda x: intense_calculation('task1', x),
        on_completed=lambda: printthread("on_completed"),
        on_error=lambda err: printthread("on_error: {}".format(err)),
        scheduler=scheduler)

    subject.pipe(
        ops.filter(lambda i: i % 2 == 1),
    ).subscribe(
        on_next=lambda x: intense_calculation('task2', x),
        on_completed=lambda: printthread("on_completed"),
        on_error=lambda err: printthread("on_error: {}".format(err)),
        scheduler=scheduler)

    obs = rx.of(1, 2, 3, 4)
    subscriber = obs.subscribe(subject)
    scheduler.executor.shutdown()
    subscriber.dispose()
    printthread("\nAll done")
== output of test.py
λ python test.py
task2 calc 1, thread: MainThread
task1 calc 2, thread: MainThread
task2 calc 3, thread: MainThread
task1 calc 4, thread: MainThread
on_completed, thread: MainThread
on_completed, thread: MainThread
All done, thread: MainThread
== EXPECTED OUTPUT (not sure thread name)
task2 calc 1, thread: ThreadPoolExecutor-0_0
task1 calc 2, thread: ThreadPoolExecutor-0_1
task2 calc 3, thread: ThreadPoolExecutor-0_0
task1 calc 4, thread: ThreadPoolExecutor-0_1
on_completed, thread: ThreadPoolExecutor-0_0
on_completed, thread: ThreadPoolExecutor-0_1
All done, thread: MainThread
Thanks a lot in advance for your help in finding an effective way to split a large CSV file into multiple small files with RxPY. TW
Hi,
I have updated the code as shown below, and the expected threads are now assigned, but it does not seem to work properly when I use scheduler.executor.shutdown(wait=True) to wait for all threads to finish.
If there is a way to do this with another type of scheduler, please comment with your idea.
== updated code
import time
import random
from threading import current_thread, RLock
from dataclasses import dataclass
from typing import Any  # type: ignore

import rx  # type: ignore
import rx.operators as ops  # type: ignore
from rx.scheduler import ThreadPoolScheduler, NewThreadScheduler  # type: ignore
from rx.subject import Subject  # type: ignore


def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))


class Task(rx.Observable):
    def __init__(self, task, mark):
        self.task = task
        self.mark = mark

    def on_next(self, row):
        printthread("{} {}".format(self.task, row))
        time.sleep(random.randint(1, 3) * .1)
        return row

    def on_error(self, err):
        printthread(f'{self.task} error={str(err)}')
        with self.mark.lock:
            self.mark.endcount += 1

    def on_completed(self):
        printthread(f'{self.task} completed')
        with self.mark.lock:
            self.mark.endcount += 1


@dataclass
class Mark:
    endcount: int = 0
    lock: RLock = RLock()


if __name__ == "__main__":
    scheduler = ThreadPoolScheduler(2)
    source = rx.of(1, 2, 3, 4).pipe(ops.publish())
    mark = Mark()

    source.pipe(
        ops.filter(lambda i: i % 2 == 0),
        ops.observe_on(scheduler)  # as the subject is a hot source, need to use observe_on instead of subscribe_on
    ).subscribe(Task('task1', mark))
    source.pipe(
        ops.filter(lambda i: i % 2 == 1),
        ops.observe_on(scheduler)
    ).subscribe(Task('task2', mark))
    source.connect()

    # how to wait for the subscribers running on the scheduler's threads to finish?
    #############################################################
    if False:
        time.sleep(0.1)
        scheduler.executor.shutdown(wait=True)  # does not work properly: terminates immediately. why and how?
    if True:
        while True:  # is there any better way?
            with mark.lock:
                if mark.endcount >= 2:
                    scheduler.executor.shutdown(wait=True)
                    break
            time.sleep(0.1)
    printthread("\nAll done")
== output
task2 1, thread: ThreadPoolExecutor-0_0
task1 2, thread: ThreadPoolExecutor-0_1
task1 4, thread: ThreadPoolExecutor-0_1
task2 3, thread: ThreadPoolExecutor-0_0
task2 completed, thread: ThreadPoolExecutor-0_0
task1 completed, thread: ThreadPoolExecutor-0_1
All done, thread: MainThread
Instead of polling for completion, you can use a future in your mark. Something like this:
from concurrent.futures import Future

completion = Future()

# inside class Task:
def on_completed(self):
    printthread(f'{self.task} completed')
    with self.mark.lock:
        self.mark.endcount += 1
        if self.mark.endcount == 2:
            completion.set_result(True)

...

source.connect()
completion.result()  # blocks until both tasks have completed
Note that you do not have to stop the threadpool by yourself. It will be done automatically on disposal of the pipelines.
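For anyone reading along, here is a minimal standard-library sketch of the same idea, independent of RxPY: a small countdown helper that resolves a Future once the expected number of observers have finished. The names CompletionLatch and mark_finished are hypothetical (they are not part of RxPY), and the two pool.submit calls only stand in for the on_completed callbacks of the two tasks.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from threading import Lock


class CompletionLatch:
    """Hypothetical helper: resolves `done` once `expected` observers have finished."""

    def __init__(self, expected):
        self._remaining = expected
        self._lock = Lock()
        self.done = Future()  # one Future per latch instance, not a shared default

    def mark_finished(self, exc=None):
        with self._lock:
            if self.done.done():
                return  # already resolved, e.g. by an earlier error
            if exc is not None:
                self.done.set_exception(exc)  # wake the waiter with the error
                return
            self._remaining -= 1
            if self._remaining == 0:
                self.done.set_result(True)


latch = CompletionLatch(expected=2)
with ThreadPoolExecutor(max_workers=2) as pool:
    # Stand-ins for the two on_completed callbacks running on pool threads.
    for _ in range(2):
        pool.submit(latch.mark_finished)
    finished = latch.done.result(timeout=5)  # blocks instead of polling
```

In the Task class, on_completed would call latch.mark_finished() and on_error would call latch.mark_finished(err). The guard on done.done() keeps a second call from raising once the Future has already been resolved.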
Hi MainRo,
Thanks for your input and that is very helpful.
In terms of the thread pool's thread lifecycle, could you give me a little more information?
When are the thread pool's threads and the pipeline disposed?
- when all threads reach on_completed (maybe)
- when any thread reaches on_error (not sure)
- on any uncaught exception in any thread's on_next (on_completed, on_error) (not sure)
Thanks in advance!
Hello,
Sorry... Is there any way to dispose a ConnectableObservable?
I would appreciate any comments on how to make the code below more idiomatic Rx.
import time
import random
import logging
import threading
import traceback
from threading import current_thread, RLock
from dataclasses import dataclass
from concurrent.futures import Future
from typing import Any, Optional  # noqa

import rx  # type: ignore
import rx.operators as ops  # type: ignore
from rx.scheduler import ThreadPoolScheduler  # type: ignore
from rx.subject import Subject  # type: ignore


def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))


class Task(rx.Observable):
    def __init__(self, task, mark):
        self.task = task
        self.mark = mark
        self.count = 0

    def domark(self, exc: Optional[BaseException] = None):
        with self.mark.lock:
            self.mark.todocount -= 1
            if exc is not None:
                self.mark.exc = exc
            if self.mark.todocount < 1 or self.mark.exc:
                self.mark.complete.set_result(True)

    def on_next(self, row):
        self.count += 1
        if self.count < 2:
            printthread("{} {}".format(self.task, row))
            time.sleep(random.randint(1, 3) * .1)
            return row
        else:
            try:
                raise Exception('on_next exception')
            except BaseException as exc:
                self.domark(exc)

    def on_error(self, exc):
        printthread(f'{self.task} error={str(exc)}')
        self.domark(exc)

    def on_completed(self):
        printthread(f'{self.task} completed')
        self.domark()


@dataclass
class Mark:
    todocount: int
    exc: Optional[BaseException] = None
    complete: Future = Future()
    lock: RLock = RLock()


if __name__ == "__main__":
    scheduler = ThreadPoolScheduler(2)
    obv = rx.from_iterable([1, 2, 3, 4])
    conobv = obv.pipe(ops.publish())
    mark = Mark(todocount=2)

    conobv.pipe(
        ops.filter(lambda i: i % 2 == 0),
        ops.observe_on(scheduler)  # as the subject is a hot source, need to use observe_on instead of subscribe_on
    ).subscribe(Task('taks1', mark))
    conobv.pipe(
        ops.filter(lambda i: i % 2 == 1),
        ops.observe_on(scheduler)
    ).subscribe(Task('task2', mark))

    conobv.connect()
    time.sleep(0.1)
    print(f'connect, active threads={threading.active_count()}')
    mark.complete.result()
    time.sleep(0.1)
    print(f'complete, active threads={threading.active_count()}')

    if True:
        scheduler.executor.shutdown(wait=True)
        print(f'shutdown, active threads={threading.active_count()}')
    else:  # how to dispose all subscribers?
        conobv.dispose()  # no dispose attr in ConnectableObservable
        print(f'dispose, active threads={threading.active_count()}')

    if mark.exc:
        print(''.join(traceback.format_exception(
            type(mark.exc), mark.exc, mark.exc.__traceback__
        )))
    else:
        printthread("\nAll done")
output:
task2 1, thread: ThreadPoolExecutor-0_0
taks1 2, thread: ThreadPoolExecutor-0_1
connect, active threads=3
task2 completed, thread: ThreadPoolExecutor-0_0
complete, active threads=3
shutdown, active threads=1
Traceback (most recent call last):
  File "/Users/tk/works/rxpy/test.py", line 43, in on_next
    raise Exception('on_next exception')
Exception: on_next exception
Disposal of the source observable is done when a subscription of the connectable observable is disposed. So in your example the original obv observable will be disposed whenever one of the two other subscriptions is disposed:
disposable1 = conobv.pipe(
    ops.filter(lambda i: i % 2 == 0),
    ops.observe_on(scheduler)  # as the subject is a hot source, need to use observe_on instead of subscribe_on
).subscribe(Task('taks1', mark))
disposable2 = conobv.pipe(
    ops.filter(lambda i: i % 2 == 1),
    ops.observe_on(scheduler)
).subscribe(Task('task2', mark))

...

conobv.connect()
disposable1.dispose()  # obv is disposed here
disposable2.dispose()
If you want obv to be disposed when both subscriptions are disposed, have a look at the ref_count and share operators.
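To illustrate what reference counting means here, below is a conceptual sketch in plain Python. It is not RxPY code and not how RxPY implements ref_count; the class RefCountedConnection and its connect/disconnect callbacks are hypothetical names used only to show the idea: the shared source is connected when the first subscriber arrives and released only when the last subscription is disposed. The sketch is not thread-safe.

```python
class RefCountedConnection:
    """Conceptual model only: connect on the first subscriber, disconnect after the last."""

    def __init__(self, connect, disconnect):
        self._connect = connect
        self._disconnect = disconnect
        self._count = 0

    def subscribe(self):
        self._count += 1
        if self._count == 1:
            self._connect()  # first subscriber opens the shared connection
        disposed = False

        def dispose():
            nonlocal disposed
            if disposed:
                return  # disposing twice must not decrement twice
            disposed = True
            self._count -= 1
            if self._count == 0:
                self._disconnect()  # last subscriber closes it

        return dispose


events = []
shared = RefCountedConnection(
    connect=lambda: events.append("connected"),
    disconnect=lambda: events.append("disconnected"),
)
dispose1 = shared.subscribe()
dispose2 = shared.subscribe()
dispose1()
events_after_first = list(events)  # still connected: one subscriber remains
dispose2()                         # now the source is released
```

With this model, disposing the first subscription leaves the source connected, and only the second dispose triggers the disconnect.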
Hi, some general remarks: for better turnaround times on CPU-intensive tasks, you will gain nothing from Python threads. Because of CPython's global interpreter lock (GIL), a single Python process runs Python bytecode on only one core at a time, so CPU-bound work spread across threads is no faster than doing it on the main thread. More here: https://github.com/ReactiveX/RxPY/issues/521
If you want to utilize your cores I would do it along the lines of this:
import rx
import rx.operators as ops

cores = 8
S = my_async_scheduler  # placeholder (e.g. GEventScheduler, ThreadPoolScheduler... whatever)

def spawn_process(core_nr):
    # create a subprocess doing your work and communicate with it synchronously,
    # using subprocess.Popen or multiprocessing, whatever fits best
    return sub_proc_result  # placeholder for whatever the subprocess returned

def by_core(core_nr):
    # spawn a new thread or greenlet, responsible for interacting with a subprocess:
    return rx.just(core_nr).pipe(ops.observe_on(S), ops.map(spawn_process))

# assemble_final_result is a placeholder for your own result handler
rx.from_(range(cores)).pipe(ops.flat_map(by_core)).subscribe(assemble_final_result)
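The spawn_process step above could look roughly like the following standard-library sketch, shown without RxPY so it runs on its own. Each worker thread starts a child Python interpreter and waits for its result synchronously, so the CPU-bound work runs in separate processes. CHILD_CODE (a sum of squares) and run_all are made-up stand-ins for the real job, not anything from the thread above.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Hypothetical CPU-bound job, executed in a separate Python process.
CHILD_CODE = "import sys; n = int(sys.argv[1]); print(sum(i * i for i in range(n)))"


def spawn_process(n):
    # Start a child interpreter and block until it exits; the calling
    # thread is idle while waiting, so the child can use another core.
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_CODE, str(n)],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(completed.stdout)


def run_all(sizes, workers=4):
    # One thread per in-flight subprocess; results keep the input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(spawn_process, sizes))


results = run_all([10, 100, 1000])
```

In the Rx version, spawn_process would be passed to ops.map exactly as in the snippet above, with the scheduler S supplying the threads instead of ThreadPoolExecutor.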