distributed icon indicating copy to clipboard operation
distributed copied to clipboard

[WIP] MPI communication layer

Open ianthomas23 opened this issue 5 years ago • 11 comments

This is an implementation of the communication layer using MPI, for demonstration purposes. It is far from production standard and much is suboptimal, but I wanted to make the code available for others to look at and discuss. There is a related issue dask/dask-mpi#48.

It comes with a demo to run in demo/demo.py. You will need both mpi4py and dask-mpi, and you can start the demo from the command line using something like

mpirun -np 5 python demo.py

which will create 5 processes (1 Scheduler, 1 Client and 3 Workers). If you set want_sleep = True in the script it will sleep before and after the calculations to give you time to open the dashboard in a web browser. Much logging is performed; each MPI process logs to a different file, e.g. rank3.log, which has proved invaluable for debugging.

The actual MPI code is minimal and it fits fairly well into the Comm/Connector/Listener/Backend architecture except that it doesn't work well with asyncio such that the handling of asynchronous sends and receives is achieved by regular polling, which is far from ideal.

It is too early to talk about performance, but I can't help it. It is poor. And there is significant difference between mpi4py using different MPI implementations, e.g. MPICH vs OpenMPI. But I am mostly concerned about it working correctly at this stage.

Some improvements I have started to think about:

  1. Use of the distributed utilities to convert between messages as python objects and byte streams rather that the (suspected slow) mpi4py equivalents.
  2. Reduce the excessive polling for MPI messages which is currently on a per-Comm basis. Doing this once per MPI rank and passing it to the correct Comm should be much better.

Certainly the latter requires an understanding of how the higher-level Scheduler/Worker/Client layer interacts with the communications layer, and I will no doubt have some questions about this.

Enjoy!

ianthomas23 avatar Oct 01 '20 21:10 ianthomas23

Well this is fun to see :)

cc @kmpaul and @andersy005 from dask-mpi , who might find this interesting

Let's also cc @dalcinl from mpi4py to see if he has interest / time. I think that the point around waiting to recv from many senders simultaneously is an interesting design challenge. I agree in general that polling on each one probably isn't ideal. Ideally there would be some event in MPI itself that we could use to trigger things. MPI wasn't designed for a dynamic application like Dask, but I wouldn't be surprised if there was some internal system that we could hook into here.

mrocklin avatar Oct 02 '20 15:10 mrocklin

@ianthomas23 I'm also curious, was this done just for fun, or are you working on something in particular? I'm also curious to know if there are any performance differences. A computation that you might want to try is something like the following:

from dask.distributed import Client
client = Client()  # or however you set up

import dask.array as da
import time

x = da.random.random((20000, 20000)).persist()

start = time.time()
y = (x + x.T).transpose().sum().compute()
stop = time.time()

print(stop - start)

mrocklin avatar Oct 02 '20 16:10 mrocklin

@ianthomas23 This looks very cool! Thanks for sharing it. (And @mrocklin, thanks for CCing me.)

I won't have time to look at the PR closely for a while, but I'll take a look and respond more next week.

kmpaul avatar Oct 02 '20 16:10 kmpaul

@mrocklin There is no particular problem that I am trying to solve, so I guess we are in "just for fun" territory! I was looking around dask for issues I could help with and when I saw there was some interest in using MPI, for which my skillset is ideally suited, I thought I would take a stab at it.

ianthomas23 avatar Oct 02 '20 18:10 ianthomas23

@mrocklin Polling many senders to receive a message is builtin in MPI, you just specify a ANY_SOURCE wildcard value. The new code in this PR seems to be taking advantage of that.

dalcinl avatar Oct 02 '20 18:10 dalcinl

Oh I see, great. So we're not polling on each Comm every 5ms, we're polling on all comms every 5ms.

@dalcinl you mention thread safety. If mpi4py is threadsafe then another option here would be to set up a thread that just blocks on waiting for a message. Once it comes in it would alert the main asyncio event loop thread which would then respond. This might be more responsive than polling. Thoughts anyone?

mrocklin avatar Oct 02 '20 18:10 mrocklin

@mrocklin I do not know all the details of all the code in this PR. I'm just saying that in MPI you can very well block in a receive call polling many sources, it is a builtin feature of MPI.

About thread safety, mpi4py is certainly thread-safe. However, the backend MPI implemenation maybe not, or users my request various levels of thread support. Besides that detail, given the way MPI implementations work (at least by default), blocking on a recv() call in a thread will make your processor core go 100% CPU usage; this is certainly not nice.

mpi4py provides mpi4py.futures, which simply an MPI implementation of Python's stdlib concurrent.futures. I use threads in the implementation, but I still do polling with probe+sleep on an infinite while loop.

@ianthomas23 Look in mpi4py.futures sources, look for the Backoff helper class, It is a slightly smarter way to handle the pause interval, you may want to implement a similar logic here.

dalcinl avatar Oct 03 '20 13:10 dalcinl

Oh I see, great. So we're not polling on each Comm every 5ms, we're polling on all comms every 5ms.

@mrocklin No, you were correct first time, each Comm (and Listener) is polling separately. At least this means there is plenty of room for improvement.

ianthomas23 avatar Oct 03 '20 17:10 ianthomas23

@dalcinl Thanks for your suggestions on the improved iprobe/recv combination and on mpi4py.futures. I will take a look at both.

ianthomas23 avatar Oct 03 '20 17:10 ianthomas23

@mrocklin Your example computation

x = da.random.random((20000, 20000)).persist()
start = time.time()
y = (x + x.T).transpose().sum().compute()
print('Seconds:', time.time() - start)

using 5 MPI processes (3 workers) on my 4-core i5 laptop takes ~2.2 s using the tcp protocol and ~2.3 s using the mpi protocol, whether using mpich 3.3.2 or openmpi 4.0.5 conda packages. This is much better than I was observing in my demo example which uses an excessive number of small chunks to force lots of communication, for which mpi was 2 to 4 times slower than tcp.

ianthomas23 avatar Oct 03 '20 18:10 ianthomas23

This is as polished as I am going to get it. Now just a single object (per Scheduler/Client/Worker) polls for incoming messages and passes them to the appropriate receiver.

On a single multicore machine with 1 thread per worker (and all the debug stuff turned off) it is about 10% slower than TCP. Under these circumstances it is using shared memory. But a more realistic test across multiple nodes of an Infiniband cluster it is only about half the speed of TCP, so comms were dominating.

I suspect that a problem could be selected to perform better than this, e.g. one that sends a smaller number of larger messages. But ultimately the small message 'chatter' between Scheduler/Client/Workers is slow due to the extra latency introduced by the MPI <-> dask communications. MPI isn't really designed to work the dask event-driven way. Also, all of the low level comms code in dask is an unnecessary overhead here as MPI deals with all of it, but removing it from the distributed MPI code path would be serious refactoring work.

I note that I haven't touched this code for a few months and there have been changes to distributed in the mean time that may make a difference (either way!).

It has been an interesting experiment, but has now dropped off the bottom of my priority list. If someone else wants to continue with it they are welcome to do so.

ianthomas23 avatar Mar 02 '21 08:03 ianthomas23