
Feature request: automatically retry jobs when worker exited

Open turicas opened this issue 2 years ago • 9 comments

The docs about retrying stalled jobs say "Make sure to only retry a task when you're reasonably sure that it is not running anymore", but it's not always easy to check whether a worker is still working on a task. To make procrastinate more robust, I think it could automatically retry jobs when a worker exits. I'm willing to contribute but would like some advice on how to approach this. For this to work we need to check whether a worker is still alive and, if not, set all of its running jobs back to "todo". Is there any way we can check the "live" workers?

turicas avatar Feb 19 '23 23:02 turicas

I'd like to avoid having the "central hub" know about the workers (that's achieved in Celery et al. using heartbeats). I understand the problem it solves, but I think it introduces a LOT of new constraints.

ewjoachim avatar Feb 20 '23 08:02 ewjoachim

I was thinking of a periodic task, enabled by default for every worker, that (see the sketch after this list):

  • queries the database to check whether there's at least one running job being executed by a worker other than itself (and maybe stalled for at least X seconds)
  • tries to "ping" that worker (this could use the same mechanism used to notify workers about new tasks)
    • if the worker answers, do nothing
    • if not, set all of its jobs back to "todo"
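
For the last bullet, a rough sketch of what the reset could look like, assuming asyncpg as the driver and using the procrastinate_jobs / procrastinate_events tables that appear later in this thread (the ping and worker-identity steps are not shown, and the 5-minute cutoff is only illustrative):

# Sketch only: requeue jobs that are still 'doing' but whose latest 'started'
# event is older than an illustrative 5-minute cutoff.
import asyncpg

async def reset_stalled_jobs(dsn: str) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(
            """
            update procrastinate_jobs
               set status = 'todo'
             where status = 'doing'
               and id in (
                   select job_id
                     from procrastinate_events
                    where type = 'started'
                 group by job_id
                   having max(at) < now() - interval '5 minutes'
               )
            """
        )
    finally:
        await conn.close()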

turicas avatar Feb 20 '23 15:02 turicas

Then... I think there could be something simpler: each worker sends an "alive" (or similar name) event for every job it's currently running, every, say, 30 seconds (configurable). get_stalled_jobs could then check this instead of the max time: if a job has been running for more than 90 seconds with no "alive" event in the last 90 seconds, it's returned by get_stalled_jobs. What do you think?
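
For illustration, the check side could be a query along these lines (a sketch only: the "alive" event type is the proposal here and doesn't exist in procrastinate; 'started' is the event type used later in this thread):

# Sketch of the proposed check: a job counts as stalled only when neither its
# 'started' event nor any (hypothetical) 'alive' event is recent enough.
STALLED_JOBS_SQL = """
select j.id
  from procrastinate_jobs j
 where j.status = 'doing'
   and not exists (
       select 1
         from procrastinate_events e
        where e.job_id = j.id
          and e.type in ('started', 'alive')  -- 'alive' is the proposed new event type
          and e.at > now() - interval '90 seconds'
   )
"""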

ewjoachim avatar Feb 21 '23 20:02 ewjoachim

Agreed! But I'd also add an option so procrastinate automatically schedules a recurring task to do it (something like app = App(..., reschedule_when_worker_dies=True)), so the user does not need to read the docs, copy that boilerplate, etc. - one less thing to think about. Actually, I think this option should be enabled by default and the user could disable it if wanted (in my opinion, automatically dealing with failures is a sane default for a job scheduling library). I'm willing to help implement this, but I'd like more context. Do you have any tips on which files I should read/change?

turicas avatar Feb 21 '23 23:02 turicas

The big problem with this approach is that we're currently blind when the user runs synchronous tasks (during the execution of a sync task, the event loop stops, so anything happening in the background, such as sending an "alive" event, is paused). We'd need to step up our sync/async game before we implement this, or at least introduce a self-check that raises if this feature is used with sync tasks.

But again, I think the solution is to be better at doing sync/async things.

ewjoachim avatar Feb 23 '23 07:02 ewjoachim

Ok, I'm now convinced the base primitive we should work with is https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

I should open a dedicated ticket to address this
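
For context, a minimal sketch of that primitive (plain asyncio, not procrastinate's API): running a synchronous handler in a thread pool keeps the event loop free to send the "alive" events discussed above.

import asyncio
import time

def sync_task() -> None:
    # blocking work; run on the main thread this would freeze the event loop
    time.sleep(5)

async def main() -> None:
    loop = asyncio.get_running_loop()
    # run the sync handler in the default ThreadPoolExecutor; other coroutines
    # (heartbeats, LISTEN/NOTIFY handling, ...) keep running meanwhile
    await loop.run_in_executor(None, sync_task)

asyncio.run(main())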

ewjoachim avatar Feb 23 '23 08:02 ewjoachim

https://github.com/procrastinate-org/procrastinate/issues/751

ewjoachim avatar Feb 23 '23 09:02 ewjoachim

> Then... I think there could be something simpler: each worker sends an "alive" (or similar name) event for every job it's currently running, every, say, 30 seconds (configurable). get_stalled_jobs could then check this instead of the max time: if a job has been running for more than 90 seconds with no "alive" event in the last 90 seconds, it's returned by get_stalled_jobs. What do you think?

My team and I came up with a similar idea today. We've just implemented a retry mechanism to flag stalled jobs.

With a solution like this, the worker could update a last_heartbeat column every X seconds, but we'd still need a "central hub" that watches those columns. Instead of fetching the stalled jobs by RUNNING_JOBS_MAX_TIME like in the example, it would be by HEARTBEAT_MAX_TIME for a given job. It would be really nice to have a feature like this.

ldlac avatar Nov 16 '23 01:11 ldlac

Here's my implementation:

In the database:

-- one row per job, updated in place on every heartbeat
create table procrastinate_latest_heartbeat (
  id                      uuid not null default gen_random_uuid() primary key,
  job_id                  bigint not null references procrastinate_jobs(id)
                          on delete cascade unique,
  received_at             timestamptz not null default now()
);

-- upsert: insert the first heartbeat for a job, bump received_at afterwards
create or replace function procrastinate_send_heartbeat(
  job_id bigint
) returns void as $$
   insert into procrastinate_latest_heartbeat (job_id)
   values (job_id)
   on conflict (job_id) do update set received_at = now();
$$ language sql;

-- per job: the most recent of its latest heartbeat and its 'started' event
create or replace view procrastinate_latest_started_or_heartbeat as
select
  procrastinate_jobs.id as job_id,
  greatest(
    procrastinate_latest_heartbeat.received_at,
    (
      select max(procrastinate_events.at)
      from procrastinate_events
      where procrastinate_events.job_id = procrastinate_jobs.id
        and procrastinate_events.type = 'started'
    )
  ) as latest_started_or_heartbeat
  from procrastinate_jobs
  left join procrastinate_latest_heartbeat
    on procrastinate_latest_heartbeat.job_id = procrastinate_jobs.id;

In Python (SQLAlchemy + asyncpg):

import asyncio
import functools
from datetime import timedelta

import procrastinate
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

# `app`, `orm` (engine + table mappings), `QUEUE_NAME` and the `TaskHandler`
# type alias are defined elsewhere in the application.

# task for retrying stuck jobs
@app.periodic(cron="* * * * *")
@app.task(
    queue=QUEUE_NAME,
    name="retry_jobs",
    pass_context=True,
)
async def retry_jobs(
    context: procrastinate.JobContext,
    *args,
    timestamp: int | None = None,
    **kwargs,
):
    assert context.task is not None

    heartbeat_view = orm.t_procrastinate_latest_started_or_heartbeat

    q = (
        select(orm.ProcrastinateJobs.id)
        .join(
            heartbeat_view,
            orm.ProcrastinateJobs.id == heartbeat_view.c.job_id,
        )
        .where(
            heartbeat_view.c.latest_started_or_heartbeat != None,
            heartbeat_view.c.latest_started_or_heartbeat
            < func.now() - timedelta(minutes=5),
            orm.ProcrastinateJobs.status == "doing",
            orm.ProcrastinateJobs.task_name != context.task.name,
            orm.ProcrastinateJobs.queue_name == QUEUE_NAME,
        )
        .order_by(orm.ProcrastinateJobs.id)
        .with_for_update(of=orm.ProcrastinateJobs, skip_locked=True)
    )
    async with (
        AsyncSession(orm.engine, expire_on_commit=False) as session,
        session.begin(),
    ):
        job_ids = await session.scalars(q)

        await session.execute(
            update(orm.ProcrastinateJobs)
            .values(status="todo", scheduled_at=None)
            .where(orm.ProcrastinateJobs.id.in_(job_ids))
        )

# decorator for task handlers
def send_heartbeats(task_handler: TaskHandler) -> TaskHandler:
    @functools.wraps(task_handler)
    async def wrapper(context: procrastinate.JobContext, *args, **kwargs):
        assert context.job is not None
        assert context.job.id is not None
        async with asyncio.TaskGroup() as tg:
            ht = tg.create_task(heartbeat_loop(context.job.id))
            await task_handler(context, *args, **kwargs)
            ht.cancel()

    return wrapper


async def heartbeat_loop(job_id: int, interval_secs: int = 5):
    while True:
        async with (
            AsyncSession(orm.engine, expire_on_commit=False) as session,
            session.begin(),
        ):
            await session.execute(
                select(func.procrastinate_send_heartbeat(job_id))
            )
        await asyncio.sleep(interval_secs)
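
If it helps, here is a hypothetical usage of the decorator above (the task name and signature are made up):

# Hypothetical usage: send_heartbeats sits between procrastinate's task
# registration and the handler, so the registered callable is the wrapped one.
# Note this only works for async handlers (see the sync/async caveat above).
@app.task(queue=QUEUE_NAME, name="long_running_task", pass_context=True)
@send_heartbeats
async def long_running_task(context: procrastinate.JobContext, *args, **kwargs):
    ...  # actual work; heartbeat_loop keeps updating the heartbeat row concurrently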

amacfie avatar Aug 23 '24 20:08 amacfie