
MPICluster

Open · kmpaul opened this issue 6 years ago · 10 comments

After the first implementation of the initialize function, which makes it possible to launch a Dask cluster from within the client script using mpi4py, the natural next step seems to be an MPICluster object, so that the canonical client-script operational model of:

cluster = MPICluster(...args...)
client = Client(cluster)

will work the same way that calling initialize works.
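For reference, the current initialize-based pattern looks roughly like this (per the Dask-MPI documentation, with the script launched under mpirun so that every rank executes it):

from dask_mpi import initialize
initialize()

from distributed import Client
client = Client()  # picks up the scheduler address from dask.config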

kmpaul avatar Dec 20 '18 20:12 kmpaul

Recording some more thoughts on this:

I think that the MPICluster.__init__ method could effectively do what the initialize function already does. In fact, as a first pass, you could just have MPICluster.__init__ call initialize. Naively, I would like the send_close_signal function that is currently registered with atexit to be moved to an MPICluster.close method. If we expand the initialize function out, this class might look like:

import sys
import dask
from mpi4py import MPI
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()

SCHEDULER_RANK = 0
CLIENT_RANK = 1

class MPICluster(object):
    def __init__(self, ...):
        # Create the Scheduler on its designated rank and broadcast
        # its address to all of the other ranks
        if RANK == SCHEDULER_RANK:
            scheduler = create_scheduler(...)
            addr = scheduler.address
        else:
            addr = None

        self.scheduler_address = COMM.bcast(addr)
        dask.config.set(scheduler_address=self.scheduler_address)
        COMM.Barrier()

        # The Scheduler and Worker ranks block in their IOLoops until
        # the close signal arrives, and then they exit; only the
        # Client rank returns from __init__
        if RANK == SCHEDULER_RANK:
            run_scheduler(scheduler)
            sys.exit()
        elif RANK == CLIENT_RANK:
            pass
        else:
            create_and_run_worker(...)
            sys.exit()

    def close(self):
        # Tell the Scheduler rank to shut the cluster down
        send_close_signal()

but there is a problem with this: the initialize function (the __init__ method above) works by stopping the Scheduler and Worker ranks with a sys.exit() call after the close signal is sent to the Scheduler (i.e., after the Scheduler and Worker IOLoops have closed). Without the sys.exit() calls on the Scheduler and Worker ranks, those ranks continue from their last point of execution (in an MPI environment). And there is no clever way (that I can think of, at least) to "fast forward" these processes to the place in the code where the Client rank calls MPICluster.close().
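To make the control-flow problem concrete, here is a hypothetical client script (do_analysis stands in for arbitrary user code), run on every rank via something like mpirun -np 4 python script.py:

cluster = MPICluster()         # Scheduler/Worker ranks never return from
                               # __init__ because of the sys.exit() calls
client = Client(cluster)       # without those sys.exit() calls, every
result = do_analysis(client)   # rank would fall through and rerun this
cluster.close()                # client code after its IOLoop closed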

In summary, I cannot think of a way to encapsulate the cluster start/stop procedure on the Scheduler and Worker ranks in an object-oriented fashion. The closest thing I can think of is, frankly, a "cheat": a dask_mpi.cluster module that calls initialize at the module level. (This is not ideal, of course, because it either requires something like a dask_mpi.config object to set the initialize arguments before the dask_mpi.cluster module is imported, or it requires fixing the initialize arguments for the user.) Everything else in this module would therefore only be executed on the Client rank, including the MPICluster class definition. Something like this:

import dask
from dask_mpi.core import initialize

initialize(...)

class MPICluster(object):
    def __init__(self):
        self.scheduler_address = dask.config.get('scheduler_address')
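
A user script built on top of this hypothetical dask_mpi.cluster module would then look something like:

from dask_mpi.cluster import MPICluster  # initialize(...) runs at import time
from distributed import Client

cluster = MPICluster()  # just a holder for the scheduler address
client = Client(cluster.scheduler_address)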

Hence, the MPICluster object would exist only as a holder for the scheduler_address attribute (a job that, in the initialize-function mode of operation, is done by dask.config). This is fake object-oriented programming, and the "clean coder" in me wants to say: if you don't need an object, why create it?

Maybe someone out there has a better idea...

kmpaul avatar Dec 28 '18 16:12 kmpaul

Is there a particular motivation behind having an MPICluster class? The initialize method seems sufficient to me.

mrocklin avatar Dec 28 '18 16:12 mrocklin

The only reason I wanted to consider this option is that I like the pattern:

from dask_something import SomethingCluster
cluster = SomethingCluster()

from distributed import Client
client = Client(cluster)

...with the Client initializing from the Cluster object. I realize that the dask_mpi.initialize method is very similar...but it's not OO. And I like OO.

I'll leave this issue open, just in case some development makes this possible, but I agree that an MPICluster class is not actually necessary.

kmpaul avatar Dec 28 '18 21:12 kmpaul

As I am probably still not Pythonic enough: what is the problem with the solution you propose above, @kmpaul? Would using sys.exit() in __init__() not work?

guillaumeeb avatar Feb 04 '19 20:02 guillaumeeb

Hey, @guillaumeeb!

Yes. Using sys.exit() in __init__ would not behave the way you would expect.

The Cluster objects allow you to start and stop a cluster (i.e., the scheduler and workers) from the Client process. And, strictly speaking, you could start a Dask cluster, stop it, and then restart it. With the sys.exit() in the MPICluster.__init__ method, the scheduler and worker processes would be killed...which would trigger an MPI_Abort in most MPI implementations. That would then kill your Client process, meaning that you could not restart your cluster.

If you removed the sys.exit() calls from __init__, then once the scheduler and workers were closed, those processes would "pick up where they left off." Namely, they would return from the __init__ call and then run your Client code again...except in serial on multiple ranks.

kmpaul avatar Feb 04 '19 20:02 kmpaul

And, strictly speaking, you could start a dask cluster, stop it, and then restart it.

But do you really want to do this in an MPI job? Couldn't this be a documented limitation? Once your cluster is closed, that is the end of the MPI run?

But maybe you wanted to mix Dask and other MPI workloads, as in http://blog.dask.org/2019/01/31/dask-mpi-experiment?

guillaumeeb avatar Feb 04 '19 20:02 guillaumeeb

Short term I like the current initialize approach. I think that it does what we want.

Long term I think it would be interesting to dynamically launch and destroy MPI jobs from within Python, perhaps by calling something like mpirun directly. I'm not sure how valuable this would be in practice (you all would know more), but it could be fun to experiment with.
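A minimal sketch of what that could look like, assuming the existing dask-mpi command-line script and a shared scheduler file (the workflow itself is speculative):

import subprocess
from distributed import Client

# Hypothetical: launch a Dask-MPI cluster as a subprocess from Python
proc = subprocess.Popen(
    ["mpirun", "-np", "4",
     "dask-mpi", "--scheduler-file", "scheduler.json"]
)
client = Client(scheduler_file="scheduler.json")  # may need to wait/retry
                                                  # until the file appears
# ... do work ...
client.shutdown()  # closes the scheduler and workers, ending the MPI job
proc.wait()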

mrocklin avatar Feb 04 '19 23:02 mrocklin

@guillaumeeb Yes, I agree that for an MPI run you probably only want to shut down the cluster at the end of your job in most (all?) cases. However, if you do this, then the only part of MPICluster that you need is the __init__ method (and the close method that gets called when the object is destroyed). And that is exactly what the initialize function already does, so there is no need for a class when a single function does the job.

@mrocklin I think that what you are describing could be accomplished by using Dask-jobqueue from within a Dask-MPI job. Although that would only work on a system with a job scheduler.

That does suggest some other configurable options, though. What if you only want to use a fraction of the available MPI ranks? And then use more ranks later? On a system with a job scheduler, this could be wasteful, because the scheduled MPI job would have fixed/reserved resources for the duration of the job and only a fraction would be used (unless you were doing something like the dask-mpi experiment). However, on a system without a job scheduler, you could elastically scale your resources with additional mpirun invocations, as sketched below. As you say, Matt, I'm not sure how valuable this would be in practice, but it would be interesting to experiment with.
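For example, elastic scale-up on such a system might amount to extra mpirun invocations that add worker-only jobs to an existing scheduler (a speculative sketch; --no-scheduler is the dask-mpi CLI flag for worker-only runs):

import subprocess

# Speculative: the first invocation starts the scheduler and some workers;
# the second adds more workers to the same cluster later on
subprocess.Popen(["mpirun", "-np", "4",
                  "dask-mpi", "--scheduler-file", "scheduler.json"])
subprocess.Popen(["mpirun", "-np", "8",
                  "dask-mpi", "--scheduler-file", "scheduler.json",
                  "--no-scheduler"])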

kmpaul avatar Feb 05 '19 13:02 kmpaul

OK, Thank you both for your answers!

guillaumeeb avatar Feb 05 '19 14:02 guillaumeeb

You are welcome!

kmpaul avatar Feb 05 '19 14:02 kmpaul