Suggested patterns for storing job results
Hi, newbie user here; I've used Celery in the past to manage jobs in my applications. It seems to me that Procrastinate doesn't offer a built-in "results storage" mechanism like some other task queues do. It simply executes the job and stores metadata in the Postgres tables, without providing handy access to the job's return value.
It's clearly possible to open a new DB connection inside the worker to store the results, but this would double the connection count on the DB. Moreover, I am not sure this is the intended usage: is this an acceptable usage pattern, or would you expect it to cause any issues?
I'd appreciate it if you could point me to any documentation I may have missed about storing job results, or discuss the issue further if this is an interesting use case.
Hi @coffepowered, it's intended that you store results inside the task when using Procrastinate. In contrast to Celery, with Procrastinate it’s always clear that there’s a PostgreSQL database that can be used for storage 😉. Opening another database connection is generally not a problem, so there’s no need to overcomplicate things.
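For example, the task itself can simply write its return value into a table of your own with a short-lived connection. A rough sketch (the job_results table, connection settings and names are hypothetical; adapt them to your setup):

import json

import psycopg
from procrastinate import App, PsycopgConnector

app = App(connector=PsycopgConnector())  # connection settings via PG* env vars or conninfo

@app.task(name="sum_and_store", pass_context=True)
async def sum_and_store(context, a, b):
    result = {"result": a + b}
    # Store the result in your own table; one short-lived extra
    # connection per job is usually not a problem.
    async with await psycopg.AsyncConnection.connect() as conn:
        await conn.execute(
            "INSERT INTO job_results (job_id, result) VALUES (%s, %s)",
            (context.job.id, json.dumps(result)),
        )
    return result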
Hey @medihack thanks for getting back to me!
In contrast to Celery, with Procrastinate it’s always clear that there’s a PostgreSQL database that can be used for storage
Yes, it is! However, Celery allows setting a backend for result storage (e.g. Postgres, via SQLAlchemy, and many others). The 'result backend' is intended to provide a persistent way to store results. Can you confirm that such a concept does not exist in Procrastinate's design (this is my understanding, at least)?
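For reference, this is roughly what I mean in Celery (broker/backend URLs here are hypothetical, just to illustrate the concept):

from celery import Celery

celery_app = Celery(
    "proj",
    broker="amqp://localhost",
    # The 'result backend': Celery persists return values here (Postgres via SQLAlchemy)
    backend="db+postgresql://user:password@localhost/celery_results",
)

@celery_app.task
def add(a, b):
    return a + b

# Later, the stored return value can be fetched:
#   async_result = add.delay(2, 3)
#   async_result.get()  # -> 5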
Opening another database connection is generally not a problem, so there’s no need to overcomplicate things.
Yes, I am trying to understand how many jobs per second I can expect with my use case on a given RDS instance. Actually, using the existing connection pool does not seem so difficult; this is what I came up with (by adapting some hints).
It seems to perform fine on my PC, but I am wondering if this has drawbacks I can't see. I need to have persistent storage (Postgres would be a bonus!!) and be able to consume up to 5000 jobs/minute during peak hours (that means having 40 to 120 workers), and I am not sure whether I am asking too much of Procrastinate/Postgres or not. So far my tests are going pretty well, however.
Thanks in advance!
import functools
import json

from procrastinate import JobContext

# `app` (the Procrastinate App) and `render_query` are defined elsewhere in this module.

def task_with_persistence_shared_conn(original_func=None, **task_kwargs):
    """
    Persistence middleware that reuses Procrastinate's own connection pool,
    avoiding the need to create a separate one.
    """
    def wrap(func):
        @functools.wraps(func)
        async def new_func(context: JobContext, *job_args, **job_kwargs):
            job_id = context.job.id
            task_name = context.task.name
            worker_name = context.worker_name
            print(f"[MIDDLEWARE] Worker {worker_name}: Starting job {job_id} ({task_name})")

            # Mark the job as RUNNING before executing the task body
            query_start_template = (
                "INSERT INTO job_results (job_id, task_name, status, updated_at) "
                "VALUES (:job_id, :task_name, 'RUNNING', NOW()) "
                "ON CONFLICT (job_id) DO UPDATE SET status = 'RUNNING', updated_at = NOW()"
            )
            query_start = render_query(  # renders the query with parameters; uses SQLAlchemy to provide sanitization
                query_start_template, job_id=job_id, task_name=task_name
            )
            await context.app.connector.execute_query_async(query_start)

            try:
                # The task body here is synchronous; await the result if you wrap async tasks
                result = func(context, *job_args, **job_kwargs)

                # On success, store the JSON-serialized return value
                query_success_template = (
                    "UPDATE job_results SET status = 'COMPLETED', result = :result, "
                    "error_message = NULL, updated_at = NOW() WHERE job_id = :job_id"
                )
                query_success = render_query(
                    query_success_template, result=json.dumps(result), job_id=job_id
                )
                await context.app.connector.execute_query_async(query_success)
                print(f"[MIDDLEWARE] Worker {worker_name}: Job {job_id} completed successfully")
                return result
            except Exception as e:
                print(f"[MIDDLEWARE] Worker {worker_name}: Job {job_id} failed: {e}")
                error_message = str(e)
                # On failure, update the status to FAILED and record the error message
                query_fail_template = (
                    "UPDATE job_results SET status = 'FAILED', error_message = :error_message, "
                    "updated_at = NOW() WHERE job_id = :job_id"
                )
                query_fail = render_query(
                    query_fail_template, error_message=error_message, job_id=job_id
                )
                await context.app.connector.execute_query_async(query_fail)
                raise

        # Always pass context and apply the procrastinate task decorator
        task_kwargs['pass_context'] = True
        return app.task(**task_kwargs)(new_func)

    if not original_func:
        return wrap
    return wrap(original_func)


@task_with_persistence_shared_conn(name="sum_with_persistence", pass_context=True, retry=3)  # pass context, retry
def sum_with_persistence(context: JobContext, a, b):
    return {"result": a + b}
I think Procrastinate aims to be simple, and does not take extra steps to squeeze out the most performance possible. It's great if Procrastinate fits your use case, and we do have some performance tests to track that we don't introduce performance regressions, but I don't think we generally expect it to perform the best possible way under very high loads.
We would probably accept contributions towards making it easier to squeeze performance out of the system as long as 1/ it doesn't make the codebase significantly more complex and 2/ it doesn't seem like it would create a huge maintenance burden in the long run.
In your case, I think your solution should work, but I'd expect you'd be safer just creating your own psycopg pool and then using it both to create your connector and inside your tasks, so you don't need to tap into Procrastinate's internals.
Note that (unless I'm mistaken) Procrastinate doesn't keep its pooled connections during task execution, so you likely won't need more connections to follow that approach (again, unless I'm mistaken).
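Something along these lines (a rough sketch; the exact way to hand an externally created pool to the app may depend on your Procrastinate version, so check the connections how-to in the docs, and the DSN and job_results table are placeholders):

import json

import psycopg_pool
from procrastinate import App, PsycopgConnector

# One pool for everything: Procrastinate's connector and your own queries
pool = psycopg_pool.AsyncConnectionPool(
    "postgresql://user:password@localhost/mydb",
    min_size=1,
    max_size=10,
    open=False,
)

app = App(connector=PsycopgConnector())

@app.task(name="sum_with_persistence", pass_context=True)
async def sum_with_persistence(context, a, b):
    result = {"result": a + b}
    # Reuse the shared pool inside the task to store the result
    async with pool.connection() as conn:
        await conn.execute(
            "INSERT INTO job_results (job_id, result) VALUES (%s, %s) "
            "ON CONFLICT (job_id) DO UPDATE SET result = EXCLUDED.result",
            (context.job.id, json.dumps(result)),
        )
    return result

async def main():
    await pool.open()
    # Pass the externally created pool to the app (check your version's
    # docs for the exact open_async signature)
    async with app.open_async(pool):
        await app.run_worker_async()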
Hello @ewjoachim, thanks for the detailed response. That's very helpful.
I have a few follow-up questions to make sure I understand correctly:
- Defining "High Load": I appreciate the project's focus on simplicity over raw performance. To help me understand the boundaries, what kind of load would you consider "high load"? (e.g., ~X jobs/minute on a small DB.)
- Connection Management during Tasks: I am still trying to fully understand this.
(unless I'm mistaken) procrastinate doesn't keep its pooled connections during the task execution
Does this mean that once a worker picks up a job, it releases its connection back to the Procrastinate pool while the task code (e.g., an await asyncio.sleep() or an API call) is running? If so, that's a crucial detail that I missed. From my tests, it seems that the connection is kept (purple line).
- Rationale for a Separate Pool: Based on that, is your recommendation to create my own psycopg pool mainly about API stability (i.e., not relying on Procrastinate's internals, which might change)? Or is there a performance/behavioral difference I should be aware of?
My main goal here is just to find the most robust way to implement this and to see if Postgres-based queuing is a viable alternative for my use case. As a matter of fact, I have tried to benchmark this systematically, managing to run 100k jobs in 8 minutes on a small DB instance. I've opened the benchmark here and would be super grateful if you could take a look, if you think it could be interesting: https://github.com/coffepowered/procrastinate-jobs
Hello @ewjoachim, @medihack,
I ended up going a different route, but after running the test I must say I am honestly amazed by how effective Postgres/Procrastinate is as a queuing system at low/medium scale.