dask-mpi
Worker raises `CommClosedError` on client shutdown
Describe the issue:
I'm trying out a simple hello-world style dask-mpi example, and the computation returns the right result, but I'm getting exceptions when the client finishes. I'm running the below script under Slurm as `srun -n3 python repro.py`, and the error is:
2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
I thought this might be related to #87, but I'm running on Python 3.8 and there's just an exception, no hang.
Am I doing something wrong? It looks to me like the worker is complaining because the scheduler shuts down before the worker does. Is this expected? If I manually force the workers to shut down before the client and scheduler do, with:
def closeall(dask_scheduler):
    for w in dask_scheduler.workers:
        dask_scheduler.close_worker(w)

[...]

client.run_on_scheduler(closeall)
then everything exits with no exceptions. But this feels like a hack... am I missing something?
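(For completeness, here is roughly how that workaround fits into the script below; this is a sketch only, with the `closeall` helper copied from above and the `run_on_scheduler` call placed just before the `Client` context exits:)

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def closeall(dask_scheduler):
    # runs on the scheduler: ask it to close every registered worker
    for w in dask_scheduler.workers:
        dask_scheduler.close_worker(w)

def main():
    dask_mpi.initialize()

    with Client() as client:
        print(f'future returned {client.submit(f, 1).result()}')
        # force the workers to shut down before the scheduler does
        client.run_on_scheduler(closeall)

if __name__ == '__main__':
    main()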
Minimal Complete Verifiable Example:
import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        res = future.result()
        print(f'future returned {res}')

if __name__ == '__main__':
    main()
Full log:
(venv8) lgarrison@scclin021:~/scc/daskdistrib$ srun -n3 python ./repro_commclosed.py
2022-10-05 14:05:40,460 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-10-05 14:05:40,527 - distributed.scheduler - INFO - State start
2022-10-05 14:05:40,533 - distributed.scheduler - INFO - Scheduler at: tcp://10.250.145.105:45101
2022-10-05 14:05:40,533 - distributed.scheduler - INFO - dashboard at: :8787
2022-10-05 14:05:40,566 - distributed.worker - INFO - Start worker at: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO - Listening to: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO - Worker name: 2
2022-10-05 14:05:40,566 - distributed.worker - INFO - dashboard at: 10.250.145.105:34979
2022-10-05 14:05:40,566 - distributed.worker - INFO - Waiting to connect to: tcp://10.250.145.105:45101
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:40,566 - distributed.worker - INFO - Threads: 1
2022-10-05 14:05:40,566 - distributed.worker - INFO - Memory: 7.81 GiB
2022-10-05 14:05:40,566 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-az8e_3tm
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,354 - distributed.scheduler - INFO - Receive client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,356 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,385 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: init, memory: 0, processing: 0>
2022-10-05 14:05:41,386 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.250.145.105:38331
2022-10-05 14:05:41,386 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,386 - distributed.worker - INFO - Registered to: tcp://10.250.145.105:45101
2022-10-05 14:05:41,386 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,387 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Close client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
future returned 2
2022-10-05 14:05:41,506 - distributed.scheduler - INFO - Receive client connection: Client-53080c1f-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,507 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,508 - distributed.worker - INFO - Run out-of-band function 'stop'
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing...
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing all comms
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: running, memory: 0, processing: 0>
2022-10-05 14:05:41,509 - distributed.worker - INFO - Stopping worker at tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.core - INFO - Removing comms to tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.scheduler - INFO - Lost all workers
2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
Environment:
- Dask version: 2022.9.2
- Python version: 3.8.12
- Operating System: Rocky 8
- Install method (conda, pip, source): pip
Actually, if I submit enough jobs to the worker (1000+), the `run_on_scheduler(closeall)` call doesn't seem to stop the exception from happening.
Does calling `client.shutdown()` before leaving the client context manager resolve this?
No, same error. The job also hangs for 30s when leaving the context, then fails with a `ConnectionRefusedError` to the scheduler. I figure this is because the shutdown is triggered twice: once with the explicit call to `client.shutdown()`, then again when leaving the context.
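(Concretely, the variant with the explicit call looks like this; a sketch only, the rest of the script is unchanged:)

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def main():
    dask_mpi.initialize()

    with Client() as client:
        print(f'future returned {client.submit(f, 1).result()}')
        client.shutdown()   # explicit shutdown inside the context...
    # ...leaving the context then tears things down a second time, which is
    # where the ~30s hang and the ConnectionRefusedError showed up

if __name__ == '__main__':
    main()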
These are errors I've seen before. I can't remember what is going on right now, but I'll look into it when I'm back in the office in a week. Sorry for the wait.
Thanks! Let me know if you need me to run more tests.
Hi @kmpaul, have you had a chance to look at this?
@lgarrison: I have not, yet. I'm sorry. This week got very messy. I absolutely will look into this on Monday. I'm sorry for the delay.
@lgarrison: I've verified the error on both Ubuntu and Rocky Linux. This is actually a resurgence of #88, which notes exactly the issue that you are seeing. I had thought this was fixed with #89, but it is clearly coming back.
(Note that #89 actually changed the `atexit`-called `send_close_signal` function to be exactly what @jacobtomlinson suggested you try, namely `client.shutdown()`.)
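(Very roughly, and glossing over the Dask-MPI internals, the `atexit` hook registered by `initialize()` now boils down to something like the following sketch; this is not the actual source:)

import atexit
from distributed import Client

def send_close_signal():
    client = Client()   # connects to the already-running scheduler
    client.shutdown()   # ask the scheduler to close the workers and itself

atexit.register(send_close_signal)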
...And I can further verify that these `CommClosedError`s occur even for Python 3.10.6, on Linux and Windows.
Due to the problems noted in #88, where errors that occur during shutdown do not result in non-zero exit codes, the PyTest suite is not catching these errors. I think this is a significant problem and one that I will need some time to investigate and fix. In the meantime, @lgarrison, it doesn't look like the exit-time errors are resulting in incorrect results. Are you okay continuing "as-is" and just ignoring the `CommClosedError`s until I can sort them out?
Yes, I can ignore the errors for now. I'm working on instructions for scientists using our local HPC resources to farm out work to the cluster using dask-mpi, and I think these errors would be a source of confusion, even if the code executes correctly. So I can continue my experimentation, but I'll probably wait until this is fixed to start teaching users about it.
(Alternatively, if there's a workflow, dask-based or otherwise, that you like to use for dynamically dispatching independent, Python-based tasks to a Slurm allocation, I'd be interested to hear about it!)
> Yes, I can ignore the errors for now. I'm working on instructions for scientists using our local HPC resources to farm out work to the cluster using dask-mpi, and I think these errors would be a source of confusion, even if the code executes correctly. So I can continue my experimentation, but I'll probably wait until this is fixed to start teaching users about it.
Ok. I'll keep plugging away to try to get a solution as soon as possible.
Sigh. @lgarrison: I've tracked down the errors to something beyond Dask-MPI. You can test if you are seeing the same thing as me, but I'm seeing `CommClosedError`s during client shutdowns even without Dask-MPI!
MVP:
With the latest versions of Dask and Distributed (on Linux), do the following.
In Terminal 1:
$ dask-scheduler
Note the `ADDRESS` in the `Scheduler at: ADDRESS:8786` log message.
In Terminal 2:
$ dask-worker ADDRESS:8786
where `ADDRESS` is the scheduler IP address (without the port number).
In Terminal 3:
$ python
Python 3.10.6 | packaged by conda-forge | (main, Aug 22 2022, 20:36:39) [GCC 10.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from distributed import Client
>>> client = Client('ADDRESS:8786')
>>> client.shutdown()
where `ADDRESS` is the scheduler IP address noted above.
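(The same client-side steps as a plain script, with `ADDRESS` again standing in for the scheduler IP:)

from distributed import Client

client = Client('ADDRESS:8786')   # placeholder address from Terminal 1
client.shutdown()                 # this call triggers the errors shown below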
Results
In Terminal 1, the scheduler shuts down appropriately without errors.
In Terminal 2, the worker shuts down, but not without error. The logs of the worker after `client.shutdown()` is called are:
2022-10-25 15:54:14,502 - distributed.worker - INFO - Stopping worker at ADDRESS:40141
2022-10-25 15:54:14,503 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-ce265b58-eecf-4335-bd1e-d0fc3b07e93d Address ADDRESS:40141 Status: Status.closing
2022-10-25 15:54:14,503 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=ADDRESS:33682 remote=ADDRESS:8786>
Traceback (most recent call last):
File ".../lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
File ".../lib/python3.10/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File ".../lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
2022-10-25 15:54:14,507 - distributed.nanny - INFO - Worker closed
2022-10-25 15:54:14,508 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-10-25 15:54:14,699 - distributed.nanny - INFO - Closing Nanny at 'ADDRESS:46611'.
2022-10-25 15:54:14,699 - distributed.dask_worker - INFO - End worker
In Terminal 3, where the `client` is running, a `CommClosedError` appears every time the client heartbeat is called (about once every 5 seconds):
2022-10-25 15:54:15,825 - tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'ADDRESS:8786' processes=1 threads=8, memory=7.63 GiB>>
Traceback (most recent call last):
File ".../lib/python3.10/site-packages/tornado/ioloop.py", line 905, in _run
return self.callback()
File ".../lib/python3.10/site-packages/distributed/client.py", line 1390, in _heartbeat
self.scheduler_comm.send({"op": "heartbeat-client"})
File ".../lib/python3.10/site-packages/distributed/batched.py", line 156, in send
raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Client->Scheduler local=ADDRESS:33802 remote=ADDRESS:8786> already closed.
And this does not stop repeating until the Python process is exited (e.g., `exit()`).
Interestingly, you can avoid the error that appears in Terminal 2 in the worker logs if you call `client.retire_workers()` before calling `client.shutdown()`, but the `CommClosedError` errors in the Client application (Terminal 3) are still present.
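(In other words, the Terminal 3 sequence that avoids the worker-side error is:)

from distributed import Client

client = Client('ADDRESS:8786')   # placeholder address as above
client.retire_workers()           # retire all workers first
client.shutdown()                 # the worker-side traceback no longer appears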
This happens with the latest versions of Dask and Distributed (from Conda-Forge) on Windows, too.
Interesting, thanks! I was also able to reproduce this without dask-mpi following your instructions. I confirm that `client.retire_workers()` avoids the error, but when I add it to my reproduction script before client shutdown, it doesn't help. Sleeping between the retirement and shutdown doesn't help either.
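(Concretely, the variant of my reproduction script looks roughly like this; the sleep length shown is arbitrary:)

import time
import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def main():
    dask_mpi.initialize()

    with Client() as client:
        print(f'future returned {client.submit(f, 1).result()}')
        client.retire_workers()   # retire the workers before shutdown
        time.sleep(5)             # pause between retirement and shutdown
    # the CommClosedError still appears at exit

if __name__ == '__main__':
    main()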
So, should I open an issue in an upstream repo? Would that be `dask/distributed`?
I'm opening an upstream issue right now. I'm seeing if I can figure out which Dask version introduced the regression. Then I'll submit the issue and report it here.
There is definitely some strangeness produced by Python's `async` functions here. When tests are run in `dask-mpi`, for example, and an `async` error occurs at shutdown, the process still ends with an exit code of `0`. That really obscures problems that occur from changes upstream.
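(A minimal, dask-free illustration of that exit-code behavior, just to show the general failure mode:)

# An exception in a background task that nothing awaits is only logged
# ("Task exception was never retrieved"); the interpreter still exits
# with status 0, so a test runner sees a "passing" process.
import asyncio

async def background():
    raise RuntimeError("error during shutdown")

async def main():
    asyncio.ensure_future(background())  # fire-and-forget, never awaited
    await asyncio.sleep(0.1)             # let the task run and fail

asyncio.run(main())
print("main finished normally")          # process exit code is 0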
Ok. The Dask Distributed issue has been created (dask/distributed#7192). I'm not sure how much more I want to work on Dask-MPI until I hear back about that issue, lest I spend too much time trying to design around an upstream bug. So, I'll return to Dask-MPI if/when I hear about a solution to dask/distributed#7192.
> Interesting, thanks! I was also able to reproduce this without dask-mpi following your instructions. I confirm that `client.retire_workers()` avoids the error, but when I add it to my reproduction script before client shutdown, it doesn't help. Sleeping between the retirement and shutdown doesn't help either.
@lgarrison: If you are following what is happening in dask/distributed#7192, then you probably know that I tried the `sleep` trick in my test and it actually worked. That was not your experience. How long did you `sleep` for between `retire_workers` and `shutdown`?
(My thinking is that with a large number of workers, the `retire_workers` call could take quite a while to complete.)
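(For illustration, one way to take the guesswork out of the sleep would be to poll the scheduler until it reports no workers before shutting down; this assumes `client` is the connected `Client` from your script, and `scheduler_info()` is just one way to check:)

import time

client.retire_workers()
# wait until the scheduler reports that no workers remain
while client.scheduler_info()['workers']:
    time.sleep(1)
client.shutdown()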