
Custom client/scheduler MPI rank placement

Open kmpaul opened this issue 1 year ago • 1 comment

Update - Ready for Review

I have added a dask_mpi.execute() function, though it looks a little different from the sketch in the "Previous Header" below. The idea is that execute() should use MPI to launch the client code/function, the Dask Scheduler, and any needed Dask Workers, as specified. It is general purpose: it can run with or without a client function, and with or without launching a Scheduler. This makes (or should make) execute() completely general, so it can/should work for all use cases, either through the CLI or in batch mode.

The execute() function takes a (synchronous) function as input (along with its args/kwargs) and runs it on a single MPI rank in its own thread. A Dask Scheduler or Worker (or both) can be run on the same MPI rank, if desired. Thus, Dask-MPI can launch a Dask cluster with a wide variety of customizable options:

  • with/without a client function: If the client function is absent, execute() works like the old Dask-MPI CLI does, creating a scheduler and workers.
  • with/without a scheduler: If scheduler=False is specified, no Scheduler is launched; the user must supply a scheduler_address where a scheduler is already running. This lets you "add" to an existing Dask cluster. You can still supply a client function to run in batch mode (which also shuts down the cluster when finished).
  • with exclusive/inclusive workers: Workers can be run on every MPI rank, or only on "unused" MPI ranks (where no client or scheduler is running).

All of the options encoded into the CLI/interactive mode of using Dask-MPI are available with execute(), as are all of the options available to the initialize() batch mode of using Dask-MPI.
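For illustration, here is a rough sketch of how execute() might be called in batch mode. The keyword names scheduler and scheduler_address follow the options described above, but the exact signature and the way the client discovers the scheduler are assumptions here, not the PR's definitive API:

from dask_mpi import execute
from distributed import Client


def client_code():
    # Ordinary client code, wrapped in a function.  It is assumed here that
    # Client() can discover the scheduler address, as with initialize().
    with Client() as client:
        return client.submit(sum, [1, 2, 3]).result()


# Batch mode: launch a Scheduler, Workers, and the client function across
# the available MPI ranks (run under mpirun/mpiexec).
execute(client_code)

# "Adding" to an existing cluster: no Scheduler is launched, and the ranks
# connect to an already-running scheduler instead (address is illustrative).
# execute(client_code, scheduler=False, scheduler_address="tcp://10.0.0.1:8786")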

NOTE: This PR only implements the execute() method. The old CLI has not been modified to use execute(), but that can easily be done. It also does not modify, deprecate, or remove the initialize() method. So, all existing functionality will still work; this new function simply provides new capabilities, and (in a future PR) we can modify the CLI to use execute() and deprecate initialize().

Previous Header

This is a mock-up of a new way of using Dask-MPI that allows custom placement of the client code and the scheduler on the available MPI ranks. The approach is described in https://github.com/dask/dask-mpi/issues/29#issuecomment-1740840584, the salient portion of which is quoted below:

  1. The client code would need to be "wrapped" in a function. It could be an asynchronous function, but it would still need to be wrapped in a function regardless.
  2. The dask_mpi.execute(...) function would take the client function as input (or the coroutine) and execute it on the specified client_rank with all of the additional options (e.g., scheduler_rank, exclusive_workers, ...).

To achieve the current Dask-MPI behavior, where workers, scheduler, and client are all in separate MPI ranks, the execute() function might look like:

import asyncio

import mpi4py.MPI
from distributed import Client, Scheduler, Worker


def execute(func, *args, **kwargs):
    comm = mpi4py.MPI.COMM_WORLD
    this_rank = comm.Get_rank()

    if this_rank == 0:
        # Rank 0 runs the Scheduler until the cluster is shut down.
        async def run_scheduler():
            async with Scheduler(...) as scheduler:
                comm.Barrier()
                await scheduler.finished()

        asyncio.run(run_scheduler())

    elif this_rank == 1:
        # Rank 1 waits for the Scheduler to start, runs the client function,
        # and then shuts down the cluster.
        comm.Barrier()

        ret = func(*args, **kwargs)

        # In practice, the Client needs the scheduler address (e.g., broadcast
        # from rank 0) in order to connect and shut the cluster down.
        with Client() as client:
            client.shutdown()

        return ret

    else:
        # All other ranks wait for the Scheduler and then run Workers.
        comm.Barrier()

        async def run_worker():
            async with Worker(...) as worker:
                await worker.finished()

        asyncio.run(run_worker())

There would be an obvious modification for the case where func is a coroutine function, so that it is run asynchronously.
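For example, the client-rank branch of the sketch above might detect an asynchronous client function and run it on its own event loop (again a sketch, not necessarily how the PR implements it):

import inspect

# On the client rank: support both plain and asynchronous client functions.
if inspect.iscoroutinefunction(func):
    ret = asyncio.run(func(*args, **kwargs))
else:
    ret = func(*args, **kwargs)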

Modifications to make this more customizable, such as making the MPI rank placement configurable, would also be easy to implement.
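For instance, the hard-coded ranks in the sketch above could become parameters; the keyword names client_rank and scheduler_rank follow the options mentioned earlier and are only illustrative:

def execute(func, *args, client_rank=1, scheduler_rank=0, **kwargs):
    comm = mpi4py.MPI.COMM_WORLD
    this_rank = comm.Get_rank()

    if this_rank == scheduler_rank:
        ...  # run the Scheduler, as in the sketch above
    if this_rank == client_rank:
        ...  # run the client function, as in the sketch above
    if this_rank not in (scheduler_rank, client_rank):
        ...  # run a Worker (or run one on every rank for inclusive workers)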

I particularly like this approach for a number of reasons:

  1. It is much more explicit, and it removes a lot of the previous Dask-MPI "magic" which, as @jacobtomlinson has already pointed out, makes it hard for new users to use Dask-MPI and diagnose problems with Dask-MPI when they have them.
  2. The dask_mpi.execute() function is essentially a decorator, which is an appropriate paradigm for what Dask-MPI does (as opposed to a context manager).
  3. It makes it easy to run the client code in a thread to prevent collisions with a scheduler or worker event loop (see the sketch below).
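As a minimal sketch of that last point, assuming the client function shares a rank with a Worker whose scheduler address is already known, the helper below (name and signature hypothetical) runs the client code in a thread alongside the Worker's event loop:

import asyncio
import threading

from distributed import Worker


def run_client_alongside_worker(func, scheduler_address, *args, **kwargs):
    # Run the client function in its own thread so it does not block the
    # Worker's event loop running on the same MPI rank.
    result = {}
    thread = threading.Thread(
        target=lambda: result.setdefault("value", func(*args, **kwargs)),
        daemon=True,
    )
    thread.start()

    async def run_worker():
        async with Worker(scheduler_address) as worker:
            await worker.finished()

    asyncio.run(run_worker())
    thread.join()
    return result.get("value")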

This is a draft for the purposes of discussion.

kmpaul · Oct 12 '23 09:10

NOTE: The execute() tests are not complete. All of the CLI and initialize() tests should be ported to the new execute() method, but once the CLI is modified to use execute(), the existing CLI tests will "just work." So, I'm holding off on debugging anything related to adding those tests until then. (I'm of course willing to fix any obvious bugs that anyone sees that the current tests didn't catch.)

kmpaul · Oct 13 '23 10:10