
Add futures module for clean parallelism

TedBrookings opened this issue on Feb 16, 2024 · 0 comments

At least some Fulcrum projects use multiprocessing with Queues for parallel processing, e.g.:

    from multiprocessing import Process, Queue
    from typing import Any, List

    tasks = [
        (
            _function_to_process_in_parallel,
            (task_argument, general_arg1, general_arg2),  # note: kwargs are not possible
        )
        for task_argument in list_of_tasks
    ]

    results: List = []

    # Create queues
    task_queue: "Queue[Any]" = Queue()
    done_queue: "Queue[Any]" = Queue()

    for task in tasks:
        task_queue.put(task)

    # Start worker processes
    for i in range(threads):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get results
    for i in range(len(tasks)):
        task_results, task_counter = done_queue.get()
        results.extend(task_results)
        if len(results) % 50 == 0:
            logger.info("Processed {0} intervals.".format(len(results)))
    logger.info("Processed {0} intervals.".format(len(results)))

    # Tell child processes to stop
    for i in range(threads):
        task_queue.put('STOP')

    sorted_results = sorted(results, key=some_sort_key)

Under the hood, worker is a function that feeds task arguments to _function_to_process_in_parallel.
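
For context, a minimal sketch of such a worker (assuming the 'STOP' sentinel convention used above; the pattern mirrors the example in the multiprocessing docs):

    def worker(task_queue: "Queue[Any]", done_queue: "Queue[Any]") -> None:
        # Pull (function, args) tuples until the 'STOP' sentinel arrives,
        # call the function, and push its return value onto the done queue.
        for func, args in iter(task_queue.get, 'STOP'):
            done_queue.put(func(*args))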

Another approach is to use concurrent.futures, which, like multiprocessing, is built in. It is slightly less flexible, but it handles most use cases much more simply. The same job could be handled like so:

    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor(max_workers=threads) as executor:
        future_objects = [
            # called with args or kwargs like any function
            executor.submit(_function_to_process_in_parallel, task_argument, *args, **kwargs)
            for task_argument in list_of_tasks
        ]
        results_in_order = [future.result() for future in future_objects]

You can also get out-of-order results (e.g. for reductions) with concurrent.futures.as_completed(future_objects). In the future (e.g. Python 3.13+), using thread-based parallelism will be as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor (they have identical APIs).
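
For illustration, consuming results as they finish might look like this (reusing names from the snippet above; this loop belongs inside the with block so results arrive while workers are still running):

    # Consume results as each worker finishes, regardless of submission order
    results = []
    for future in concurrent.futures.as_completed(future_objects):
        results.append(future.result())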

There are two issues with concurrent.futures:

  1. While you can check for exceptions, they don't naturally propagate through, and some work is needed to get the traceback.
  2. In the event of an exception, it's not easy or automatic to shut down the pool.

However, I ~95% addressed these issues while at the Broad. That software is open source, so it's no issue to re-use it. It would take just a few hours (say 5) to make an fgpyo module that automagically re-throws exceptions from remote processes with full tracebacks and shuts down the pool. Perhaps a few more for writing tests and getting the PR reviewed.
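
For a flavor of what such a module might do (a hypothetical sketch, not the Broad code; _collect_results and the RuntimeError wrapper are illustrative names), the core observation is that future.result() already re-raises worker exceptions in the parent, so the remaining work is surfacing the remote traceback and tearing down the pool:

    import concurrent.futures
    import traceback

    def _collect_results(executor, future_objects):
        # Hypothetical helper: gather results, re-raising the first worker
        # failure with its remote traceback attached and shutting down the pool.
        try:
            return [future.result() for future in future_objects]
        except Exception as exc:
            for future in future_objects:
                future.cancel()  # best effort: skip anything not yet started
            executor.shutdown(wait=False)
            # exc.__traceback__ points at .result() in the parent; for
            # ProcessPoolExecutor the remote traceback text lives on the
            # exception's __cause__, which format_exception includes by default.
            detail = "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            raise RuntimeError(f"Worker process failed:\n{detail}") from exc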
