Tell workers when their peers have left (so they don't hang fetching data from them)
When a worker breaks its connection to the scheduler, or goes too long without heartbeating, the scheduler removes it and reschedules its tasks to run elsewhere. However, other workers may continue trying to fetch data from it far beyond that point. This can lead to something that feels like a deadlock, but isn't quite one (if you wait long enough, it will resolve): workers sit waiting for their timeouts to expire while requesting data from peers that no longer exist.
This is significantly exacerbated by increasing connection timeouts or retries. For example, if you set distributed.comm.retry.count = 5 (default is 0) and distributed.comm.timeouts.connect = 60s (default is 30s), you might experience a 5-minute deadlock as a worker keeps trying to connect to its dead peer, until it finally gives up and asks the scheduler for a new peer to try. If you set distributed.comm.retry.count = 5 and distributed.comm.timeouts.tcp = 5m (default is 30s), and a peer worker is almost out of memory and freezes up but doesn't close its TCP connection (https://github.com/dask/distributed/issues/6110 https://github.com/dask/distributed/issues/6208 https://github.com/dask/distributed/issues/6177), you could have a 25-minute deadlock.
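For concreteness, this is roughly how the configuration combinations described above would be set (a sketch; the key names are the ones quoted in this paragraph):

```python
import dask

# Raising these compounds the stall: every connect to the dead peer is
# retried in full before the worker asks the scheduler for another replica.
dask.config.set({
    "distributed.comm.retry.count": 5,           # default: 0
    "distributed.comm.timeouts.connect": "60s",  # default: 30s
    "distributed.comm.timeouts.tcp": "5m",       # default: 30s
})
```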
Rather than relying solely on connection timeouts on the worker side, the scheduler should probably inform workers when one of their peers has left. The scheduler is a good source of truth, because workers already have to heartbeat to it, and maintain a long-lived connection (the batched stream).
But naively broadcasting a remove-worker message to every worker, every time any worker leaves, probably wouldn't scale well to large clusters. Ideally, we would only inform the workers that are fetching data from the departed worker. We could determine this from set(wts.processing_on for ts in removed_ws.has_what for wts in ts.waiters if wts.processing_on) (though probably something more targeted than that, since not every worker needs to hear updates about every task).
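As a rough sketch of that selection on the scheduler side, using the expression above (the helper name is made up; has_what, waiters, and processing_on are the scheduler-state attributes it references):

```python
def workers_to_notify(removed_ws):
    # Sketch: the workers that still have tasks waiting on data that lived
    # on the departed worker, i.e. the ones worth sending the message to.
    return {
        wts.processing_on
        for ts in removed_ws.has_what   # keys whose data was on the dead worker
        for wts in ts.waiters           # tasks still waiting on those keys
        if wts.processing_on            # only waiters already assigned to a worker
    }
```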
In that message, we should also include the new who_has for the task, with the dead worker removed. (And ensure this actually cancels the gather_dep on workers, and doesn't deadlock them if the task has already transitioned to cancelled in the meantime.)
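A possible shape for that message, purely as illustration (the op name, fields, and addresses are made up, not an existing protocol):

```python
# Hypothetical batched-stream message from the scheduler to an affected worker.
msg = {
    "op": "remove-worker",             # naming is up for debate (see below)
    "worker": "tcp://10.0.0.5:34567",  # the departed peer
    "who_has": {
        # refreshed replica locations per key, with the dead worker already
        # removed, so the worker can retarget (or cancel) its gather_dep
        "('x', 3)": ["tcp://10.0.0.7:40123"],
    },
}
```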
cc @crusaderky @fjetter
I've talked about this in various other issues and called this a "circuit breaker" pattern.
I think the "remove-worker" message (I would probably use a slightly different name) should then abort or close all open connections to that worker. This would allow all code waiting for a connection to get an OSError similar to what would happen if the timeout is actually hit.
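On the worker side that could look roughly like the following sketch (the handler name and message fields are the hypothetical ones from above; self.rpc is the worker's ConnectionPool):

```python
async def handle_remove_worker(self, worker: str, who_has: dict) -> None:
    # Sketch of a hypothetical worker stream handler.
    # Update bookkeeping: drop the dead peer as a source for each affected key.
    for key, workers in who_has.items():
        ts = self.tasks.get(key)
        if ts is not None:
            ts.who_has = set(workers)
    # Abort open comms to the dead peer so anything blocked in gather_dep
    # gets an OSError now instead of waiting out connect/TCP timeouts.
    self.rpc.remove(worker)
```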
That should be a straightforward implementation, and we could use a feature toggle to roll this behavior out carefully.
> Ideally, we would only inform the workers that are fetching data from the departed worker.
This is not that simple for pending AMM acquire-replicas requests, as the scheduler today doesn't track them. Although in this case I think it would be OK to just record on the scheduler-side TaskState that "this worker may have a pending acquire-replicas going on" and send the remove-worker just in case.
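Something as small as an extra piece of bookkeeping might be enough (hypothetical, not an existing structure):

```python
# Hypothetical scheduler-side bookkeeping (not an existing attribute): for each
# key, the workers that were sent an acquire-replicas request and may still be
# fetching it. Entries can be stale, hence "send the remove-worker just in case".
pending_acquire_replicas: dict[str, set[str]] = {}
```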
For this, we want to be able to cause all checked-out comm use for a particular address (the removed worker) to raise OSError. ConnectionPool.remove(downed_worker) does this, but we also want connection attempts (tasks in self._connecting) to raise OSError. Would it make sense for ConnectionPool.remove to also cancel those?
Additionally, since the problem here is exacerbated by the retry_operation in get_data_from_worker (https://github.com/dask/distributed/blob/d74f50060cfbf1891654e9301447c958910582f4/distributed/worker.py#L2872), would it be better to move the retry logic into _handle_gather_dep_network_failure? Then, instead of assuming a network error is always fatal, we could treat it like a busy worker, so that retries round-robin across peers rather than hammering the same worker.
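A rough sketch of what that round-robin behaviour could look like, independent of where it ends up living (all names here are illustrative, not existing APIs):

```python
async def fetch_with_rotation(key, who_has, fetch):
    """Try each known replica in turn, treating a network error on one peer
    like a busy worker rather than a fatal failure, instead of exhausting
    retries against the same (possibly dead) peer."""
    last_err = None
    for addr in who_has:                # rotate across surviving replicas
        try:
            return await fetch(addr, key)
        except OSError as err:          # refused / reset / timed out
            last_err = err
            continue
    # Only when every known replica has failed do we escalate, e.g. by
    # asking the scheduler for a fresh who_has.
    if last_err is not None:
        raise last_err
    raise RuntimeError(f"no replicas known for {key!r}")
```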
I think we can safely remove the retry for get_data_from_worker. I believe the worker will handle this gracefully: it will remove the worker from its internal bookkeeping and, if necessary, ask the scheduler for help.
> but we also want connection attempts (tasks in self._connecting) to raise OSError. Would it make sense for ConnectionPool.remove to also cancel those?
Yes, that makes sense. I suggest doing this in a dedicated PR; it should be easy to isolate from the other changes necessary for this issue.
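A minimal sketch of that extension, assuming available, occupied, and _connecting are per-address collections (the real ConnectionPool internals may differ):

```python
def remove(self, addr: str) -> None:
    # Sketch of ConnectionPool.remove with the proposed addition.
    # Roughly the existing behaviour: abort comms pooled for, or checked out
    # to, this address so their users see an OSError on next use.
    for comm in self.available.pop(addr, set()):
        comm.abort()
    for comm in self.occupied.pop(addr, set()):
        comm.abort()
    # Proposed addition: also cancel in-flight connection attempts, so that
    # callers awaiting connect(addr) fail now rather than at the timeout.
    for task in list(self._connecting.pop(addr, set())):
        task.cancel()
```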
There are three tasks that can be worked on separately:
- The scheduler tells all workers that a worker left. The worker then adjusts internal state, e.g. who_has dicts
- The ConnectionPool removes all available comms. This includes cancellation of currently running connection attempts
- Retries: the retry logic is moved up the stack from get_data_from_worker to utils_comm.gather_from_workers