Cluster shutdown hangs in batch mode on Linux Python >3.8
What happened:
When Dask-MPI is used in batch mode (i.e., using initialize()) on Linux with Python >3.8, it does not properly shut down the scheduler and worker processes when the client script completes; it hangs during shutdown. This means that the Python 3.9 and Python 3.10 runs of dask_mpi/tests/test_core.py and dask_mpi/tests/test_no_exit.py hang and never finish on CI.
Note that this only occurs on Linux. macOS executes without hanging.
What you expected to happen:
When the client script completes, the scheduler and worker processes should be shut down without error or hanging.
Minimal Complete Verifiable Example:
Manually executing the dask_mpi/tests/core_basic.py script with Python 3.9+ on Linux, like so:
```
mpirun -l -np 4 python dask_mpi/tests/core_basic.py
```
results in:
Full Logs:
```
[0] 2022-04-20 19:45:00,550 - distributed.scheduler - INFO - State start
[0] 2022-04-20 19:45:00,556 - distributed.scheduler - INFO - Clear task state
[0] 2022-04-20 19:45:00,557 - distributed.scheduler - INFO - Scheduler at: tcp://172.17.0.2:36407
[0] 2022-04-20 19:45:00,557 - distributed.scheduler - INFO - dashboard at: :8787
[2] 2022-04-20 19:45:00,573 - distributed.worker - INFO - Start worker at: tcp://172.17.0.2:45639
[2] 2022-04-20 19:45:00,573 - distributed.worker - INFO - Listening to: tcp://172.17.0.2:45639
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO - dashboard at: 172.17.0.2:37653
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO - Waiting to connect to: tcp://172.17.0.2:36407
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO - -------------------------------------------------
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO - Threads: 1
[2] 2022-04-20 19:45:00,575 - distributed.worker - INFO - Memory: 0.96 GiB
[2] 2022-04-20 19:45:00,576 - distributed.worker - INFO - Local Directory: /root/dask-mpi/dask_mpi/tests/dask-worker-space/worker-sev4vqjo
[3] 2022-04-20 19:45:00,579 - distributed.worker - INFO - Start worker at: tcp://172.17.0.2:38821
[3] 2022-04-20 19:45:00,580 - distributed.worker - INFO - Listening to: tcp://172.17.0.2:38821
[3] 2022-04-20 19:45:00,580 - distributed.worker - INFO - dashboard at: 172.17.0.2:45157
[3] 2022-04-20 19:45:00,580 - distributed.worker - INFO - Waiting to connect to: tcp://172.17.0.2:36407
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO - -------------------------------------------------
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO - Threads: 1
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO - Memory: 0.96 GiB
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO - Local Directory: /root/dask-mpi/dask_mpi/tests/dask-worker-space/worker-08kqqntu
[3] 2022-04-20 19:45:00,582 - distributed.worker - INFO - -------------------------------------------------
[2] 2022-04-20 19:45:00,585 - distributed.worker - INFO - -------------------------------------------------
[0] 2022-04-20 19:45:00,998 - distributed.scheduler - INFO - Receive client connection: Client-5d823051-c0e2-11ec-8020-0242ac110002
[0] 2022-04-20 19:45:01,009 - distributed.core - INFO - Starting established connection
[0] 2022-04-20 19:45:01,053 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:45639', name: 2, status: undefined, memory: 0, processing: 0>
[0] 2022-04-20 19:45:01,054 - distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:45639
[0] 2022-04-20 19:45:01,054 - distributed.core - INFO - Starting established connection
[2] 2022-04-20 19:45:01,055 - distributed.worker - INFO - Registered to: tcp://172.17.0.2:36407
[2] 2022-04-20 19:45:01,056 - distributed.worker - INFO - -------------------------------------------------
[0] 2022-04-20 19:45:01,057 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:38821', name: 3, status: undefined, memory: 0, processing: 0>
[2] 2022-04-20 19:45:01,059 - distributed.core - INFO - Starting established connection
[0] 2022-04-20 19:45:01,060 - distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:38821
[0] 2022-04-20 19:45:01,060 - distributed.core - INFO - Starting established connection
[3] 2022-04-20 19:45:01,060 - distributed.worker - INFO - Registered to: tcp://172.17.0.2:36407
[3] 2022-04-20 19:45:01,061 - distributed.worker - INFO - -------------------------------------------------
[3] 2022-04-20 19:45:01,063 - distributed.core - INFO - Starting established connection
[0] 2022-04-20 19:45:01,325 - distributed.scheduler - INFO - Remove client Client-5d823051-c0e2-11ec-8020-0242ac110002
[0] 2022-04-20 19:45:01,325 - distributed.scheduler - INFO - Remove client Client-5d823051-c0e2-11ec-8020-0242ac110002
[0] 2022-04-20 19:45:01,326 - distributed.scheduler - INFO - Close client connection: Client-5d823051-c0e2-11ec-8020-0242ac110002
[1] Error in atexit._run_exitfuncs:
[1] Traceback (most recent call last):
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
[1] result = yield future
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
[1] value = future.result()
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/client.py", line 1193, in _start
[1] await self._ensure_connected(timeout=timeout)
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/client.py", line 1256, in _ensure_connected
[1] comm = await connect(
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
[1] comm = await asyncio.wait_for(
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
[1] return fut.result()
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 439, in connect
[1] stream = await self.client.connect(
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
[1] addrinfo = await self.resolver.resolve(host, port, af)
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 424, in resolve
[1] for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/asyncio/base_events.py", line 861, in getaddrinfo
[1] return await self.run_in_executor(
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/asyncio/base_events.py", line 819, in run_in_executor
[1] executor.submit(func, *args), loop=self)
[1] File "/root/miniconda/envs/py-3.9/lib/python3.9/concurrent/futures/thread.py", line 169, in submit
[1] raise RuntimeError('cannot schedule new futures after '
[1] RuntimeError: cannot schedule new futures after interpreter shutdown
```
HANGS HERE!!! Requires CTRL-C to exit.
Anything else we need to know?:
I believe this is due to changes in asyncio introduced in Python 3.9. In particular, the asyncio.wait_for function now blocks, when cancelling a task due to a timeout, until the task has finished cancelling (see the Python 3.9 release notes). The problem appears to be that the dask_mpi.initialize() shutdown procedure depends on an asyncio call taking place in an atexit handler: by the time the atexit handler is called, the asyncio loop has already been closed, resulting in the RuntimeError: cannot schedule new futures after interpreter shutdown above and the subsequent hang.
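To make the interpreter-shutdown behaviour concrete, here is a minimal, Dask-free sketch (my own illustration based on my reading of the traceback, not code from Dask-MPI or distributed) that reproduces the same RuntimeError on Python 3.9+:

```python
# Standalone illustration (not Dask-MPI code): on Python 3.9+ the
# concurrent.futures executors are marked as shut down by a threading
# atexit hook that runs *before* handlers registered with atexit.register(),
# so submitting new work from such a handler raises the same error as above.
import atexit
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor()


def cleanup():
    # Python 3.9+: RuntimeError: cannot schedule new futures after
    # interpreter shutdown.  Python 3.8 and earlier: this typically still works.
    pool.submit(print, "pretend this is a scheduler/worker shutdown RPC")


atexit.register(cleanup)
print("script body finished; exiting now")
```

On Python 3.8 and earlier, concurrent.futures registered its own cleanup with atexit, so a handler registered later ran first and could still submit work; on 3.9+ the executors are marked as shut down before any atexit handler runs, which matches the traceback above (the failure happens inside Tornado's resolver when it tries to run getaddrinfo in an executor).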
Environment:
- Dask version: 2022.4.1
- Python version: 3.9.12
- Operating System: Linux
- Install method (conda, pip, source): conda
Note that even with the merge of #89, these errors still occur.
One potential solution is to not use an atexit handler to shut down the cluster and instead require that the user shut down the cluster manually at the end of the script. This would effectively mean that the canonical use of dask_mpi.initialize() would change from:
```python
initialize()

with Client() as c:
    # dask stuff
    ...
```
with the Client.shutdown() method called by the atexit handler, to:
```python
initialize()

with Client() as c:
    # dask stuff
    c.shutdown()
```
with the c.shutdown() call above being required, or the process will hang. We could even encapsulate the c.shutdown() call in a function that bookends the initialize() method, like so:
```python
def finalize(...):
    with Client() as c:
        c.shutdown()
```
so that the dask-mpi client script might look like:
```python
initialize()

with Client() as c:
    # dask stuff
    ...

# maybe more stuff

finalize()
```
However, this definitely breaks backwards compatibility. It does fix the above hanging, though. Is this acceptable?
CC @mrocklin @jacobtomlinson
Possible API changes
If we have to break backwards compatibility, then I want to suggest some other changes that might work well alongside the proposal above.
With @joezuntz's changes allowing users to pass an existing MPI communicator object into Dask-MPI and to turn off explicit "shutdown" of the Dask cluster, there is now a foreseeable mechanism for stopping/restarting a Dask-MPI cluster from within a client batch process. As I understand it, this requires three components:
- a Dask-MPI initialize() function that starts the scheduler and worker processes and subsequently blocks on all MPI ranks except the client rank (e.g., rank == 1),
- a Dask-MPI is_client() function that returns True when the MPI rank is equal to the client rank, and
- a Dask-MPI finalize() function that stops the scheduler and worker processes.
The general outline of how to use these three functions together would be like so:
```python
from dask_mpi import initialize, is_client, finalize

initialize()  # Blocks scheduler and worker MPI ranks HERE!!!

if is_client():
    # Do your client dask operations here.
    # When the scheduler and worker MPI ranks unblock due to the finalize call below,
    # this section will be skipped by the scheduler and worker MPI ranks.
    ...

finalize()  # Everything after this can act like a normal mpi4py script
```
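For concreteness, the is_client() piece could be as small as a rank check. This is only a sketch of the idea; the helper does not exist in Dask-MPI today, and the client-rank convention is an assumption:

```python
# Hypothetical sketch of the proposed is_client(); not an existing Dask-MPI API.
from mpi4py import MPI

CLIENT_RANK = 1  # convention assumed above: rank 0 = scheduler, rank 1 = client


def is_client(comm=MPI.COMM_WORLD):
    """Return True only on the MPI rank designated to run the client code."""
    return comm.Get_rank() == CLIENT_RANK
```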
...This whole thing is starting to look to me like a context manager, no?
I wonder if using a context manager would feel more natural here.
```python
def initialize(...):
    with Client() as c:
        ...
```
@jacobtomlinson: Yeah. I was thinking of something like:
```python
with MPICluster(...) as cluster, Client(cluster) as c:
    if cluster.is_client():
        ...
```
where the MPICluster(...) object would essentially be the initialize(...) method, except turned into a context manager, with finalize() being called in __exit__().
...but I don't like the nested if pattern. It's fiddly and prone to errors, I think.
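For concreteness, a rough sketch of what I have in mind is below. None of this exists in Dask-MPI today (MPICluster, its is_client method, and the finalize step are all hypothetical), so treat it purely as a shape for the API:

```python
# Hypothetical sketch only: MPICluster is not an existing Dask-MPI class and
# finalize() is the proposed (not yet implemented) shutdown function.
from mpi4py import MPI
from dask_mpi import initialize


class MPICluster:
    def __init__(self, client_rank=1, **kwargs):
        self.client_rank = client_rank
        self.kwargs = kwargs

    def __enter__(self):
        # As in the proposal above: scheduler and worker ranks block in here
        # until shutdown, while the client rank returns and runs the body.
        initialize(**self.kwargs)
        return self

    def is_client(self):
        return MPI.COMM_WORLD.Get_rank() == self.client_rank

    def __exit__(self, exc_type, exc_value, traceback):
        # This is where the proposed finalize() would stop the scheduler and
        # workers (e.g. by having the client rank send the close signal).
        pass
```

A real implementation would also need to expose the scheduler address so that Client(cluster) can connect, as in the usage example above.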
This is kind of where I was going with the MPIRunner class in https://github.com/dask/distributed/pull/4710 and https://github.com/dask/dask-mpi/pull/69. The Cluster class doesn't quite line up with the use case here, given that the job and resources already exist and we are trying to populate them, rather than create them in the first place. This is why I tried to add Runner, a sibling to Cluster, for this use case.
However, @mrocklin suggested that Cluster would be a fine way to implement it and that adding a new base class only increases complexity. So perhaps it would be good to resurrect #69 but base it on Cluster instead.
Same issue here. I will try Python 3.8 for now.
Experiencing the same, Python 3.8.
Can we think of some temporary workaround here? Maybe closing the MPI cluster manually after the c.shutdown, or "request killing the workers / check worker count / repeat again if needed / close scheduler"? I already implement exiting logic manually, so I don't mind adding a few more lines...
Yeah. If you have time for a PR, that would be great, @evgri243!