
feat: New fetch job function.

Open · TheNeedForSleep opened this pull request 1 year ago · 6 comments

fixes #1242

New fetch job function:

  • handles conflicts on the status update
  • works correctly when jobs share a lock but have different priorities (see the sketch below)
  • the inner SELECT uses the index properly

Does not support the aborting status!
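
As a quick illustration of the lock/priority interplay (a hypothetical example; my_task stands in for any procrastinate task, deferred the same way the test code further down does):

async def defer_examples(my_task):
    # Jobs sharing a lock run one at a time, in deferral (id) order, even
    # when a later job has a higher priority; that is the ordering the new
    # fetch function has to respect.
    await my_task.configure(lock="A", priority=1).defer_async()   # runs first
    await my_task.configure(lock="A", priority=9).defer_async()   # waits for the job above
    await my_task.configure(lock=None, priority=5).defer_async()  # no lock: independently eligible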

Successful PR Checklist:

  • [ ] Tests
    • [ ] (not applicable?)
  • [ ] Documentation
    • [ ] (not applicable?)

PR label(s):

  • [ ] https://github.com/procrastinate-org/procrastinate/labels/PR%20type%3A%20breaking%20%F0%9F%92%A5
  • [x] https://github.com/procrastinate-org/procrastinate/labels/PR%20type%3A%20feature%20%E2%AD%90%EF%B8%8F
  • [x] https://github.com/procrastinate-org/procrastinate/labels/PR%20type%3A%20bugfix%20%F0%9F%95%B5%EF%B8%8F
  • [ ] https://github.com/procrastinate-org/procrastinate/labels/PR%20type%3A%20miscellaneous%20%F0%9F%91%BE
  • [ ] https://github.com/procrastinate-org/procrastinate/labels/PR%20type%3A%20dependencies%20%F0%9F%A4%96
  • [ ] https://github.com/procrastinate-org/procrastinate/labels/PR%20type%3A%20documentation%20%F0%9F%93%9A

TheNeedForSleep avatar Nov 20 '24 14:11 TheNeedForSleep

The code is not ready to be merged!

TheNeedForSleep avatar Dec 03 '24 13:12 TheNeedForSleep

> The code is not ready to be merged!

You can mark the PR as a draft to make sure it doesn't get accidentally merged :)

TkTech avatar Dec 03 '24 15:12 TkTech

When the PR is ready, press the big "Ready for review" button

[Screenshot 2024-12-04 at 14:34:57: the "Ready for review" button]

ewjoachim avatar Dec 04 '24 13:12 ewjoachim

Thanks for this PR, @TheNeedForSleep! I have just come up against this issue in a project I'm working on. I have temporarily "fixed" it for myself with pretty much the same SQL change:

CREATE OR REPLACE FUNCTION procrastinate_fetch_job(
    target_queue_names character varying[]
)
    RETURNS procrastinate_jobs
    LANGUAGE plpgsql
AS $$
DECLARE
    found_jobs procrastinate_jobs;
BEGIN
    WITH candidate AS (
        SELECT jobs.*
            FROM procrastinate_jobs AS jobs
            WHERE
                (
                    jobs.lock IS NULL OR
                    -- reject the job if its lock has earlier jobs
                    NOT EXISTS (
                        SELECT 1
                            FROM procrastinate_jobs AS earlier_jobs
                            WHERE
                                earlier_jobs.lock = jobs.lock
                                AND earlier_jobs.status IN ('todo', 'doing', 'aborting')
                                AND earlier_jobs.id < jobs.id)
                )
                AND jobs.status = 'todo'
                AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
                AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
            ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
            FOR UPDATE OF jobs SKIP LOCKED
    )
    UPDATE procrastinate_jobs
        SET status = 'doing'
        FROM candidate
        WHERE procrastinate_jobs.id = candidate.id
        RETURNING procrastinate_jobs.* INTO found_jobs;

    RETURN found_jobs;
END;
$$;
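
To exercise the replacement by hand, the function can be called directly, the same way procrastinate's SQL layer does (a sketch using psycopg; the connection string is an assumption):

import asyncio

import psycopg


async def fetch_one_job(dsn: str):
    async with await psycopg.AsyncConnection.connect(dsn) as conn:
        async with conn.cursor() as cur:
            # A NULL queue list means "fetch from any queue".
            await cur.execute("SELECT * FROM procrastinate_fetch_job(NULL);")
            row = await cur.fetchone()
            # When no job is runnable, found_jobs stays NULL and the row
            # comes back with every column NULL (row[0] is the id).
            return None if row is None or row[0] is None else row


print(asyncio.run(fetch_one_job("postgresql:///procrastinate_test")))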

Question: why is it necessary to make the change in manager.py? It looks to me like the existing query will already fall back to a job where jobs.lock IS NULL.

joaquimds avatar Dec 13 '24 11:12 joaquimds

> Question: why is it necessary to make the change in manager.py? It looks to me like the existing query will already fall back to a job where jobs.lock IS NULL.

If multiple workers try to fetch tasks with the same lock, they will keep running into the same conflicts. To de-escalate, a worker that fails to fetch a task just grabs the next task with no lock, instead of returning None, going to sleep, retrying, and escalating the problem further.

I don't know whether this addition is actually necessary for everyone, but in some situations the problem will occur. The following factors lead to it:

  • high number of workers
  • low job runtime

I will let you guys decide whether you actually want that change in manager.py. Now that I have had a month to think about it, I don't believe it is necessary, because the cost is only that a worker that fails to fetch a job gets a null job (the SQL fetch function returns NULL on failure), goes to sleep, and fetches again after either the poll interval or a LISTEN event.
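
A minimal sketch of that trade-off, assuming a fetch_job coroutine that wraps the SQL function and an event set by the LISTEN/NOTIFY listener (the names are illustrative, not procrastinate's internals):

import asyncio


async def worker_loop(fetch_job, run_job, new_job_event: asyncio.Event, poll_interval: float = 5.0):
    while True:
        job = await fetch_job()
        if job is None:
            # The cost of a failed fetch: sleep until either the poll
            # interval elapses or a LISTEN event signals a new job.
            try:
                await asyncio.wait_for(new_job_event.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass
            new_job_event.clear()
            continue
        await run_job(job)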

If any of my ideas are hard to follow, please let me know :)

TheNeedForSleep avatar Jan 16 '25 17:01 TheNeedForSleep

Motivation for this change:

A worker will crash when trying to take a job whose lock is already in the lock index.

[image: error traceback from the crashing worker]
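
For reference, the crash comes from the database enforcing lock exclusivity: procrastinate keeps a unique partial index on lock for doing jobs, so putting a second job with the same lock into doing raises an IntegrityError. A rough reproduction (column names follow the public schema, but treat the details as a sketch):

import psycopg

with psycopg.connect("postgresql:///procrastinate_test") as conn:
    conn.execute(
        "INSERT INTO procrastinate_jobs (queue_name, task_name, lock, status) "
        "VALUES ('priority', 'tasks.fast_job', 'A', 'doing')"
    )
    # Second 'doing' job with the same lock: violates the unique partial
    # index and raises psycopg.errors.UniqueViolation.
    conn.execute(
        "INSERT INTO procrastinate_jobs (queue_name, task_name, lock, status) "
        "VALUES ('priority', 'tasks.fast_job', 'A', 'doing')"
    )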

Test setup: using the original fetch job function from current main.

The following tasks, where fast_job_go_brr is called once:

import asyncio
import logging
import time
from random import choice, random

from pydantic import validate_call

from worker import PRIORITY_QUEUE, IntegrityErrorRetryStrategy, worker

logger = logging.getLogger(__name__)


@worker.task(
    queue=PRIORITY_QUEUE,
    retry=IntegrityErrorRetryStrategy(),
)
@validate_call
async def fast_job_go_brr(
    n_jobs: int = 10_000,
    locks: list | None = None,
):
    if locks is None:
        locks = [None, "A", "B", "C"]

    async with asyncio.TaskGroup() as group:
        for _i in range(n_jobs):
            group.create_task(
                fast_job.configure(
                    lock=choice(locks),  # noqa: S311
                    priority=choice([1, 2, 3]),  # noqa: S311
                ).defer_async()
            )


@worker.task(
    queue=PRIORITY_QUEUE,
    retry=IntegrityErrorRetryStrategy(),
)
@validate_call
async def fast_job(min_wait: float = 0.001, max_wait: float = 0.01, sync_wait=False):
    sleep_time = random() * (max_wait - min_wait) + min_wait  # noqa: S311
    logger.info("Waiting %s", sleep_time)
    if sync_wait:
        time.sleep(sleep_time)
    else:
        await asyncio.sleep(sleep_time)
    logger.info("Done")

TheNeedForSleep avatar Jan 17 '25 10:01 TheNeedForSleep