distributed
distributed copied to clipboard
Failure to pickle/unpickle on flight breaks the worker state machine
Use case 1
A task in flight fails to unpickle when it lands; this triggers a GatherDepFailureEvent.
Use case 2
A task in flight unpickles successfully when it lands; this triggers a GatherDepSuccesfulEvent.
The task is larger than 60% of memory_limit, so it's spilled immediately.
However, it fails to pickle back.
Use case 3
Network stack raises, as shown in #6877 / #6875 This is a bug in the network stack and should be fixed there.
Expected behaviour
The task is marked as erred on the worker. The scheduler is informed. Any task waiting on it that was already on the worker is released. The whole computation fails, reraising the exception on the client.
Actual behaviour
If validate=True, the worker shuts itself down with @fail_hard:
for ts_wait in ts.waiting_for_data:
assert self.tasks[ts_wait.key] is ts_wait
> assert ts_wait.state in WAITING_FOR_DATA, ts_wait
E AssertionError: <TaskState 'x' error>
If validate=False, the worker sends {op: task-erred} to the scheduler. Unsure what happens next, considering that the task is in memory for the scheduler. This is untested.
#6703 introduces tests for both use cases (xfailed):
test_worker_memory.py::test_workerstate_fail_to_pickle_flighttest_worker_state_machine.py::test_gather_dep_failure
Note that there are no integration tests with the scheduler - they need to be added.
I believe this is a duplicate of https://github.com/dask/distributed/issues/4439
The task is larger than 60% of memory_limit, so it's spilled immediately. However, it fails to pickle back.
I don't understand what "fails to pickle back" means and how this connects to the flight state the unpickling error after deserialization would happen somewhere else. also, how would that be possible that it only sometimes fails to unpickle?
I don't understand what "fails to pickle back" means and how this connects to the flight state the unpickling error after deserialization would happen somewhere else.
The task is executed on worker A, then sent to worker B.
- It is pickled on worker A,
- transferred over the network,
- unpickled on worker B (by the network stack),
- and immediately pickled on worker B if it's larger than 60% memory_limit (by the SpillBuffer)
also, how would that be possible that it only sometimes fails to unpickle?
In the sense that it pickles/unpickles fine on A, but fails on B. This is a very common pain point on all libraries where the behaviour of pickle/unpickle of individual objects is altered by the global state - Pint (https://pint.readthedocs.io/en/stable/), to name one.