distributed icon indicating copy to clipboard operation
distributed copied to clipboard

Task timeouts and retries

Open mrocklin opened this issue 9 years ago • 6 comments

Sometimes we want to retry a task on another worker if it appears to be taking a long time. One approach would be to specify a timeout when submitting the task

future = e.submit(func, *args, timeout=5)

Another would be for the function itself to raise a special exception

def func(*args):
    ...
    raise RetryException()

The latter is somewhat attractive because it removes some administrative burden from the scheduler.

Pinging @minrk for thoughts or previous experiences.

mrocklin avatar Jul 27 '16 15:07 mrocklin

IPython doesn't allow retrying task on timeout (that might be a little tricky, depending on how well you support aborting/invalidating results that are pending), but it does support task reassignment based on special exceptions. In IPython's case, it's an UnmetDependency exception, indicating that the worker cannot run the task (meant for missing packages, resources like memory, GPUs, etc.).

minrk avatar Jul 28 '16 08:07 minrk

Just curious, has anything changed on this front? In particular I was looking for a solution to:

Sometimes we want to retry a task on another worker if it appears to be taking a long time.

I often find that the last couple of tasks (out of many thousands) will hang for unknown reasons; the only solution I've come up with so far is to submit everything w/ retries and have some sort of loop in the client that checks for this situation and retries everything that's running:

c.retire_workers([k for k, v in c.processing().items() if v])

Far from ideal but it at least breaks out of the deadlock where the job will never finish.

bnaul avatar Aug 16 '18 04:08 bnaul

@bnaul you might be interested in https://github.com/dask/dask/issues/1183

louisabraham avatar Oct 11 '18 21:10 louisabraham

Just curious, has anything changed on this front? In particular I was looking for a solution to:

Sometimes we want to retry a task on another worker if it appears to be taking a long time.

I often find that the last couple of tasks (out of many thousands) will hang for unknown reasons; the only solution I've come up with so far is to submit everything w/ retries and have some sort of loop in the client that checks for this situation and retries everything that's running:

c.retire_workers([k for k, v in c.processing().items() if v])

Far from ideal but it at least breaks out of the deadlock where the job will never finish.

Wow, this line is a life-saver. Almost in all of my jobs I end up running into this problem.

Executing this by hand also helped with my issue here: https://stackoverflow.com/questions/64557212/dask-distributed-workers-stall-when-reaching-80-of-memory-limit

Is there meanwhile a solution for this problem?

Hoeze avatar Oct 27 '20 18:10 Hoeze

FWIW today there's a special distributed.Reschedule exception which tasks can raise to automatically be stopped and rescheduled. This doesn't guarantee that it will ultimately be run a different worker, but might still be useful for folks who are tracking this issue.

jrbourbeau avatar Apr 28 '21 01:04 jrbourbeau

This would be really useful especially if that could be a global to all tasks. My specific use case would be basically annotate any task in a graph with a timeout that would trigger a retry. I often find that 99% of my tasks finish quickly, but for some reason one hangs potentially indefinitely. I could use the exception mentioned by @jrbourbeau above, but it make it hard if I am calling a function defined by a package that triggers many tasks.

One example is Xarray's' to_zarr(). It would be great to be able to use a context manager for this like

with dask.config.set(task_timeout=1000):
   dset.to_zarr(...)

Another use case that would be helpful is when I have a hypothesis that a dask distributed lock is stuck or something similar. There are many hypothesis I have that are unlikely to be true about why a very small subset of tasks are hanging at the very end. This is the only sweeping mechanism that could help trigger a retry.

I have no clue what this would take, just giving my two cents at how this would be a nice-to-have.

ljstrnadiii avatar Jul 26 '22 16:07 ljstrnadiii

I have another scenario of same issue where raising anything from the task won't work: I have a cluster tuned for raw performance versus stability, and tasks are allowed to rarely fail with unexpected consequences (segfaults typically) and thus can sometimes hang without any way to recover. I don't see any "infinite" hangs - when the tasks stops responding it will eventually trigger a reschedule "automatically" but the delay is huge, probably due to some internal timeout that saves the day, so I can safely keep it running without supervision. The issue is of course degraded overall performance.

The line I'm seeing in logs after a long delay:

2023-06-04 13:20:28,448 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'run_sim_many-91117067be7edfb74fec26fd07754ed2': ('tcp://10.0.0.33:42059',)} 

So I need a way to trigger that internal timeout. Ideally, some routine should learn 99th percentile of single-job completion time, and force-reschedule.

I will try to play with TCP keepalive and timeouts on scheduler node to see if it helps

grandrew avatar Jun 04 '23 20:06 grandrew