procrastinate
Feature request: automatically retry jobs when worker exited
The docs about retrying stalled jobs say "Make sure to only retry a task when you’re reasonably sure that it is not running anymore", but it's not always easy to check whether a worker is still working on a task. To make procrastinate more robust, I think it could automatically retry jobs when a worker exits. I'm willing to contribute but would like some advice on how to approach this. For this to work we need to check whether a worker is still alive and, if not, set all of its running jobs back to "todo". Is there any way we can check which workers are "live"?
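For reference, the boilerplate from that docs page looks roughly like this (quoted from memory, so exact names and parameters may differ slightly):

@app.periodic(cron="*/10 * * * *")
@app.task(queueing_lock="retry_stalled_jobs", pass_context=True)
async def retry_stalled_jobs(context, timestamp):
    # Jobs that have been in "doing" for longer than nb_seconds are assumed
    # to be stalled and are re-enqueued.
    stalled_jobs = await app.job_manager.get_stalled_jobs(nb_seconds=6000)
    for job in stalled_jobs:
        await app.job_manager.retry_job(job)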
I'd like to avoid having the "central hub" know about the workers (that's achieved in Celery et al. using heartbeats). I understand the problem it solves, but I think it introduces a LOT of new constraints.
I was thinking of a periodic task, enabled by default for every worker, that:
- queries the database to check whether there is at least one running job being executed by a worker other than itself (and maybe stalled for at least X seconds)
- tries to "ping" that worker (could use the same mechanism used to notify workers about new tasks)
- if the worker answers, does nothing
- if not, sets all its tasks back to "todo"
Then... I think there could be something simpler: each worker sends an "alive" (or similarly named) event for every job it's currently running, every, say, 30 seconds (configurable). get_stalled_jobs could then check this instead of the max time. If a job has been running for more than 90 seconds with no "alive" event in the last 90 seconds, it's returned by get_stalled_jobs. What do you think?
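To make the idea concrete, the stalled-job selection could look roughly like this (only a sketch: the "alive" event type is hypothetical and doesn't exist in procrastinate_events today, hence the cast to text so the query still parses against the current schema):

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Sketch only: "alive" is a hypothetical event type that does not exist yet;
# casting e.type to text keeps the query valid against the current enum.
STALLED_JOBS_QUERY = text(
    """
    select j.id
    from procrastinate_jobs j
    where j.status = 'doing'
      and not exists (
        select 1
        from procrastinate_events e
        where e.job_id = j.id
          and e.type::text in ('started', 'alive')
          and e.at > now() - interval '90 seconds'
      )
    """
)

async def get_stalled_job_ids(session: AsyncSession) -> list[int]:
    return list(await session.scalars(STALLED_JOBS_QUERY))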
Agreed! But I'd also add an option so procrastinate automatically schedules a recurring task to do it (something like app = App(..., reschedule_when_worker_dies=True)), so the user does not need to read the docs, copy that boilerplate, etc. - one less thing to think about.
Actually, I think this option should be enabled by default, and the user could disable it if wanted (in my opinion, automatically dealing with failures is a sane default for a job-scheduling library).
I'm willing to help implement this, but I'd like more context. Do you have any tips on which files I should read or change?
The big problem with this approach is that we're currently blind when the user runs synchronous tasks (during the execution of a sync task, the event loop stops, so anything happening in the background, such as sending an alive event, is paused). We'd need to step up our sync/async game before we implement this, or at least introduce a self-check that raises if this feature is used with sync tasks.
But again, I think the solution is to be better at doing sync/async things.
Ok, I'm now convinced the base primitive we should work with is https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools
I should open a dedicated ticket to address this
https://github.com/procrastinate-org/procrastinate/issues/751
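To sketch what that direction would look like (nothing here is current procrastinate code; sync_task and send_alive_event are just placeholders): the sync task runs in the default thread-pool executor, so the event loop stays free to keep emitting "alive" events.

import asyncio
import time

def sync_task():
    # Placeholder for a blocking, synchronous task body.
    time.sleep(10)

async def send_alive_event(job_id: int):
    # Placeholder for recording an "alive" event for this job in the database.
    print(f"job {job_id} is alive")

async def run_sync_job(job_id: int):
    loop = asyncio.get_running_loop()
    # The blocking work runs in a worker thread, so the event loop keeps turning.
    fut = loop.run_in_executor(None, sync_task)
    while not fut.done():
        await send_alive_event(job_id)
        await asyncio.sleep(1)
    await fut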
> Then... I think there could be something simpler: each worker sends an "alive" (or similarly named) event for every job it's currently running, every, say, 30 seconds (configurable). get_stalled_jobs could then check this instead of the max time. If a job has been running for more than 90 seconds with no "alive" event in the last 90 seconds, it's returned by get_stalled_jobs. What do you think?
My team and I came up with a similar idea today. We've just implemented the retry mechanism to flag stalled jobs. With a solution like this, the worker could update a last_heartbeat column every X seconds, but we'd still need a "central hub" that watches those columns. Instead of fetching the stalled jobs by RUNNING_JOBS_MAX_TIME like in the example, it would be by HEARTBEAT_MAX_TIME for a given job. Could be really nice to have a feature like this.
Here's my implementation:
In the database:
-- One row per job, holding the time of the most recent heartbeat received for it.
create table procrastinate_latest_heartbeat (
    id uuid not null default gen_random_uuid() primary key,
    job_id bigint not null references procrastinate_jobs(id)
        on delete cascade unique,
    received_at timestamptz not null default now()
);

-- Upsert the heartbeat row for a job.
create or replace function procrastinate_send_heartbeat(
    job_id bigint
) returns void as $$
    insert into procrastinate_latest_heartbeat (job_id)
    values (job_id)
    on conflict (job_id) do update set received_at = now();
$$ language sql;

-- For each job, the most recent of its latest heartbeat and its "started" event.
create or replace view procrastinate_latest_started_or_heartbeat as
select
    procrastinate_jobs.id as job_id,
    greatest(
        procrastinate_latest_heartbeat.received_at,
        (
            select max(procrastinate_events.at)
            from procrastinate_events
            where procrastinate_events.job_id = procrastinate_jobs.id
                and procrastinate_events.type = 'started'
        )
    ) as latest_started_or_heartbeat
from procrastinate_jobs
left join procrastinate_latest_heartbeat
    on procrastinate_latest_heartbeat.job_id = procrastinate_jobs.id;
In Python (SQLAlchemy + asyncpg):
import asyncio
import functools
from datetime import timedelta

import procrastinate
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

# orm (SQLAlchemy models/engine), QUEUE_NAME, TaskHandler and app are defined
# elsewhere in our codebase.

# task for retrying stuck jobs
@app.periodic(cron="* * * * *")
@app.task(
    queue=QUEUE_NAME,
    name="retry_jobs",
    pass_context=True,
)
async def retry_jobs(
    context: procrastinate.JobContext,
    *args,
    timestamp: int | None = None,
    **kwargs,
):
    assert context.task is not None
    heartbeat_view = orm.t_procrastinate_latest_started_or_heartbeat
    # Select "doing" jobs whose latest "started" event or heartbeat is older
    # than 5 minutes, excluding this task itself.
    q = (
        select(orm.ProcrastinateJobs.id)
        .join(
            heartbeat_view,
            orm.ProcrastinateJobs.id == heartbeat_view.c.job_id,
        )
        .where(
            heartbeat_view.c.latest_started_or_heartbeat != None,
            heartbeat_view.c.latest_started_or_heartbeat
            < func.now() - timedelta(minutes=5),
            orm.ProcrastinateJobs.status == "doing",
            orm.ProcrastinateJobs.task_name != context.task.name,
            orm.ProcrastinateJobs.queue_name == QUEUE_NAME,
        )
        .order_by(orm.ProcrastinateJobs.id)
        .with_for_update(of=orm.ProcrastinateJobs, skip_locked=True)
    )
    async with (
        AsyncSession(orm.engine, expire_on_commit=False) as session,
        session.begin(),
    ):
        job_ids = (await session.scalars(q)).all()
        # Put the stalled jobs back in the queue.
        await session.execute(
            update(orm.ProcrastinateJobs)
            .values(status="todo", scheduled_at=None)
            .where(orm.ProcrastinateJobs.id.in_(job_ids))
        )

# decorator for task handlers
def send_heartbeats(task_handler: TaskHandler) -> TaskHandler:
    @functools.wraps(task_handler)
    async def wrapper(context: procrastinate.JobContext, *args, **kwargs):
        assert context.job is not None
        assert context.job.id is not None
        async with asyncio.TaskGroup() as tg:
            # Emit heartbeats in the background while the handler runs.
            ht = tg.create_task(heartbeat_loop(context.job.id))
            await task_handler(context, *args, **kwargs)
            ht.cancel()
    return wrapper

async def heartbeat_loop(job_id: int, interval_secs: int = 5):
    while True:
        async with (
            AsyncSession(orm.engine, expire_on_commit=False) as session,
            session.begin(),
        ):
            # Calls the procrastinate_send_heartbeat() SQL function defined above.
            await session.execute(
                select(func.procrastinate_send_heartbeat(job_id))
            )
        await asyncio.sleep(interval_secs)
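And here's how a task would opt in (hypothetical task name and arguments, just to show where the decorator sits):

# The handler must receive the job context (pass_context=True) so the wrapper
# knows which job id to send heartbeats for.
@app.task(queue=QUEUE_NAME, name="process_order", pass_context=True)
@send_heartbeats
async def process_order(context: procrastinate.JobContext, *, order_id: int):
    ...  # actual work; heartbeats are emitted in the background while it runs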