RxPY
Concurrent Execution Not Working as Expected with RxPY ThreadPoolScheduler
Hello,
I'm running into an issue with RxPY: I'm trying to execute multiple tasks concurrently using ThreadPoolScheduler, but they appear to run sequentially. My goal is for "Start Alpha", "Start Beta", and "Start Gamma" to be logged almost simultaneously, but there is a 2-second delay between each start. Below is the relevant part of my code:
import threading
import time

import reactivex
from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler
from metastock.modules.core.logging.logger import Logger

pool_scheduler = ThreadPoolScheduler(5)


def intense_calculation(value):
    Logger().info(f"Start {value}")
    time.sleep(2)  # Simulate an intensive calculation
    return f"Result {value}"


# Create an Observable
source = reactivex.from_(["Alpha", "Beta", "Gamma"])
source.pipe(
    ops.observe_on(ThreadPoolScheduler(5)),
    ops.map(intense_calculation),
).subscribe(
    on_next=lambda s: Logger().info(
        f"Processed {s} on {threading.current_thread().name}"
    ),
    on_error=lambda e: Logger().info(e),
    on_completed=lambda: Logger().info("Process complete!"),
)

# Wait until all tasks are completed
input("Press any key to exit\n")
Logs are showing that the tasks are starting and finishing sequentially, each 2 seconds apart:
Press any key to exit
[03/24/24 12:05:31] INFO Start Alpha rx_test.py:16
[03/24/24 12:05:33] INFO Processed Result Alpha on rx_test.py:27
ThreadPoolExecutor-1_0
INFO Start Beta rx_test.py:16
[03/24/24 12:05:35] INFO Processed Result Beta on rx_test.py:27
ThreadPoolExecutor-1_1
INFO Start Gamma rx_test.py:16
[03/24/24 12:05:37] INFO Processed Result Gamma on rx_test.py:27
ThreadPoolExecutor-1_1
INFO Process complete! rx_test.py:31
I was under the impression that a ThreadPoolScheduler with a pool size of 5 would let these tasks run in parallel, but they do not. I expected "Start Alpha", "Start Beta", and "Start Gamma" to be logged at almost the same time, with the calculations running concurrently on different threads.
Could anyone help identify what is causing the sequential execution, and how I can adjust my code to achieve the expected parallelism?
Thank you in advance for any insights or suggestions!
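Hi @vjcspy,
For background, see @dbrattli's reply on an older issue. In short, observe_on moves the downstream work onto the scheduler, but a single observable sequence still delivers its items one at a time, so map(intense_calculation) only starts "Beta" after "Alpha" has returned. To run the calculations in parallel, each item needs its own inner observable, merged back with flat_map.
That issue dates from v3, so below is sample code for the reactivex package (v4) that should give you the expected parallelism.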
import threading
import time

import reactivex
from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler


def main():
    start = time.time()
    pool_scheduler = ThreadPoolScheduler(5)
    messages = []
    # One shared lock; creating a new Lock() on every call would guard nothing
    lock = threading.Lock()

    def log(message):
        with lock:
            messages.append(f"{round(time.time() - start, 1)}: {message}")

    def intense_calculation(value):
        time.sleep(2)  # Simulate an intensive calculation
        return f"Computed for {value}"

    # Create an Observable
    source = reactivex.from_(["Alpha", "Beta", "Gamma"])
    source.pipe(
        # Submit each item to the pool and merge the resulting futures
        ops.flat_map(
            lambda s: reactivex.from_future(
                pool_scheduler.executor.submit(intense_calculation, s)
            )
        ),
    ).subscribe(
        on_next=lambda s: log(f"Processed {s} on {threading.current_thread().name}"),
        on_error=lambda e: log(f"ERROR {e}"),
        on_completed=lambda: log("Process complete!"),
    )

    time.sleep(3)  # Give the 2-second tasks time to finish
    log("End")
    print(messages)


if __name__ == "__main__":
    main()
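To see the same effect without RxPY, here is a minimal stdlib-only sketch (an illustration, not the library's implementation). It compares calling the function item by item, which is roughly what a single map stage does, with submitting every item to a ThreadPoolExecutor up front, which is what pool_scheduler.executor.submit does in the sample above. The names slow_task, run_sequential, and run_concurrent are hypothetical, and the sleep is shortened to 0.2 seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(value, delay=0.2):
    """Hypothetical stand-in for intense_calculation, with a shorter sleep."""
    time.sleep(delay)
    return f"Computed for {value}"


def run_sequential(items):
    """Process items one after another; return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = [slow_task(item) for item in items]
    return results, time.perf_counter() - start


def run_concurrent(items, max_workers=5):
    """Submit every item to a thread pool first, then collect the results."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(slow_task, item) for item in items]
        # Collecting in submission order keeps results aligned with items
        results = [future.result() for future in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    items = ["Alpha", "Beta", "Gamma"]
    _, sequential_time = run_sequential(items)
    _, concurrent_time = run_concurrent(items)
    print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The sequential run should take roughly three times the delay, while the concurrent run should take roughly one delay, since all three sleeps overlap as long as the pool has at least three workers.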
Additional issues that discuss this: https://github.com/ReactiveX/RxPY/issues/67
Hope this helps