Deadlock when deserializing a key fails
If a key received from another worker fails to deserialize, it will:
- deadlock the task that needed that key
- break future communication between the workers, by throwing off the order of their requests and responses.
import asyncio

import pytest

from distributed import Nanny
from distributed.utils_test import gen_cluster, inc


@gen_cluster(
    client=True,
    nthreads=[("", 1), ("", 1)],
    Worker=Nanny,  # Nanny necessary to force serialization, not sure why
)
async def test_deserialization_failure_receiving_key(c, s, a, b):
    class CantDeserialize:
        def __getstate__(self):
            return None

        def __setstate__(self, state):
            raise RuntimeError("can't deserialize me")

    f1 = c.submit(CantDeserialize, key="f1", workers=[a.address])
    f2 = c.submit(CantDeserialize, key="f2", workers=[b.address])
    # `f1` must be transferred to worker `b`
    f3 = c.submit(lambda x, y: None, f1, f2, workers=[b.address])

    # Currently deadlocks.
    # If you comment this out, and don't wait for `f3`, then the test will
    # deadlock at `await g3 == 3`, because worker<->worker comms have been messed up.
    with pytest.raises(asyncio.CancelledError):  # what error should this actually be?
        await f3

    del f1, f2, f3

    # Now, try normal tasks that should work
    g1 = c.submit(inc, 0, key="g1", workers=[a.address])
    g2 = c.submit(inc, 1, key="g2", workers=[b.address])
    g3 = c.submit(sum, [g1, g2], key="g3", workers=[b.address])
    assert await g3 == 3
I haven't looked into problem 1 much yet. I'm not sure why the dependent task doesn't get transitioned to error as well. Here's what a worker task looked like in a dump of a real cluster that experienced this issue:
('sub-341a2315e88b3b0847ea02cb2582f5d3', 2, 1):
  dependencies:
  - <TaskState "('rechunk-merge-3b908056696e32b9928bfb6c26433bd7', 2, 1)" memory>
  - <TaskState "('rechunk-merge-c50a2a505f3a9d80321c8bb9188b08de', 2, 1)" error>
  duration: 0.00014734268188476562
  key: ('sub-341a2315e88b3b0847ea02cb2582f5d3', 2, 1)
  priority:
  - 0
  - 1
  - 24
  - -24
  state: waiting
  waiting_for_data:
  - <TaskState "('rechunk-merge-c50a2a505f3a9d80321c8bb9188b08de', 2, 1)" error>
Clearly, if a dependency is in state error, the task should not be in waiting anymore. This feels like a state machine bug to me?
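To make that invariant concrete, here is a minimal sketch of the check I'd expect to hold. The `Task` class and `find_stuck_tasks` helper are hypothetical stand-ins written for illustration, not the worker's real `TaskState` or any existing distributed API; only the state names are taken from the dump above.

```python
class Task:
    """Hypothetical, stripped-down stand-in for a worker task record."""

    def __init__(self, key, state, dependencies=()):
        self.key = key
        self.state = state
        self.dependencies = list(dependencies)


def find_stuck_tasks(tasks):
    """Return keys of tasks still waiting although a dependency has errored."""
    return [
        task.key
        for task in tasks
        if task.state == "waiting"
        and any(dep.state == "error" for dep in task.dependencies)
    ]


# Mirror the shape of the dump: one dependency in memory, one in error.
in_memory = Task("rechunk-merge-a", "memory")
errored = Task("rechunk-merge-b", "error")
dependent = Task("sub", "waiting", [in_memory, errored])

stuck = find_stuck_tasks([in_memory, errored, dependent])
```

Run against the dumped state, a check like this would flag the `sub` task, since it can never leave `waiting` on its own.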
Problem 2 is kinda funny. On my real cluster, I saw a traceback like this on the worker requesting a key:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2029, in gather_dep
    response = await get_data_from_worker(
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2818, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/usr/local/lib/python3.9/site-packages/distributed/utils_comm.py", line 383, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.9/site-packages/distributed/utils_comm.py", line 368, in retry
    return await coro()
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2798, in _get_data
    response = await send_recv(
  File "/usr/local/lib/python3.9/site-packages/distributed/core.py", line 944, in send_recv
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.9/site-packages/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 1737, in get_data
    assert response == "OK", response
AssertionError: {'op': 'get_data', 'keys': {"('rechunk-merge-c50a2a505f3a9d80321c8bb9188b08de', 2, 1)"}, 'who': 'tls://10.0.1.172:44309', 'max_connections': None, 'reply': True}
Notice the raise exc.with_traceback(tb). This is actually the RPC system re-raising an error that occurred on the other worker (the sender), which is coming from here in get_data (the handler for the RPC): https://github.com/dask/distributed/blob/66585d586fe02df54f19fa09c5cf2d45a852af5e/distributed/worker.py#L1735-L1737
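For anyone unfamiliar with that pattern, here is a small in-process sketch of why the traceback ends in frames from the other side. Everything here is made up for illustration (`remote_handler`, `call_remote`, `send_recv_sketch`); the real code ships the exception and traceback over the comm rather than passing Python objects around.

```python
import traceback


def remote_handler():
    # Stands in for the handler running on the sending worker.
    assert False, "unexpected message"


def call_remote():
    # Stands in for the RPC layer on the remote side: run the handler and
    # turn any failure into an error response instead of letting it escape.
    try:
        remote_handler()
    except Exception as exc:
        return {"status": "error", "exception": exc, "traceback": exc.__traceback__}
    return {"status": "OK"}


def send_recv_sketch():
    # Stands in for the caller side: re-raise the remote error locally,
    # keeping the frames captured where it originally happened.
    response = call_remote()
    if response["status"] == "error":
        raise response["exception"].with_traceback(response["traceback"])
    return response


try:
    send_recv_sketch()
except AssertionError as exc:
    frame_names = [frame.name for frame in traceback.extract_tb(exc.__traceback__)]
```

`frame_names` contains both `send_recv_sketch` and `remote_handler`, which is the same shape as the traceback above: local frames first, then the handler frames from the worker that actually raised.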
What's happening is that in get_data_from_worker (confusingly, the caller of the RPC), send_recv raises because it fails to deserialize the data, so we never reach the await comm.write("OK"): https://github.com/dask/distributed/blob/66585d586fe02df54f19fa09c5cf2d45a852af5e/distributed/worker.py#L2798-L2813
On the other end, the worker sending the data has no idea we received it and failed to deserialize. It's still waiting for us to say OK.
Which means that the next time we ask that worker for another piece of data, we send an 'op': 'get_data' message over the same comm, on which get_data is still waiting to hear an OK. 'op': 'get_data' doesn't match OK, so the subsequent data transfer request fails as well.
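The sequence can be reproduced without a cluster. This is only a sketch of the request/response/ack ordering described above: two `asyncio.Queue` objects stand in for the shared comm, and `get_data_handler`, `request_keys`, and `broken_deserialize` are hypothetical names, not the real worker code.

```python
import asyncio


async def get_data_handler(to_sender, to_receiver):
    # Sending worker: answer one request, then wait for the "OK" ack.
    request = await to_sender.get()
    await to_receiver.put({"status": "OK", "data": request["keys"]})
    ack = await to_sender.get()
    if ack != "OK":
        # Whatever arrives next is treated as the ack, even a new request.
        text = f"expected 'OK', got {ack!r}"
        await to_receiver.put({"status": "error", "text": text})


async def request_keys(to_sender, to_receiver, keys, deserialize):
    # Requesting worker: send request, read response, deserialize, then ack.
    await to_sender.put({"op": "get_data", "keys": keys})
    response = await to_receiver.get()
    if response["status"] == "error":
        raise RuntimeError(response["text"])
    data = deserialize(response["data"])  # if this raises, no ack is sent
    await to_sender.put("OK")
    return data


def broken_deserialize(data):
    raise ValueError("can't deserialize me")


async def main():
    to_sender, to_receiver = asyncio.Queue(), asyncio.Queue()
    handler = asyncio.create_task(get_data_handler(to_sender, to_receiver))
    outcomes = []
    try:
        await request_keys(to_sender, to_receiver, ["f1"], broken_deserialize)
    except ValueError as exc:
        outcomes.append(f"first request: {exc}")
    try:
        # A perfectly healthy request, sent over the same "comm".
        await request_keys(to_sender, to_receiver, ["g1"], lambda data: data)
    except RuntimeError as exc:
        outcomes.append(f"second request: {exc}")
    await handler
    return outcomes


outcomes = asyncio.run(main())
```

Both requests fail: the first from the deserialization error, the second because the handler for the first transfer consumes the new `get_data` message as its ack. One possible direction, which I have not tested against the real code, would be to send some reply (or close the comm) whenever handling the response fails, so the two sides can't drift out of step.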
cc @crusaderky @fjetter
Likely duplicate of
- #6705
- #3558