
Suggested patterns for storing job results

coffepowered opened this issue 3 months ago • 5 comments

Hi, newbie user here; I used Celery in the past to manage jobs in my applications. It seems to me that Procrastinate doesn't offer a built-in "result storage" mechanism like some other task queues: it simply executes the job and stores metadata in the Postgres tables, without providing handy access to the job's return value.

It's clearly possible to open a new DB connection inside the worker to store the results, but this would double the connection count on the database. Moreover, I am not sure this is the intended usage: is this an acceptable pattern, or do you expect it to cause any issues?

I'd appreciate it if you could point me to documentation I may have missed about storing job results, or discuss the issue further if this is an interesting use case.

coffepowered avatar Sep 05 '25 12:09 coffepowered

Hi @coffepowered, it's intended that you store results inside the task when using Procrastinate. In contrast to Celery, with Procrastinate it’s always clear that there’s a PostgreSQL database that can be used for storage 😉. Opening another database connection is generally not a problem, so there’s no need to overcomplicate things.
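
A minimal sketch of that pattern (the DSN, table name, and column names below are illustrative, not part of Procrastinate's API):

import json

import psycopg
from procrastinate import App, PsycopgConnector

DSN = "postgresql:///mydb"  # illustrative connection string

app = App(connector=PsycopgConnector(conninfo=DSN))

@app.task(name="add")
async def add(a: int, b: int):
    result = a + b
    # Procrastinate only records job metadata, so the task persists
    # its own return value in an ordinary table.
    async with await psycopg.AsyncConnection.connect(DSN) as conn:
        await conn.execute(
            "INSERT INTO results (task_name, payload) VALUES (%s, %s)",
            ("add", json.dumps({"result": result})),
        )
    return result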

medihack avatar Sep 05 '25 18:09 medihack

Hey @medihack thanks for getting back to me!

In contrast to Celery, with Procrastinate it’s always clear that there’s a PostgreSQL database that can be used for storage

Yes, it is! Celery, however, allows you to set a backend for result storage (e.g. Postgres via SQLAlchemy, among many others). The 'result backend' is intended to provide a persistent way to store results. Can you confirm that such a concept does not exist in Procrastinate's design? (This is my understanding, at least.)
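
For reference, this is the Celery knob I mean (the broker/backend DSNs are illustrative):

from celery import Celery

# Celery stores task return values in the configured result backend;
# the "db+postgresql://" prefix selects the SQLAlchemy backend over Postgres.
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="db+postgresql://user:password@localhost/results",
)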

Opening another database connection is generally not a problem, so there’s no need to overcomplicate things.

Yes, I am trying to understand how many jobs per second I can expect for my use case on a given RDS instance. Actually, using the existing connection pool does not seem so difficult; this is what I came up with (by adapting some hints).

It seems to perform fine on my PC, but I am wondering if this has drawbacks I can't see. I need to have persistent storage (Postgres would be a bonus!) and to be able to consume up to 5000 jobs/minute during peak hours (which means having 40 to 120 workers), and I am not sure whether I am asking too much of Procrastinate/Postgres. So far my tests are going pretty well, however.

Thanks in advance!


import functools
import inspect
import json

from procrastinate import JobContext

# Assumed to be defined elsewhere in the module:
# - app: the procrastinate.App instance
# - render_query: renders the query template with parameters
#   (SQLAlchemy is used here to provide sanitization)


def task_with_persistence_shared_conn(original_func=None, **task_kwargs):
    """
    Persists job status and results through Procrastinate's own connection pool.
    This avoids creating a separate connection pool.
    """
    def wrap(func):
        @functools.wraps(func)
        async def new_func(context: JobContext, *job_args, **job_kwargs):
            job_id = context.job.id
            task_name = context.task.name
            worker_name = context.worker_name

            print(f"[MIDDLEWARE] Worker {worker_name}: Starting job {job_id} ({task_name})")
            # Mark the job as RUNNING (upsert, so retries overwrite the old row)
            query_start_template = (
                "INSERT INTO job_results (job_id, task_name, status, updated_at) "
                "VALUES (:job_id, :task_name, 'RUNNING', NOW()) "
                "ON CONFLICT (job_id) DO UPDATE SET status = 'RUNNING', updated_at = NOW()"
            )
            query_start = render_query(
                query_start_template, job_id=job_id, task_name=task_name
            )
            await context.app.connector.execute_query_async(query_start)

            try:
                result = func(context, *job_args, **job_kwargs)
                if inspect.isawaitable(result):
                    # Support async tasks as well as sync ones
                    result = await result

                # On success, store the JSON-serialized return value
                query_success_template = (
                    "UPDATE job_results SET status = 'COMPLETED', result = :result, "
                    "error_message = NULL, updated_at = NOW() WHERE job_id = :job_id"
                )
                query_success = render_query(
                    query_success_template, result=json.dumps(result), job_id=job_id
                )
                await context.app.connector.execute_query_async(query_success)

                print(f"[MIDDLEWARE] Worker {worker_name}: Job {job_id} completed successfully")
                return result

            except Exception as e:
                print(f"[MIDDLEWARE] Worker {worker_name}: Job {job_id} failed: {e}")
                error_message = str(e)

                # On failure, update the status to FAILED and record the error message
                query_fail_template = (
                    "UPDATE job_results SET status = 'FAILED', error_message = :error_message, "
                    "updated_at = NOW() WHERE job_id = :job_id"
                )
                query_fail = render_query(
                    query_fail_template, error_message=error_message, job_id=job_id
                )
                await context.app.connector.execute_query_async(query_fail)
                raise

        # Always pass the context and apply the Procrastinate task decorator
        task_kwargs['pass_context'] = True
        return app.task(**task_kwargs)(new_func)

    if not original_func:
        return wrap
    return wrap(original_func)


@task_with_persistence_shared_conn(name="sum_with_persistence", retry=3)
def sum_with_persistence(context: JobContext, a, b):
    return {"result": a + b}
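
For completeness, a sketch of the job_results table the decorator assumes, reconstructed from the queries above (the column types are guesses):

# Run once at startup, before any jobs execute.
CREATE_JOB_RESULTS = """
CREATE TABLE IF NOT EXISTS job_results (
    job_id bigint PRIMARY KEY,
    task_name text NOT NULL,
    status text NOT NULL,
    result jsonb,
    error_message text,
    updated_at timestamptz NOT NULL DEFAULT NOW()
)
"""

async def ensure_results_table():
    await app.connector.execute_query_async(CREATE_JOB_RESULTS)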

coffepowered avatar Sep 06 '25 14:09 coffepowered

I think Procrastinate aims at being simple and does not take extra steps to squeeze out the most performance possible. It's great if Procrastinate fits your use case, and we do have some performance tests to track that we don't introduce performance regressions, but I don't think we generally expect it to perform the best possible way under very high loads.

We would probably accept contributions towards making it easier to squeeze performance out of the system as long as 1/ it doesn't make the codebase significantly more complex and 2/ it doesn't seem like it would create a huge maintenance burden in the long run.

In your case, I think your solution should work, but I'd expect you'd be safer just creating your own psycopg pool and then using it both to create your connector and inside your tasks, so you don't need to tap into Procrastinate's internals.
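
A sketch of that setup, assuming open_async accepts an externally managed pool (the DSN and pool sizes are illustrative):

import psycopg_pool
from procrastinate import App, PsycopgConnector

pool = psycopg_pool.AsyncConnectionPool(
    conninfo="postgresql:///mydb", min_size=1, max_size=10, open=False
)
app = App(connector=PsycopgConnector())

async def main():
    await pool.open()
    # Hand the pool to Procrastinate instead of letting the
    # connector create its own...
    async with app.open_async(pool):
        # ...and borrow connections from the same pool inside tasks:
        async with pool.connection() as conn:
            await conn.execute("SELECT 1")  # e.g., your result writes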

Note that (unless I'm mistaken) Procrastinate doesn't keep its pooled connections during task execution, so you likely won't need more connections to follow that approach (again, unless I'm mistaken).
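
One quick way to check that empirically (a sketch; the procrastinate DB user name is an assumption to adjust):

import asyncio

import psycopg

async def watch_connections(conninfo: str, interval: float = 1.0):
    # Poll pg_stat_activity while the worker runs to see whether
    # connections stay checked out during task execution.
    async with await psycopg.AsyncConnection.connect(conninfo, autocommit=True) as conn:
        while True:
            cur = await conn.execute(
                "SELECT state, count(*) FROM pg_stat_activity "
                "WHERE usename = 'procrastinate' GROUP BY state"
            )
            print(await cur.fetchall())
            await asyncio.sleep(interval)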

ewjoachim avatar Sep 07 '25 20:09 ewjoachim

Hello @ewjoachim, thanks for the detailed response. That's very helpful.

I have a few follow-up questions to make sure I understand correctly:

  • Defining "high load": I appreciate the project's focus on simplicity over raw performance. To help me understand the boundaries, what kind of load would you consider "high"? (e.g., ~X jobs/minute on a small DB)

  • Connection management during tasks: I am still trying to fully understand this.

(unless I'm mistaken) procrastinate doesn't keep its pooled connections during the task execution

Does this mean that once a worker picks up a job, it releases its connection back to the Procrastinate pool while the task code (e.g., an await asyncio.sleep() or an API call) is running? If so, that's a crucial detail I missed. From my tests, it seems that the connection is kept (purple line in the plot below).

[Image: plot of DB connection count over time during the test]

  • Rationale for a separate pool: Based on that, is your recommendation to create my own psycopg pool mainly for API stability (i.e., not relying on Procrastinate's internals, which might change)? Or is there a performance/behavioral difference I should be aware of?

My main goal here is just to find the most robust way to implement this and to see whether Postgres-based queuing is a viable alternative for my use case. As a matter of fact, I have tried to benchmark this systematically, managing to run 100k jobs in 8 minutes on a small DB instance. I've opened the benchmark here and would be super grateful if you could take a look, if you think it could be interesting: https://github.com/coffepowered/procrastinate-jobs

coffepowered avatar Sep 09 '25 14:09 coffepowered

Hello @ewjoachim, @medihack,

I ended up going a different route, but after running the tests I must say I am honestly amazed by how effective Postgres/Procrastinate is as a queuing system at low-to-medium scale.

coffepowered avatar Sep 15 '25 13:09 coffepowered