Django - support for transaction.atomic
Hey guys, thanks for a great library.
Context
To give you some context, I'd like to use it in a Django project to reliably communicate with other microservices and implement the Transactional Outbox and Message Relay patterns.
Problem
The problem is that it's currently not possible to save Django models and defer a task in one transaction. Example code below:
```python
import procrastinate
from django.db import transaction
from .models import *
from procrastinate.contrib.django import connector_params
from asgiref.sync import sync_to_async

app = procrastinate.App(
    connector=procrastinate.Psycopg2Connector(**connector_params())
)


@app.task
def some_other_task():
    print("Hello World")


@app.task
@sync_to_async  # Needed to be able to use the Django ORM
@transaction.atomic
def send_order_placed_notification():
    # The task is deferred even when the next line raises IntegrityError (transaction rollback)
    some_other_task.defer()
    SomeModel.objects.create()


@transaction.atomic
def some_view(request):
    # The task is deferred even when the next line raises IntegrityError (transaction rollback)
    send_order_placed_notification.defer()
    Order.objects.create()
```
Root cause
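Psycopg2Connector/AiopgConnector create a separate database connection instead of using `django.db.connection`, so the deferred job is written outside the transaction opened by `transaction.atomic` and is not rolled back with it.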
Ah, interesting! Would you like to participate in a PR to fix this? Otherwise, I can't promise when I'll have time to try a fix, but we'll definitely try to find something.
Maybe the best would be to make a DjangoConnector? Or to split the Psycopg2Connector logic by creating a "ConnectionProvider" class, which would have a default implementation that creates a pool as is done currently, and a DjangoConnectionProvider that would read the connection from Django?
Following https://hynek.me/articles/python-subclassing-redux/, I'd tend to say the second solution might be better.
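The "ConnectionProvider" idea can be sketched with composition instead of subclassing. Everything below is hypothetical (none of these classes exist in procrastinate), and sqlite3 again stands in for PostgreSQL: `DefaultConnectionProvider` owns its connection like the current pool does, while `DjangoConnectionProvider` only borrows one from a getter that, in a real app, would return Django's connection.

```python
import sqlite3
from typing import Callable, Protocol


class ConnectionProvider(Protocol):
    """Hypothetical interface: hands the connector a connection to use."""

    def get_connection(self) -> sqlite3.Connection: ...


class DefaultConnectionProvider:
    """Connector owns its connection (a pool in the real Psycopg2Connector)."""

    def __init__(self, database: str) -> None:
        self._conn = sqlite3.connect(database)

    def get_connection(self) -> sqlite3.Connection:
        return self._conn


class DjangoConnectionProvider:
    """Borrows a connection managed elsewhere (Django, in a real app)."""

    def __init__(self, getter: Callable[[], sqlite3.Connection]) -> None:
        self._getter = getter  # hypothetical: would return django.db.connection

    def get_connection(self) -> sqlite3.Connection:
        return self._getter()


class Connector:
    """Hypothetical connector: every query goes through the injected provider."""

    def __init__(self, provider: ConnectionProvider) -> None:
        self.provider = provider

    def defer(self, task_name: str) -> None:
        conn = self.provider.get_connection()
        conn.execute("INSERT INTO jobs (task) VALUES (?)", (task_name,))


app_conn = sqlite3.connect(":memory:")  # stands in for the ORM's connection
app_conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, task TEXT)")
connector = Connector(DjangoConnectionProvider(lambda: app_conn))
connector.defer("send_order_placed_notification")
app_conn.rollback()  # the caller's rollback also discards the deferred job
print(app_conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0])  # 0
```

Because the connector never creates the borrowed connection, the caller's commit or rollback decides the fate of the deferred job, which is the transactional outbox behaviour wanted here.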
Yes, I would like to participate.
I've already created a DjangoConnector based on Psycopg2Connector, but it only works inside the Django app. I have to investigate how to make it work for workers (they require an async connector at the moment).
Maybe it's OK as a first step to have a different connector for deferring and for workers, but I believe you may want to use the ORM in the tasks, and that will be problematic, so...
Interesting problem to solve :) Feel free to expose as much as you want of your findings here and we'll brainstorm our way through it together :)
I've created a draft which we can discuss.
Summary of my findings so far:
- To defer a task and use the Django ORM in one transaction, the same connection must be used, so `django.db.connections` must be used in both the procrastinate workers and the Django application.
- Django raises a `SynchronousOnlyOperation` exception when some parts of the code (e.g. the Django ORM) run from a thread with a running event loop. More info here. To fix it, `sync_to_async(thread_sensitive=True)` could be used.
- The procrastinate worker runs its own event loop in the main thread, so `sync_to_async` will run the Django ORM code in a different thread. Running Django ORM code outside the main thread may cause various issues:

> The reason this is needed in Django is that many libraries, specifically database adapters, require that they are accessed in the same thread that they were created in. Also a lot of existing Django code assumes it all runs in the same thread, e.g. middleware adding things to a request for later use in views.
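The thread-affinity problem from the list above can be reproduced with the standard library alone. In this sketch (a stand-in, not Django or procrastinate code) sqlite3 plays the database adapter, because by default its connections refuse to be used from a thread other than the one that created them, and `asyncio.to_thread` plays `sync_to_async`, which moves blocking code off the event loop thread. `orm_like_query` and `worker` are hypothetical names.

```python
import asyncio
import sqlite3

# Created in the main thread, which is also where asyncio.run() runs the loop.
# check_same_thread=True (the default) enforces the rule Django describes.
conn = sqlite3.connect(":memory:")


def orm_like_query() -> str:
    """Blocking code that touches the connection, like an ORM call."""
    try:
        conn.execute("SELECT 1")
        return "ok"
    except sqlite3.ProgrammingError:
        return "wrong thread"


async def worker() -> tuple[str, str]:
    # Called directly on the loop thread: same thread, but it blocks the loop.
    direct = orm_like_query()
    # Offloaded to a worker thread: the loop stays free, the thread changes.
    offloaded = await asyncio.to_thread(orm_like_query)
    return direct, offloaded


print(asyncio.run(worker()))  # ('ok', 'wrong thread')
```

Calling the blocking function directly on the loop thread satisfies the adapter but blocks the loop (which is what Django's `SynchronousOnlyOperation` guards against); offloading it unblocks the loop but changes threads. A worker whose loop owns the main thread has to pick one of the two.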
So I think the easiest way to make it work for Django is to rewrite the procrastinate worker in a synchronous way and create a DjangoConnector.
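A rough sketch of what a synchronous worker could look like, assuming one thread and one connection shared by the worker and the tasks. The `jobs` table layout, the `TASKS` registry, the `task` decorator and `run_worker_once` are all hypothetical and much simpler than procrastinate's real schema; sqlite3 stands in for PostgreSQL, where fetching would also need row locking (for example `SELECT ... FOR UPDATE SKIP LOCKED`) so that several workers do not grab the same job.

```python
import sqlite3
from typing import Callable, Dict

# Hypothetical registry: task name -> plain synchronous function.
TASKS: Dict[str, Callable[[sqlite3.Connection], None]] = {}


def task(func):
    TASKS[func.__name__] = func
    return func


@task
def send_order_placed_notification(conn: sqlite3.Connection) -> None:
    # The task uses the same connection (i.e. the ORM's) as the worker.
    conn.execute("INSERT INTO notifications (sent) VALUES (1)")


def run_worker_once(conn: sqlite3.Connection) -> bool:
    """Fetch one 'todo' job, run it and record the outcome, all in this thread."""
    row = conn.execute(
        "SELECT id, task FROM jobs WHERE status = 'todo' ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None:
        return False
    job_id, task_name = row
    try:
        TASKS[task_name](conn)
        conn.execute("UPDATE jobs SET status = 'done' WHERE id = ?", (job_id,))
        conn.commit()  # task writes and status change commit together
    except Exception:
        conn.rollback()  # discard the task's partial writes
        conn.execute("UPDATE jobs SET status = 'failed' WHERE id = ?", (job_id,))
        conn.commit()
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, task TEXT, status TEXT)")
conn.execute("CREATE TABLE notifications (sent INTEGER)")
conn.execute(
    "INSERT INTO jobs (task, status) VALUES ('send_order_placed_notification', 'todo')"
)
conn.commit()
while run_worker_once(conn):  # a real worker would wait or listen when idle
    pass
print(conn.execute("SELECT status FROM jobs").fetchall())  # [('done',)]
```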
> I think the easiest way to make it work for Django is to rewrite the procrastinate worker in a synchronous way
Yes, I think so too.
> to reliably communicate with other microservices and implement Transactional outbox and Message Relay patterns.
Came here for the exact same pattern, but using FastAPI/SQLAlchemy. We can create a session object in our FastAPI handlers and pass that around, but we need a way to thread it into the `defer_async` call.
Hi, if anyone is facing a problem starting the worker in a Django app:
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/procrastinate/utils.py", line 391, in run_tasks
    await asyncio.gather(_main(), *side_tasks)
  File "/usr/local/lib/python3.11/site-packages/procrastinate/utils.py", line 383, in _main
    await asyncio.gather(*main_tasks)
  File "/usr/local/lib/python3.11/site-packages/procrastinate/worker.py", line 134, in single_worker
    job = await self.job_manager.fetch_job(self.queues)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/procrastinate/manager.py", line 139, in fetch_job
    if row["id"] is None:
       ~~~^^^^^^
TypeError: tuple indices must be integers or slices, not str
```
This helped me:
```python
from procrastinate import AiopgConnector, App
from procrastinate.contrib.django import connector_params

params = connector_params()
# Looks like Django is passing a tuple-based cursor factory; remove it if present
params.pop("cursor_factory", None)
app = App(connector=AiopgConnector(**params))
```
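The traceback comes from indexing a row by column name (`row["id"]`), which only works when the cursor returns dict-like rows (psycopg2's `RealDictCursor` is one such cursor factory). The same failure can be reproduced with sqlite3 as a stand-in: its default rows are plain tuples, and switching to a name-aware row factory fixes the lookup. This is only an analogy for the `cursor_factory` override, not procrastinate code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO jobs (id) VALUES (42)")

# Default row factory: plain tuples, so name-based access fails.
row = conn.execute("SELECT id FROM jobs").fetchone()
try:
    row["id"]
except TypeError as exc:
    print(exc)  # tuple indices must be integers or slices, not str

# Name-aware row factory: the same access now works.
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT id FROM jobs").fetchone()
print(row["id"])  # 42
```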
Next, use `@sync_to_async(thread_sensitive=False)` to have concurrent sync jobs. Otherwise, the worker will be blocked by a single task even with the `--concurrency=30` argument.
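```python
import time
from asgiref.sync import sync_to_async

@app.task(name="mytask")  # `app` is the App created above
async def mytask(obj_pk):
    @sync_to_async(thread_sensitive=False)
    def work():
        time.sleep(5)  # sync placeholder
    await work()
```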
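Why `thread_sensitive=False` matters for concurrency, sketched with the standard library under the assumption that a single-thread executor approximates `thread_sensitive=True` (all such calls share one thread) and a multi-thread pool approximates `thread_sensitive=False`. `blocking_job` and `run_jobs` are hypothetical helpers.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job() -> None:
    time.sleep(0.2)  # sync placeholder, like time.sleep(5) above


async def run_jobs(executor: ThreadPoolExecutor) -> float:
    """Run three blocking jobs from the event loop; return elapsed seconds."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    await asyncio.gather(
        *(loop.run_in_executor(executor, blocking_job) for _ in range(3))
    )
    return time.perf_counter() - start


# One shared thread, roughly like thread_sensitive=True: jobs run one by one.
with ThreadPoolExecutor(max_workers=1) as single:
    serial = asyncio.run(run_jobs(single))
# A pool of threads, roughly like thread_sensitive=False: jobs overlap.
with ThreadPoolExecutor(max_workers=3) as pool:
    parallel = asyncio.run(run_jobs(pool))

print(f"serial ~{serial:.1f}s, parallel ~{parallel:.1f}s")
```

The trade-off is the one discussed earlier in the thread: a pool gives concurrency, but the sync code no longer runs on one consistent thread.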
> Next, use `@sync_to_async(thread_sensitive=False)` to have concurrent sync jobs
Yes, we have something in the works for switching to `sync_to_async`/`async_to_sync`.
For the rest, I believe the best way to solve this will be when we add support for psycopg3.
`transaction.atomic` now works with the revamped Django integration (note: it's not in a stable release yet, but it's already merged in main).