
Scheduler deadlock when using SSHCluster due to stderr blocking

Open apmorton opened this issue 9 months ago • 3 comments

Describe the issue: When using SSHCluster, my cluster eventually deadlocks (and the dashboard stops responding). The scheduler's stack shows it is stuck reporting an asyncio unhandled task exception. My real-world cluster deadlocks fairly consistently after an hour or so of normal task execution. I'm not using run_on_scheduler in the real world; it's just a convenient way to trigger the issue quickly.

The root of the problem seems to be that both distributed.deploy.ssh.Scheduler and distributed.deploy.ssh.Worker stop polling the stdout/stderr pipes shortly after startup, which eventually causes the remote processes' pipes to fill up and block if anything in the process writes to them.
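
To illustrate the mechanism in isolation (this is a generic asyncio sketch, not distributed code; the child command and numbers are made up), a parent that stops reading a child's stderr pipe will block the child as soon as the pipe buffer fills:

import asyncio
import sys

async def drain(stream):
    # Keep reading the child's stderr so its pipe buffer never fills.
    total = 0
    while True:
        chunk = await stream.read(65536)
        if not chunk:
            break
        total += len(chunk)
    print(f'drained {total} bytes of child stderr')

async def main():
    # Child that writes heavily to stderr, standing in for a scheduler
    # process that keeps logging exceptions.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-u', '-c',
        'import sys\nfor i in range(200_000): print(i, file=sys.stderr)',
        stderr=asyncio.subprocess.PIPE,
    )
    # Without this background drain task the child stalls in write() as soon
    # as the pipe buffer (typically 64 KiB on Linux) is full and proc.wait()
    # never returns -- the same shape as the scheduler deadlock here.
    drainer = asyncio.ensure_future(drain(proc.stderr))
    await proc.wait()
    await drainer

asyncio.run(main())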

Minimal Complete Verifiable Example:

from dask.distributed import (
    SSHCluster,
    worker_client,
    fire_and_forget,
)

async def _raise():
    # Runs on the scheduler purely to produce an unhandled task exception,
    # which the scheduler logs to its stderr.
    raise RuntimeError('broken')

def _thing(n):
    # Each task triggers a failing coroutine on the scheduler and resubmits
    # itself, so the scheduler keeps writing tracebacks to stderr until the
    # pipe buffer fills.
    with worker_client() as client:
        fire_and_forget(client.run_on_scheduler(_raise, wait=False))
        fire_and_forget(client.submit(_thing, n=n+1))


def main():
    cluster = SSHCluster(
        ['localhost', 'localhost'],
        connect_options=dict(known_hosts=None),
        worker_options=dict(nthreads=1, n_workers=1),
        scheduler_options=dict(dashboard=True),
    )
    with cluster.get_client() as client:
        client.upload_file(__file__)
        client.submit(_thing, n=1).result()
        input('waiting')

if __name__ == '__main__':
    main()

(save as deadlock.py and run with python -c 'import deadlock; deadlock.main()')

When left running, this example eventually deadlocks once the scheduler's stderr pipe buffer fills.

Below is the py-spy stack trace:

Thread 4175110 (idle): "MainThread"
    emit (logging/__init__.py:1113)
    handle (logging/__init__.py:978)
    callHandlers (logging/__init__.py:1714)
    handle (logging/__init__.py:1644)
    _log (logging/__init__.py:1634)
    error (logging/__init__.py:1518)
    default_exception_handler (asyncio/base_events.py:1785)
    call_exception_handler (asyncio/base_events.py:1811)
    _run_once (asyncio/base_events.py:1937)
    run_forever (asyncio/base_events.py:608)
    run_until_complete (asyncio/base_events.py:641)
    run (asyncio/runners.py:118)
    asyncio_run (distributed/compatibility.py:204)
    main (distributed/cli/dask_spec.py:63)
    invoke (click/core.py:788)
    invoke (click/core.py:1443)
    main (click/core.py:1082)
    __call__ (click/core.py:1161)
    <module> (distributed/cli/dask_spec.py:67)
    _run_code (<frozen runpy>:88)
    _run_module_as_main (<frozen runpy>:198)

And the corresponding kernel-side stack, showing the process blocked writing to a pipe:

[<0>] pipe_wait+0x6f/0xc0
[<0>] pipe_write+0x17b/0x470
[<0>] new_sync_write+0x125/0x1c0
[<0>] __vfs_write+0x29/0x40
[<0>] vfs_write+0xb9/0x1a0
[<0>] ksys_write+0x67/0xe0
[<0>] __x64_sys_write+0x1a/0x20
[<0>] do_syscall_64+0x57/0x190
[<0>] entry_SYSCALL_64_after_hwframe+0x44/0xa9

Anything else we need to know?:

Environment:

  • Dask version: 2024.2.1
  • Python version: 3.11
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): conda

apmorton commented Mar 30 '25

It seems the errors being generated in my real-world cluster are related to worker disconnection?

2025-03-30 18:32:40,788 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "distributed/scheduler.py", line 5683, in add_client
    await self.client_comms[client].close()
          ~~~~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'Client-worker-cd06f8e9-0d94-11f0-a4e1-c6ae71c1169f'
Task exception was never retrieved
future: <Task finished name='Task-261321' coro=<Server._handle_comm() done, defined at distributed/core.py:876> exception=KeyError('Client-worker-cd06f8e9-0d94-11f0-a4e1-c6ae71c1169f')>
Traceback (most recent call last):
  File "distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "distributed/scheduler.py", line 5683, in add_client
    await self.client_comms[client].close()
          ~~~~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'Client-worker-cd06f8e9-0d94-11f0-a4e1-c6ae71c1169f'
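
For context, and this is my reading rather than anything verified against the scheduler code: each "Task exception was never retrieved" message is emitted by asyncio's default exception handler via the logging module, so it is one more write to the scheduler's already-blocked stderr. A standalone sketch of that mechanism:

import asyncio
import gc

async def _boom():
    raise RuntimeError('broken')

async def main():
    # Fire-and-forget task whose exception is never awaited or retrieved.
    task = asyncio.ensure_future(_boom())
    await asyncio.sleep(0)  # let the task run and fail
    del task                # drop the last reference to the failed task
    gc.collect()            # finalizing the task invokes the loop's default
                            # exception handler, which logs 'Task exception
                            # was never retrieved' to stderr via logging

asyncio.run(main())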

apmorton commented Mar 30 '25

Thanks for reporting, sorry you're having problems. Before we dig any deeper, can you confirm that the Dask version and Python version on the remote machines match your local machine exactly?
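
One quick way to check, sketched here with a placeholder scheduler address (substitute whatever your SSHCluster reports), is the built-in version comparison:

from dask.distributed import Client

# The address below is a placeholder; use the address your SSHCluster
# prints, or cluster.scheduler_address.
with Client('tcp://localhost:8786') as client:
    # Collects package versions from the client, scheduler and workers
    # and raises if they disagree.
    client.get_versions(check=True)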

jacobtomlinson commented Mar 31 '25

They do match exactly, yes.

apmorton commented Mar 31 '25