
How do I start Dask from within an MPI4Py workflow

mrocklin opened this issue on Mar 01 '19 • 0 comments

I'm sitting with @zonca and he's asking how to start a Dask application from within an mpi4py application. I'll give a brief explanation, and then some code snippets.

You can start a scheduler, client, and workers from within your Python script. After you finish your other work (perhaps calling a barrier to synchronize), start a scheduler on rank 0, start a client on rank 1 (along with your Dask array/dataframe/delayed code), and start workers on all of the other ranks.

There are docs on how to start dask schedulers and workers from Python here: https://docs.dask.org/en/latest/setup/python-advanced.html .
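As a rough sketch of how those pieces might be split across MPI ranks (a hypothetical outline only, assuming the async startup API described in those docs; the address, port, and rank assignments are placeholders to adapt to your own setup):

# hypothetical sketch: split one mpi4py program into scheduler / client / workers
import asyncio
from mpi4py import MPI
from dask.distributed import Scheduler, Worker, Client

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# placeholder address -- in practice rank 0 would broadcast its real address
SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'

async def run_scheduler():
    async with Scheduler(port=8786) as scheduler:
        await scheduler.finished()  # run until told to stop

async def run_worker():
    async with Worker(SCHEDULER_ADDRESS, name='rank-' + str(rank)) as worker:
        await worker.finished()

async def run_client():
    async with Client(SCHEDULER_ADDRESS, asynchronous=True) as client:
        ...  # dask array/dataframe/delayed code goes here

comm.Barrier()  # finish any earlier MPI work first
if rank == 0:
    asyncio.run(run_scheduler())
elif rank == 1:
    asyncio.run(run_client())
else:
    asyncio.run(run_worker())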

Then @zonca asks

Well, how do I get my data from the existing process?

There are many ways to do that, but a simple (if perhaps inelegant) way would be to attach the data on each process to its local worker, and then run a task on that worker that collects the data, something like the following:

# on each worker process ("rank" is the MPI rank, "data" is that process's local data)
from dask.distributed import Worker

w = Worker('tcp://127.0.0.1:8786', name='rank-' + str(rank))
w.my_special_data = data  # <--- we add this line to the docs mentioned above
w.start()

# on the client process
from dask.distributed import get_worker

def get_local_data():
    worker = get_worker()  # get the worker object on which this task runs
    return worker.my_special_data

# run that function on every rank that holds a worker
futures = [
    client.submit(get_local_data, pure=False, workers=['rank-' + str(rank)])
    for rank in range(2, comm_size)  # ranks 0 and 1 run the scheduler and client
]

Then you can do whatever you like with those futures.
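For example (a hypothetical follow-up, assuming the combined data fits comfortably in the client process's memory), you could pull each worker's piece back to the client:

data_per_rank = client.gather(futures)  # list with one entry per worker, in rank order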

Then @zonca asks

Great, well then how do I cleanly shut down my Dask workers and continue on with my MPI execution?

I thought that calling client.retire_workers() would do this, but apparently it didn't. It looks like the Worker shuts down but the Tornado IOLoop keeps running, which blocks the process. We can probably add a keyword argument to some method to improve this, or you can probably work around it by calling something like:

import tornado.ioloop
client.run(lambda: tornado.ioloop.IOLoop.current().stop())

Though this is somewhat rude to the workers :)
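Once the workers' event loops have stopped, each rank can return to plain MPI. A minimal sketch of that hand-off (assuming the same comm object from the mpi4py setup, and that every rank reaches this point):

# on every rank, after its Dask role has finished (hypothetical continuation)
comm.Barrier()  # re-synchronize all MPI ranks
# ... carry on with the rest of the mpi4py workflow ...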

mrocklin · Mar 01 '19