dask-mpi
dask-mpi copied to clipboard
Start MPI Dynamically from Dask
I would like to explore the possibility of Dask starting MPI. This is sort of the reverse behavior of what the dask-mpi package does today.
To clarify the situation I'm talking about, consider the situation where we are running Dask with some other system like Kubernetes, Yarn, or SLURM, and are doing Dask's normal dynamic computations. We scale up and down, load data from disk, do some preprocessing. Then, we want to run some MPI code on the data we have in memory in the Dask worker processes. We don't currently have an MPI world set up (our workers were not started with mpirun
or anything) but would like to create one. Is this possible?
To do this, Dask will have to go through whatever process mpirun
/mpiexec
goes through to set up an MPI communicator. What is this process?
Ideally it would be able to do this without launching new processes. Ideally the same processes currently running the Dask workers would initialize some MPI code, be told about each other, then run some MPI program, then shut down MPI and continue on with normal Dask work.
We'll need to become careful about what to do if a worker goes away during this process. We'll probably have to restart the MPI job, which will be fine. I think that I can handle that on the scheduling/resiliency side.
I think that the people who know the answer to these questions will be people who have experience not only in using MPI, but also in deploying it.
I do not know how to initialize an MPI environment without starting a new process. Every MPI implementation is different, and so every mpirun
/mpiexec
does something different when executed. Its a strange thing that the launching process is not part of the MPI specification, which means there is no standardization.
The closest thing I can think of to launching an MPI job from within python is my personal mpirical
package (https://github.com/NCAR/mpirical) which I developed for running MPI tests. However, it still uses subprocess
to launch the MPI process with mpirun
.
If starting another process is a non-starter (bad pun?), then the only thing I can think of is tailoring a solution to a particular implementation of MPI, such as MPICH or OpenMPI.
OK, so lets say we pin ourselves to something like OpenMPI (or whatever is most common). Does this become possible? If so, what is the path to doing this?
I think it becomes possible, but I think we would need some OpenMPI or MPICH developers to chime in. Maybe an issue here: https://github.com/open-mpi/ompi?
I don't suggest limiting yourself to only a single MPI implementation.
@kmpaul I can't speak for every MPI implementation, but usually mpirun
just does the work of setting up your MPI_COMM_WORLD
for you.
It is possible, but ugly, to do this manually. I've done this before in a previous life.
It requires using the MPI_Open_port
, MPI_Comm_accept
, MPI_Comm_connect
, and MPI_Intercomm_merge
APIs documented here: https://www.mpi-forum.org/docs/mpi-2.2/mpi22-report/node212.htm#Node212
You end up having to build up your communicator a process at a time.
However, you should note that some MPI implementations flat out do not implement these APIs. I believe OpenMPI and Intel MPI both do, but MVAPICH does not.
MPI_COMM_JOIN is an additional API that allows you to do this that assumes you have an existing socket connection between your two processes. Last I heard the MPI Forum was trying to deprecate this API, so I definitely don't suggest using this one. https://www.mpi-forum.org/docs/mpi-2.0/mpi-20-html/node115.htm
@jrhemstad I agree that we shouldn't limit ourselves to a single MPI implementation, but you also point out a problem that finding a solution that works for all implementations may not be possible...easily. (This API is supported in the MPI 3.1 spec, too.)
I was suggesting that one avoids doing anything implementation specific. The APIs I suggest are part of the MPI standard, they're just so infrequently used that some implementations just neglect to implement them.
MPI libraries use an interface called Process Management Interface (PMI) to interact with the job launcher. Different MPI libraries use different versions of PMI that are incompatible with one another. There has been efforts for standardization and compatibility but AFAIK this is far from complete.
In current state, the job launcher/manager should provide the same PMI version that the MPI library is using. Hence, the tie-in between MPICH MPI library + hydra launcher, OpenMPI library + orte launcher. Slurm provides its own variant of PMI and both MPICH and OpenMPI have configuration option to use PMI interface compatible with SLURM.
It is a non-trivial effort for DASK to replace the MPI launcher. It will have to
- to provide implementation for the PMI interface use by a particular MPI library
- provide the backed for exchanging control information that happens through the launcher/deamon processes
And it will have redo 1 for each MPI library it has to be compatible with.
@jrhemstad, dynamic process model (MPI_Comm_spawn, MPI_Comm_accept, MPI_Comm_connect) are for a way for independent MPI applications to launch, discover and connect with each other. The assumption is that you are in an MPI environment already. So it does not obviate the need for the above, especially if you want to use existing MPI libraries.
@jrhemstad, dynamic process model (MPI_Comm_spawn, MPI_Comm_accept, MPI_Comm_connect) are for a way for independent MPI applications to launch, discover and connect with each other. The assumption is that you are in an MPI environment already. So it does not obviate the need for the above, especially if you want to use existing MPI libraries.
This is not correct.
It is certainly possible to launch a process w/o mpirun
and then bring it into an MPI communicator---I have done it. You just need to call MPI_Init
in the process.
See SO post here: https://stackoverflow.com/questions/15578009/difference-between-running-a-program-with-and-without-mpirun
This is not correct.
It is certainly possible to launch a process w/o
mpirun
and then bring it into an MPI communicator---I have done it. You just need to callMPI_Init
in the process.
@jrhemstad, my bad. I agree with you. at least according to the specification, one should be able to call these from a singleton process which was not launched through a process manger.
I am skeptical about the current state of implementation for these in MPI libraries though. Like you said, not all libraries support these APIs and even if they do, probably not on all system configs.
Also, this is not the most performant way of initializing MPI. And it can be quite ugly like you point out.
Given this seems ugly in general, and I hate to say let's create yet another standard, but should we start with OpenMPI and then slowly add other variants as time goes on that follow the same standard?
I am skeptical about the current state of implementation for these in MPI libraries though. Like you said, not all libraries support these APIs and even if they do, probably not on all system configs.
Also, this is not the most performant way of initializing MPI. And it can be quite ugly like you point out.
Absolutely! It's definitely not going to be as easy as just doing mpirun -n N ./a.out
and definitely not going to be as performant, but does that performance really matter? It's a one-time, upfront cost.
I think a good place to start is to create a simple client/server C++ code that exercises the MPI_Open_port
, MPI_Comm_accept
, MPI_Comm_connect
, and MPI_Intercomm_merge
APIs and it can be tested across various MPIs to get confirmation of which implementations actually support it.
Given this seems ugly in general, and I hate to say let's create yet another standard, but should we start with OpenMPI and then slowly add other variants as time goes on that follow the same standard?
To be clear, the APIs I described are part of the MPI standard that all MPI implementations should implement, it's not specific to OpenMPI.
From what I recall when I've played with this in the past, OpenMPI is one implementation that does support these APIs, so it's a good place to start.
To be clear, the APIs I described are part of the MPI standard that all MPI implementations should implement, it's not specific to OpenMPI.
From what I recall when I've played with this in the past, OpenMPI is one implementation that does support these APIs, so it's a good place to start.
Understood. So should we start with OpenMPI to see if the standard that's already suppose to be there is universally used :)
mpi4py.futures may be useful for this: https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html
. Absolutely! It's definitely not going to be as easy as just doing
mpirun -n N ./a.out
and definitely not going to be as performant, but does that performance really matter? It's a one-time, upfront cost.
What's the upfront cost like? Microseconds, milliseconds, seconds?
I think a good place to start is to create a simple client/server C++ code that exercises the
MPI_Open_port
,MPI_Comm_accept
,MPI_Comm_connect
, andMPI_Intercomm_merge
APIs and it can be tested across various MPIs to get confirmation of which implementations actually support it.
+1
I do not think using MPI Dynamic Process Management (DPM) (accept, connect, etc.) is about just the upfront cost. The DPM features though supported in an MPI library, may not work with all transports/networks. In this case, one would be limited to a lower-performant network (say sockets) for all communication. Something to keep in mind.
Also, the support in OpenMPI appears broken right now, see this open bug and comments say it is low on priority: https://github.com/open-mpi/ompi/issues/3458
I prefer the approach of DASK providing a PMI implementation compatible with MPI libraries though it is a much larger effort compared to using DPM.
Thanks for the input @spotluri ! Some followup questions if I may
to provide implementation for the PMI interface use by a particular MPI library
Do know have a sense for what this looks like? Can you recommend a good reference here for people to check out?
provide the backed for exchanging control information that happens through the launcher/deamon processes
Dask is more than happy to move around control information. My hope is that this would be done once at startup and then MPI would take over for most communication. Is this the case? For context, Dask communications will have latencies of at least a millisecond, which, as I'm sure you're aware, can be a long time for tightly-coupled MPI computations.
MPI libraries use an interface called Process Management Interface (PMI) to interact with the job launcher
I want to verify here that ideally we should be able to start MPI from the existing dask-worker processes, not from starting new processes as would be done with SLURM/etc.. Not a hard requirement, but definitely a nice-to-have. Is this goal consistent with PMI?
Also, the support in OpenMPI appears broken right now, see this open bug and comments say it is low on priority: open-mpi/ompi#3458
Ah yes! I remember stumbling across this back when I was originally exploring this stuff ~2 years ago. So my memory of OpenMPI was incorrect. In which case it was Intel MPI where these APIs do work.
From what I remember, MVAPICH doesn't support these APIs either (I think you actually get an "We don't support this function!" message if you try and use them).
This is indeed problematic because you don't want to pin yourself to a non-free MPI.
I prefer the approach of DASK providing a PMI implementation compatible with MPI libraries though it is a much larger effort compared to using DPM.
These is very dangerous territory. You would be programming to implementation specific details that may or may not be portable across MPI implementations.
The "right" solution here is to make use of the APIs in the standard. Unfortunately, support for these APIs is sparse. Therefore, though it may not be fast, the best long-term solution is to complain (loudly) to the various implementations that this feature is important to us, and they are out of compliance with the MPI standard by not supporting these features.
I agree with @jrhemstad, here. It is unfortunate that the MPI standard is...well...not very standard. However, this is the only appropriate way forward. We need to stick to the MPI specification. ...and for MPI implementations that claim to be open source, perhaps we can suggest (or even go so far as providing) implementations of the missing API.
A set of slides from an MPI BoF at SC. Link from @jrhemstad
https://www.mpi-forum.org/bofs/2018-11-sc/sessions.pdf
So based on outside conversation and further investigation, it seems like it is worth the effort of exploring using PMI or PMIx directly.
https://github.com/pmix/pmix
I only recently learned of this effort, but it looks to have wide support among the MPI implementations and contributors that matter. Process start-up is something that is outside of the MPI standard, so I believe that it is fine to explore options outside of the MPI_Comm_accept/connect
APIs (though we should still push on getting that fixed).
It may be worth having a conversation with the PMIx devs about our use case and how it maps to their design goals. @rhc54 is probably the guy to talk to.
I think you'll find that the PMIx Groups work has what you need: https://github.com/pmix/pmix-standard/pull/139
The implementation is already in PMIx master branch: https://github.com/pmix/pmix
The MPI Sessions WG is exploring it for what you have described - dynamic async construction of MPI communicators.
@rhc54 Thanks Ralph! We'll take a look.
For transparency, it appears that NVIDIA MPI folks have started conversations about this topic internally. We'll see what surfaces...
@mrocklin Out of curiosity, can you say more about this? Are they looking at using the PMIx Group capability, or some other approach?
Honestly I've stopped tracking the internal chatter, I'll ask someone there to summarize the situation and report up though.
Restating Dask's problem statement for clarity:
- Dask would like to be able to spawn
N
workers independent of MPI (i.e. nompirun
) - During the Dask workflow, Dask may dynamically resize
N
for whatever reason - At some point during a Dask workflow, we want to execute an MPI job across
M
ranks, where1 <= M <= N
. - For the duration of the MPI job, Dask will disable all dynamic process functionality (anything that would spawn or destroy a Dask worker) such that
N
is fixed for the duration of the MPI job - The MPI ranks would be accessing data given to them by the Dask workers
- Dask does not want to have to spawn new processes in order to execute the MPI job
- Thus, this requires using the existing Dask workers as MPI processes
- When the MPI job completes, Dask continues on doing other Dask work and re-enables the dynamic resizing of
N
until the next MPI job.
The fundamental issue with the above problem statement is that if we do not want to spawn new processes to do the MPI job, this requires calling MPI_Init
on the Dask worker, and then MPI_Finalize
once the MPI job is complete, i.e., calling MPI_Init
and MPI_Finalize
many times throughout the lifetime of the Dask worker.
However, this is expressly forbidden by the MPI Standard. Some have suggested we just "do it anyways" and exploit implementation specific behavior, but I think this is a non-starter.
I had lovely conversation with @rhc54 @jsquyres @hppritcha (and others I don't know the github handles for) where I introduced them to the above problem statement and we had a conversation about how we can solve it. For those of you who were in the call, please correct or clarify anything I've said that doesn't sound right.
In short, the result of the conversation is that MPI Sessions and https://github.com/mpi-forum/mpi-issues/issues/103 looks to be our best path forward for a long-term solution. Sessions came about in-part due to frustrations with the single init/finalize requirement. They are targeted for the MPI 4.0 standard, which is slated for early 2020. OpenMPI currently has an experimental prototype implementation of Sessions that we could begin experimenting with in the short-term.
I'm not going to try and give a functionally complete summary of MPI Sessions because I am not qualified to do so. However, this is my limited understanding:
An MPI session allows you to query the "resource manager" RM (think Dask scheduler here) for a list of "process sets" that can participate in an MPI job. For example, Dask workers [0, N)
can be a process set, workers [0, N/2)
and [N/2, N)
could be additional process sets, it's any arbitrary grouping of processes you want with a given identifier.
You use one of the available set of processes to initialize a Session and then create your MPI group/communicator and use that as you would any other MPI communicator* for your MPI job.
You can initialize/finalize a Session many times throughout the lifetime of a process, and MPI Job i
can use a different process set than MPI Job i + 1
. Thus, we've satisfied our problem statement in that we can use the existing Dask processes for our MPI job, and between MPI jobs, Dask is free to dynamically size the number of workers.
For OpenMPI, PMIx
is the layer below MPI that is used to maintain the "process sets" that are available to use to create a Session. The Dask scheduler would need to be modified to "speak" PMIx in order to setup the process sets that can be consumed by MPI Sessions. Fortunately, I've been told by @rhc54 that Python bindings from PMIx are already in the works.
Furthermore, Dask can also be modified (or an add-on) to do all of the work of setting up the communicator from the process set/Session, and that communicator can then just be handed off to whatever C/C++ library that is using MPI. In this way, no modification is necessary in the underlying C/C++ library using MPI**.
Finally, I believe the folks on the OpenMPI call that they were excited about having a solid use-case for Sessions and expressed interest in staying in close communication about our efforts here. It was my impression that there could be a fruitful opportunity to collaborate here.
*
Note that this precludes any library that is using MPI_COMM_WORLD
directly, but that's bad practice anyways and we should push against any library that does this to change the way their library is implemented to accept a communicator to use as part of their API.
** Similar to *, the libraries API needs to be defined that it just accepts a communicator rather than relying on MPI_COMM_WORLD
.
@jrhemstad Thanks for the links on mpi-sessions. Those were useful! Like we already discussed this at several places, there are the following cases that we can consider how to put dask and mpi together. I don't see how mpi-sessions would help us in any of these cases. May be I'm missing something here?
Case1a: If we are going with 'dask-first' approach and further decide to reuse dask-worker processes for MPI tasks, we still need to use the traditional 'MPI_Init' anyways to make these processes become MPI-aware, right? If so, how would mpi-sessions help us in this case? Maybe you meant it'll be useful when we want to regroup those processes differently for cudf part and the cuML part?
Case1b: If we are going with 'dask-first' approach but dask spawning new MPI processes for the current task (eg: running a cuML-algo), we might as well get its communicator and go ahead, no?
Case2: If we are going with 'mpi-first' approach (as in Matt's blog on dask-mpi), we still have to deal with the complexities due to the 2 extra mpi ranks being launched. Even here, we could just create a communicator out of those 'actual' N-2 ranks and use them for the cuML work, right?
I think @hppritcha would be the best source for MPI Sessions questions.
Case1a: If we are going with 'dask-first' approach and further decide to reuse dask-worker processes for MPI tasks, we still need to use the traditional 'MPI_Init' anyways to make these processes become MPI-aware, right? If so, how would mpi-sessions help us in this case?
@teju85
Case 1a is what we are ideally targeting with Dask.
@rhc54 or @hppritcha correct me if I am wrong, but Sessions help us because we do not have to explicitly call MPI_Init
, instead when initializing a Session, it will handle initializing the MPI Library for us if it has not already been done.
In this way, we do not have the problem of calling MPI_Init
/MPI_Finalize
multiple times within the same process.
See slide 50+ here: https://raw.githubusercontent.com/wiki/mpiwg-sessions/sessions-issues/2016-12-12-webex/2016-12-12-webex.pptx