arq icon indicating copy to clipboard operation
arq copied to clipboard

Tasks get stuck if worker crash and restart

Open iamyoudi opened this issue 2 years ago • 7 comments

We are using ARQ in Kubernetes with multiple workers (pods) and it works perfectly fine until some worker crashes due to resource constraints (memory). After the crash and restart of the worker, tasks don’t restart automatically for hours (sometimes tasks start after a long time say 4 or 5 hours).

Not sure if we are missing some configuration in worker settings or something. Any insight will be helpful.

Thanks for this simple yet powerful library.

iamyoudi avatar Sep 12 '22 14:09 iamyoudi

Hi,
Are you able to reproduce this? I tried and here's my findings:

  • Start worker
  • Create task that asyncio.sleep(30)
  • Call task
  • Shut down worker by closing terminal
  • Inspect redis:
127.0.0.1:6379> keys *
1) "arq:in-progress:96f840b7cb374049953369a6554a86d2"
2) "arq:queue"
3) "arq:retry:96f840b7cb374049953369a6554a86d2"
4) "arq:job:96f840b7cb374049953369a6554a86d2"
  • Start worker again.

Now, we have to check the documentation. The default settings we need to look at are:

  • job_timeout: SecondsTimedelta = 300
  • expires_extra_ms: int = 86400000

Let's check the time to live (TTL) for these keys :

127.0.0.1:6379> ttl arq:in-progress:c8e51a5e370e4665bad5de30e41d3581
(integer) 230
127.0.0.1:6379> ttl arq:job:c8e51a5e370e4665bad5de30e41d3581
(integer) 86313
127.0.0.1:6379> ttl arq:retry:c8e51a5e370e4665bad5de30e41d3581
(integer) 88308

So, the in-progress will last 300 seconds. The arq:job and arq:retry will last 86400 seconds. When the arq:in-progress-key has expired after 300 seconds, the job was picked up again.

Have you increased your job_timeout setting?

JonasKs avatar Sep 15 '22 16:09 JonasKs

Ahh, now it makes sense, we increased job_timeout to a few hours. Thanks, @JonasKs.

Shouldn’t ARQ handle this case? In Redis job's status is in-progress , but no worker really executing this job. If job_timeout is big then it's really painful. Thoughts?

iamyoudi avatar Sep 16 '22 07:09 iamyoudi

This red box explains why this is needed:

arq v0.16 has what I’m calling “pessimistic execution”: jobs aren’t removed from the queue until they’ve either succeeded or failed.

So, since the worker never reports them as failed (since the worker is crashed), the key will exist until the job is timed out, based on your settings.

In Redis job's status is in-progress , but no worker really executing this job

There is no way of knowing that no worker is executing this job at the moment.

This could be added, though. Something like this?

  • Create a new queue that workers report their health status to
    • This should be executed on schedule on the worker itself, async to other tasks
  • When the worker picks a task, mark which worker is executing this task
    • From arq:in-progress:96f840b7cb374049953369a6554a86d2
    • To: arq:in-progress:96f840b7cb374049953369a6554a86d2:worker-1
  • If worker-1 has not reported itself as healthy, the healthy-key for that worker would expire.
  • If another worker, worker-2 is connected and see tasks with :worker-1 in-progress, but it knows that worker is not healthy, it should remove the in-progress-key.

JonasKs avatar Sep 16 '22 08:09 JonasKs

Thanks @iamyoudi for the question and @JonasKs for the explanation.

Yes, I think you've got to the bottom of why this is happening.

@JonasKs your idea (or something like it) might work but it would add significant extra complexity and would be hard to test without a pretty complex setup and a long time to identify problems.

I worry it might introduce new, very subtle errors. For example: we would need to make sure that workers each have a unique but deterministic id, also what happens if worker-1 comes back up a different points through re-run sequence.

I think (without looking into it in great detail) that the best solution/work-around would be to decrease expires_extra_ms so jobs are executed again more quickly. Or ideally to shutdown workers gracefully so they can remove the in-progress key as they terminate.


I'm sorry I don't have more time to devote to arq and subtle (but important) problems like this. At the moment I'm trying to devote as much time as possible to pydantic and getting pydantic V2 released. Maybe I'll have time to look at arq in more detail next year, but until then it'll only be maintenance, and reviewing and merging PRs.

samuelcolvin avatar Sep 16 '22 08:09 samuelcolvin

Thanks, @samuelcolvin for replying. Knowing why it's happening is itself a great help. We can tweak parameters to avoid this issue, @JonasKs thanks again for that.

Should we keep this issue open to track any possible future improvements?

@samuelcolvin really excited for the future pydantic V2 release. More power to you.

iamyoudi avatar Sep 16 '22 09:09 iamyoudi

Thanks for understanding, let's leave this open as a reminder during some future (maybe) refactoring of arq.

samuelcolvin avatar Sep 16 '22 09:09 samuelcolvin

Thanks @iamyoudi for the question and @JonasKs for the explanation.

We can tweak parameters to avoid this issue, @JonasKs thanks again for that.

My pleasure, was interesting to investigate. 😊

also what happens if worker-1 comes back up a different points through re-run sequence.

Hmm, I feel like since all tasks should be idempotent, this shouldn't really be an issue? Or to just make sure the worker shuts it self down if it can't confirm the health through redis?

Or ideally to shutdown workers gracefully so they can remove the in-progress key as they terminate.

Hmm.. Could you give me some insight in the SIGTERM implementation at the moment? Does the worker stop picking jobs after SIGTERM has been received? If so, the change @iamyoudi (and me, actually) would need is to increase the grace period before K8s send SIGKILL. This can be configured on the pod itself.
In other words:
SIGTERM received -> Stop picking new jobs -> do not cancel anything, just keep on working -> when all tasks are done -> shut down process
This would allow us to configure terminationGracePeriodSeconds(k8s grace period parameter) to match the job_timeout time. Any worker that gets the SIGTERM signal will process all jobs as normal before shutting down / getting killed. New rollovers would then never create this problem.

If this is not the current implementation, I am very interested in looking into it.

I'm sorry I don't have more time to devote to arq and subtle (but important) problems like this.

Honestly most of these "issues" aren't that important to have a code fix on, as long as we're aware of them and know how to work with them. Quality of life features like #277 can be accomplished by literally passing on parameters.

As always, thank you for the great work! Excited for Pydantic v2!! :tada:

JonasKs avatar Sep 16 '22 10:09 JonasKs

This would allow us to configure terminationGracePeriodSeconds(k8s grace period parameter) to match the job_timeout time. Any worker that gets the SIGTERM signal will process all jobs as normal before shutting down / getting killed. New rollovers would then never create this problem.

This was solved in #345, for future readers.

JonasKs avatar Dec 04 '22 14:12 JonasKs