fgpyo
Add futures module for clean parallelism
At least some Fulcrum projects use `multiprocessing` with `Queue`s for parallel processing, e.g.:
```python
from multiprocessing import Process, Queue
from typing import Any, List

tasks = [
    (
        _function_to_process_in_parallel,
        (task_argument, general_arg1, general_arg2),  # note: kwargs are not possible
    )
    for task_argument in list_of_tasks
]
results: List = []

# Create queues
task_queue: "Queue[Any]" = Queue()
done_queue: "Queue[Any]" = Queue()
for task in tasks:
    task_queue.put(task)

# Start worker processes
for i in range(threads):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get results
for i in range(len(tasks)):
    task_results, task_counter = done_queue.get()
    results.extend(task_results)
    if len(results) % 50 == 0:
        logger.info("Processed {0} intervals.".format(len(results)))
logger.info("Processed {0} intervals.".format(len(results)))

# Tell child processes to stop
for i in range(threads):
    task_queue.put("STOP")

sorted_results = sorted(results, key=some_sort_key)
```
Under the hood, `worker` is a function that feeds task arguments to `_function_to_process_in_parallel`.
Another approach is to use `concurrent.futures`, which, like `multiprocessing`, is built in. It has slightly less flexibility, but handles most use cases much more simply. The same job could be handled like so:
```python
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=threads) as executor:
    future_objects = [
        # Called with args or kwargs like any function
        executor.submit(_function_to_process_in_parallel, task_argument, *args, **kwargs)
        for task_argument in list_of_tasks
    ]
    results_in_order = [future.result() for future in future_objects]
```
You can also get out-of-order results (e.g. for reductions) with `concurrent.futures.as_completed(future_objects)`. In the future (e.g. free-threaded Python 3.13+), using thread-based parallelism will be as simple as replacing `ProcessPoolExecutor` with `ThreadPoolExecutor` (they have identical APIs).
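For illustration, out-of-order consumption looks like this. The sketch uses a toy `square` function and `ThreadPoolExecutor` so it is self-contained and picklable-function concerns don't arise; per the identical-API point above, `ProcessPoolExecutor` would be swapped in for CPU-bound work:

```python
import concurrent.futures


def square(x: int) -> int:
    """Toy stand-in for the real task function."""
    return x * x


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future_objects = [executor.submit(square, n) for n in range(5)]
    # as_completed yields each future as it finishes, not in submission order.
    unordered = [f.result() for f in concurrent.futures.as_completed(future_objects)]
```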
There are two issues with `concurrent.futures`:
- While you can check for exceptions, they don't naturally propagate through, and some work is needed to recover the traceback.
- In the event of an exception, it's not easy or automatic to shut down the pool.
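The first issue can be seen in a small sketch (a hypothetical failing task, shown with `ThreadPoolExecutor` for self-containment): the exception is captured on the future object and nothing is raised until `.result()` or `.exception()` is called, so a failure in a worker does not surface on its own, and with `ProcessPoolExecutor` the original traceback from the child process takes extra work to recover.

```python
import concurrent.futures


def fail(x: int) -> int:
    """Toy task that always raises."""
    raise ValueError(f"bad input: {x}")


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(fail, 1)
    # The exception is stored on the future; .exception() returns it
    # (rather than raising), while .result() would re-raise it here.
    err = future.exception()
```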
However, I addressed these issues ~95% of the way while at the Broad. That software is open source, so it's no issue to re-use it. It would take just a few hours (say 5) to make an fgpyo module that automagically re-raises exceptions from remote processes with the full traceback and shuts down the pool, plus perhaps a few more for writing tests and getting the PR reviewed.