procrastinate
Draft - DjangoConnector
TODO
Closes #502
Successful PR Checklist:
- [ ] Tests
- [ ] (not applicable?)
- [ ] Documentation
- [ ] (not applicable?)
I think we need to decide on an approach:
- Either we try to make the Django connector as close as possible to the Psycopg2 connector and undo everything Django does, in which case I think we're going to fight against Django all the way through.
- Or we try to "embrace" the Django way: use the ORM and Model.objects.raw() (and do #320 at the same time). Cons: there might be more work, and I have no idea whether this will work. Pro: less likely to completely break with each new Django version.
What's your opinion here?
Another option is to use Django cursors to execute custom SQL directly (as this PR currently does). Pro: no additional work to create models. I think it's best to use Django features as much as possible, so the 2nd and 3rd approaches.
I don't see the difference between your 3rd approach and my 1st?
Maybe I don't fully understand what you meant by the 1st approach. The Psycopg2Connector can't be used by workers. Aiopg can be used with workers, but it doesn't use Django's connection, so deferring a task inside a transaction won't be possible.
So by the 3rd approach I meant that the DjangoConnector would work like a mix of the Psycopg2 and Aiopg connectors. Also, Django connection/cursor objects would be used everywhere instead of the PostgreSQL equivalents extracted from the Django connection (except in the LISTEN/NOTIFY case).
Ah ok, I understand your point. I had forgotten that the Psycopg2Connector was incompatible with workers.
But then it seems it would be easier to make the Psycopg2Connector compatible with workers, and then we'd be almost ready to have it work with Django?
I think it still wouldn't work, because the Psycopg2Connector creates its own separate connections, and django.db.connection must be used for a deferred job to be part of the surrounding Django transaction.
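To illustrate why deferring must go through the same connection as the surrounding transaction, here is a minimal sketch that uses the standard-library sqlite3 module as a stand-in for PostgreSQL and django.db.connection. The `jobs` table and the `defer_job` helper are hypothetical, not procrastinate's schema or API.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def defer_job(conn: sqlite3.Connection, task_name: str) -> None:
    # Hypothetical stand-in for the INSERT a connector runs when deferring a job.
    conn.execute("INSERT INTO jobs (task_name) VALUES (?)", (task_name,))


def jobs_left_after_rollback(defer_on_same_connection: bool) -> int:
    """Defer a job inside a transaction that is then rolled back; count surviving jobs."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.sqlite3")
        with closing(sqlite3.connect(db_path)) as setup:
            setup.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, task_name TEXT)")
            setup.commit()

        # isolation_level=None: no implicit transactions, we issue BEGIN/ROLLBACK ourselves.
        request_conn = sqlite3.connect(db_path, isolation_level=None)  # plays django.db.connection
        connector_conn = sqlite3.connect(db_path, isolation_level=None)  # a connector's own connection
        try:
            request_conn.execute("BEGIN")  # e.g. entering transaction.atomic()
            conn = request_conn if defer_on_same_connection else connector_conn
            defer_job(conn, "add_subscriber")  # on connector_conn this commits immediately
            request_conn.execute("ROLLBACK")  # e.g. the view raised an exception
            return request_conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
        finally:
            request_conn.close()
            connector_conn.close()


print(jobs_left_after_rollback(defer_on_same_connection=True))  # 0: rolled back with the transaction
print(jobs_left_after_rollback(defer_on_same_connection=False))  # 1: the job survives the rollback
```

With PostgreSQL the outcome is the same: a job inserted through a separate connection is committed independently of the caller's transaction, so a worker may pick it up even though the data it refers to was never committed.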
Hey all 👋 I'm curious as to the current state of Procrastinate's support for Django integration. I've found this in the documentation, https://procrastinate.readthedocs.io/en/stable/howto/django.html, and I'm wondering how much functionality there is in this Draft PR that would be beneficial.
Hey :)
I feel a bit ashamed that I let this PR rot :/ I felt that I needed to discuss it with other maintainers and it never really happened :(
My issue is that the "connector" situation is already a bit of a mess, especially regarding sync vs async, and I'm afraid that making the wrong choice in integrating a Django connector might make it even harder when we address the whole subject for real.
I'm not saying this PR makes the wrong choice but I'm actually a bit lost regarding what the correct path would be :/
Hey @ewjoachim, I totally understand! After looking around, I really enjoy the API provided by Procrastinate, especially compared to a lot of the other task/queue frameworks out there.
It would be great to have a tighter Django integration, but I'm going to proceed with the project as-is and see how it turns out ¯\_(ツ)_/¯
With that said, I certainly agree with your earlier sentiments (https://github.com/procrastinate-org/procrastinate/pull/506#issuecomment-998698519) on "embrace the Django way". I'll be honest, I don't have a lot of experience with the low-level Django ORM "magic" nor do I have any experience with Procrastinate. But I'd still love to see this deeper integration happen, and I might be able to help out in smaller ways if you need any help on getting this through 😊 Docs, tests, etc.
> My issue is that the "connector" situation is already a bit of a mess, especially regarding sync vs async, and I'm afraid that making the wrong choice in integrating a Django connector might make it even harder when we address the whole subject for real.
Is this perhaps why we have a hard time using a sync app (with our django-allauth backend, in a synchronous Python context) alongside our otherwise async ASGI FastAPI application? When I use the @sync_app.task decorator on our Django code, I tend to see errors like this:
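```python
from allauth.account.signals import user_signed_up
from django.dispatch import receiver

# sync_app, request_status_io_task_args and User are defined elsewhere in the project.

@sync_app.task(**request_status_io_task_args)
async def add_subscriber(email: str) -> None:
    """Add user as a subscriber to status.io maintenance notifications"""
    # Django model code

@receiver(user_signed_up)
def enroll_status_io(request, user: User, **kwargs) -> None:
    add_subscriber.defer(email=user.email)
```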
```text
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/allauth/account/utils.py", line 183, in complete_signup
[cloud-app]     signals.user_signed_up.send(
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/django/dispatch/dispatcher.py", line 180, in send
[cloud-app]     return [
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/django/dispatch/dispatcher.py", line 181, in <listcomp>
[cloud-app]     (receiver, receiver(signal=self, sender=sender, **named))
[cloud-app]   File "/cloud-app/iam/models.py", line 256, in enroll_status_io
[cloud-app]     add_subscriber.defer(email=user.email)
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/procrastinate/tasks.py", line 129, in defer
[cloud-app]     return self.configure().defer(**task_kwargs)
[cloud-app]   File "/cloud-app/tests/conftest.py", line 728, in defer_from_jobdeferrer
[cloud-app]     job_id = original_defer_from_jobdeferrer(self, **task_kwargs)
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/procrastinate/jobs.py", line 163, in defer
[cloud-app]     job = self.job_manager.defer_job(job=job)
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/procrastinate/manager.py", line 52, in defer_job
[cloud-app]     result = self.connector.execute_query_one(
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/procrastinate/utils.py", line 148, in wrapper
[cloud-app]     return sync_await(awaitable=awaitable)
[cloud-app]   File "/usr/local/lib/python3.10/site-packages/procrastinate/utils.py", line 195, in sync_await
[cloud-app]     loop = asyncio.get_event_loop()
[cloud-app]   File "/usr/local/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
[cloud-app]     raise RuntimeError('There is no current event loop in thread %r.'
[cloud-app] RuntimeError: There is no current event loop in thread 'WSGI_0'.
```
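The last frames show the root cause: the sync wrapper calls `asyncio.get_event_loop()`, which raises `RuntimeError` in a thread (such as a WSGI worker thread) that has no event loop set. The sketch below reproduces this with the standard library only and shows one possible alternative for a helper like `sync_await`; `run_sync`, `fake_query` and the other names are hypothetical, not procrastinate's code. The assumption is that the helper is only ever called from plain sync code, where `asyncio.run()` can create a fresh loop for the call.

```python
import asyncio
import threading


async def fake_query() -> int:
    # Stand-in for an async connector call such as execute_query_one().
    await asyncio.sleep(0)
    return 42


def loop_dependent_sync_await(coro):
    # Mirrors the failing frame: assumes this thread already has an event loop.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro)


def run_sync(coro):
    # Hypothetical alternative: if no loop is running in this thread,
    # run the coroutine on a brand-new loop via asyncio.run().
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    coro.close()
    raise RuntimeError("run_sync() cannot be called while an event loop is running")


def call_in_worker_thread(sync_await) -> dict:
    """Call sync_await(fake_query()) in a fresh thread, as a WSGI server would."""
    outcome = {}

    def target():
        coro = fake_query()
        try:
            outcome["result"] = sync_await(coro)
        except RuntimeError as exc:
            coro.close()  # avoid a "coroutine was never awaited" warning
            outcome["error"] = str(exc)

    worker = threading.Thread(target=target, name="WSGI_0")
    worker.start()
    worker.join()
    return outcome


print(call_in_worker_thread(loop_dependent_sync_await))  # reports the RuntimeError
print(call_in_worker_thread(run_sync))  # {'result': 42}
```

A fresh loop per call has its own cost: anything bound to a loop, such as an asyncio connection pool, cannot be reused across calls, and it does not help at all when the caller is itself running inside an event loop.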
Our workaround historically has been to make an ad-hoc, one-time-use sync app as follows:
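```python
from typing import Callable

from allauth.account.signals import user_signed_up
from django.dispatch import receiver

# make_sync_app, add_subscriber, request_status_io_task_args and User are defined elsewhere in the project.

@receiver(user_signed_up)
def enroll_status_io(request, user: User, **kwargs) -> None:
    defer_with_sync_app(add_subscriber, **request_status_io_task_args)(email=user.email)

def defer_with_sync_app(task_func, **task_kwargs) -> Callable:
    """Register task_func on a throwaway sync app and return a function that defers it."""
    def wrapper(**kwargs):
        app = make_sync_app()
        task = app.task(**task_kwargs)(task_func)
        # Open the app (and its connection) only for the duration of the defer call.
        with app.open():
            task_id = task.defer(**kwargs)
        return task_id
    return wrapper
```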
Yes, that's probably the reason :/
@ewjoachim do you know if it's possible at all to use both an async and sync app in the same FastAPI application? We mount Django as an ASGI app within our FastAPI application.
I can't say for sure. I believe it may be possible, but it might require that someone changes the way the sync/async translation works within procrastinate, and I'm not sure I have enough knowledge (and/or time to spend on the matter) to investigate what needs to be done exactly, being sure enough that we're going in the right direction and not piling up a different mistake :/
I'd say a potentially good case study candidate would be asgiref's sync_to_async and async_to_sync. It's possible that we might be able to use and/or vendor those and get something that works nicely enough for both.
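One mechanism asgiref relies on can be sketched with the standard library alone: sync code running in a worker thread (as Django code does when called through `sync_to_async`) does not create its own loop, but hands the coroutine back to the main event loop with `asyncio.run_coroutine_threadsafe()` and blocks until it completes. This is a simplified illustration of the idea, not asgiref's implementation; `defer_job`, `sync_signal_handler` and `endpoint` are hypothetical names.

```python
import asyncio


async def defer_job(task_name: str) -> str:
    # Stand-in for an async procrastinate call (e.g. deferring on an async app).
    await asyncio.sleep(0)
    return f"deferred {task_name}"


def sync_signal_handler(loop: asyncio.AbstractEventLoop) -> str:
    # Runs in a worker thread, like Django code called via sync_to_async.
    # Instead of creating a new loop, submit the coroutine to the main loop
    # and block this worker thread until it completes.
    future = asyncio.run_coroutine_threadsafe(defer_job("add_subscriber"), loop)
    return future.result(timeout=5)


async def endpoint() -> str:
    # Async side (e.g. an ASGI endpoint): push the sync code to a thread so
    # the main loop stays free to run the coroutine it will receive.
    loop = asyncio.get_running_loop()
    return await asyncio.to_thread(sync_signal_handler, loop)


print(asyncio.run(endpoint()))  # deferred add_subscriber
```

Calling `sync_signal_handler` directly on the event-loop thread would block that loop until the timeout, since nothing could run the submitted coroutine; asgiref's `async_to_sync` refuses to be called from a thread with a running loop for that reason. The price of this approach is one blocked worker thread per call.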
Be cautious. You'll start to run into weird stuff with contextvars (they don't really work well to/from threads) and semaphores on threads (for example FastAPI can only run 40 sync dependencies/endpoints concurrently because of an implicit semaphore buried deep in the stack).
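The contextvars caveat can be seen with the standard library: on standard CPython builds a plain `threading.Thread` starts with its own context and does not see values set by the caller, whereas `asyncio.to_thread()` runs the function inside a copy of the current context. The `request_id` variable is hypothetical. (The 40-thread cap mentioned above comes, as far as we know, from AnyIO's default thread limiter, which Starlette/FastAPI use for sync code.)

```python
import asyncio
import contextvars
import threading

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="unset")


def read_request_id() -> str:
    return request_id.get()


def read_in_plain_thread() -> str:
    # A bare Thread starts with its own context on standard CPython builds,
    # so the value set by the caller is not visible here.
    result = []
    thread = threading.Thread(target=lambda: result.append(read_request_id()))
    thread.start()
    thread.join()
    return result[0]


async def main() -> tuple[str, str]:
    request_id.set("abc123")
    plain = read_in_plain_thread()
    # asyncio.to_thread() copies the current context into the worker thread.
    propagated = await asyncio.to_thread(read_request_id)
    return plain, propagated


plain, propagated = asyncio.run(main())
print(plain, propagated)
```

On a standard (non free-threaded) build this prints `unset abc123`: the same code path sees different values depending on how the thread was started, which is the kind of surprise that appears when sync and async code hand work back and forth through threads.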
Ok, sync+async is a mistake and I should never have tried to have procrastinate work on both :D (not being sarcastic, I really regret the choice, but I won't be beating myself with a stick too much)
The question is "what do we do now" :)
Just rubber ducking here. What would you do if you were starting all over? Only support one? Support both but in completely separate codebases?
I guess we would need to make as much as possible of the procrastinate code independent from I/Os (fortunately, this is already what happens in many parts of the codebase, but not on the workers, though) and have high level functions that only take care of I/Os, and delegate all the "business logic" to our "functional" core.
It's probably not as easy as it sounds, because it's not just a question of using dependency injection to stay loosely coupled to whatever performs the I/Os: we also need to avoid having to choose between calling x() and await x() in our functions.
So... Maybe implement each "logical" entrypoint as a generator that would be sending "I/O requests" and receiving "I/O responses" and... I'm re-inventing async I/O all over again, aren't I? 😅
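The generator idea can be sketched in a few lines. The business logic is written once as a generator that yields I/O requests and receives their results; a sync driver and an async driver then perform the actual I/O. This is often called the "sans-I/O" pattern. All names below (`Query`, `defer_job_logic`, `run_sync`, `run_async`, the SQL and its `jobs` table) are hypothetical and only illustrate the pattern; they are not procrastinate's code.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Generator


@dataclass
class Query:
    """An I/O request emitted by the pure logic; no database is touched here."""
    sql: str
    params: dict


# The logic yields Query objects, receives row dicts back, and returns a job id.
Logic = Generator[Query, dict, int]


def defer_job_logic(task_name: str, args: dict) -> Logic:
    if not task_name:
        raise ValueError("task_name is required")
    row = yield Query(
        "INSERT INTO jobs (task_name, args) VALUES (%(task_name)s, %(args)s) RETURNING id",
        {"task_name": task_name, "args": args},
    )
    return row["id"]


def run_sync(logic: Logic, execute: Callable[[Query], dict]) -> int:
    """Drive the logic with a blocking execute() function."""
    try:
        request = next(logic)
        while True:
            request = logic.send(execute(request))
    except StopIteration as stop:
        return stop.value


async def run_async(logic: Logic, execute: Callable[[Query], Awaitable[dict]]) -> int:
    """Drive the same logic with an awaitable execute() function."""
    try:
        request = next(logic)
        while True:
            request = logic.send(await execute(request))
    except StopIteration as stop:
        return stop.value


# Fake executors standing in for a sync and an async database driver.
def fake_execute(query: Query) -> dict:
    return {"id": 1}


async def fake_execute_async(query: Query) -> dict:
    await asyncio.sleep(0)
    return {"id": 2}


print(run_sync(defer_job_logic("add_subscriber", {}), fake_execute))  # 1
print(asyncio.run(run_async(defer_job_logic("add_subscriber", {}), fake_execute_async)))  # 2
```

The logic never has to choose between `x()` and `await x()`; only the two small drivers do. As the comment suspects, those drivers are a tiny event loop of their own, and a real version would also need to throw driver errors back into the generator with `logic.throw()`.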
FTR I have the same threading problem when trying to use either our async app or a sync one.
For cases where you know the async thing is not using tasks internally (maybe because you wrote it) you can do something like this: https://github.com/edgedb/edgedb-python/blob/master/edgedb/blocking_client.py#L343-L353
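One way to run an async function from sync code without any event loop, valid only when the coroutine never waits on a future or task, is to step it by hand with `coroutine.send(None)`. The sketch below illustrates that technique with hypothetical names; it is not necessarily what the linked edgedb-python code does.

```python
import asyncio
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_without_loop(coro: Coroutine[Any, Any, T]) -> T:
    """Step a coroutine by hand; only valid if it never waits on a future or task."""
    try:
        while True:
            yielded = coro.send(None)
            # A bare yield (as in asyncio.sleep(0)) yields None: simply resume.
            # Anything else, typically an asyncio.Future, needs a real event loop.
            if yielded is not None:
                raise RuntimeError(f"coroutine awaited {yielded!r}; an event loop is required")
    except StopIteration as stop:
        return stop.value
    finally:
        coro.close()


async def compute() -> int:
    await asyncio.sleep(0)  # bare yield: fine without a loop
    return 21 * 2


async def needs_loop() -> None:
    await asyncio.sleep(0.01)  # asks for the running loop's timer: fails here


print(run_without_loop(compute()))  # 42
```

Calling `run_without_loop(needs_loop())` raises `RuntimeError`, which is why this trick cannot help for code that creates tasks or waits on timers, such as the worker.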
Worker heavily uses tasks, sadly... but we'll need a full rewrite anyway...
Superseded by #906. Thank you to everyone who contributed though!