procrastinate
Cannot defer a task from another task
Hi there! I am having an issue trying to .defer() a task from another task.
Issue
Here's a minimum example:
from typing import Optional

from procrastinate import AiopgConnector, App

app = App(connector=AiopgConnector())
app.open()

@app.task
def other_task():
    print("Hey! I'm here.")

@app.periodic(cron="* * * * *")
@app.task
def periodic_task(timestamp: Optional[int] = None):
    # Deferring from inside a running task is what triggers the error.
    other_task.defer()

@app.task
def primary_task():
    other_task.defer()
The other_task.defer() call fails in my worker process (started as procrastinate --verbose --app=exampleapp.app.app worker) with the following stack trace, regardless of whether I defer from the Python shell (app.primary_task.defer()) or when the periodic tick occurs:
Traceback (most recent call last):
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/worker.py", line 229, in run_job
    task_result = task(*job_args, **job.task_kwargs)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/tasks.py", line 109, in __call__
    return self.func(*args, **kwargs)
  File "/Users/christopher/Projects/exampleapp/src/exampleapp/app.py", line 273, in periodic_task
    other_task.defer()
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/tasks.py", line 129, in defer
    return self.configure().defer(**task_kwargs)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/jobs.py", line 163, in defer
    job = self.job_manager.defer_job(job=job)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/manager.py", line 52, in defer_job
    result = self.connector.execute_query_one(
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/utils.py", line 149, in wrapper
    return sync_await(awaitable=awaitable)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/utils.py", line 200, in sync_await
    return loop.run_until_complete(awaitable)
  File "/Users/christopher/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py", line 617, in run_until_complete
    self._check_running()
  File "/Users/christopher/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py", line 577, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
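For readers who want to see the mechanism in isolation, the last frames of the traceback can be reproduced with the standard library alone. This is a minimal sketch, not procrastinate's code: insert_job, defer_sync, and worker_runs_task are hypothetical stand-ins, and the only assumption taken from the traceback is that the sync path ends in loop.run_until_complete() while the worker's loop is already running.

```python
import asyncio


async def insert_job():
    # Hypothetical stand-in for the query that would insert the job row.
    return 87


def defer_sync():
    # Hypothetical sync wrapper: tries to drive a coroutine to completion
    # on the loop that is already running the caller.
    loop = asyncio.get_running_loop()
    coro = insert_job()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def worker_runs_task():
    # Plays the role of the worker: it calls a sync task body from inside
    # a coroutine, so the event loop is already running at that point.
    try:
        defer_sync()
    except RuntimeError as exc:
        return str(exc)
    return None


message = asyncio.run(worker_runs_task())
print(message)
```

The same wrapper would be fine when no loop is running, which is consistent with the defer from the Python shell succeeding while the defer inside the worker fails.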
I could probably work around this with how I structure my code -- I could probably move the logic I wish to defer into a shared function that's called from 2 places, but ideally I'm not forced to do that.
The reason I've structured things this way is that it doesn't seem to be possible to defer a periodic task directly (which may be a separate issue I'm happy to file as well). Obviously in normal use cases you probably wouldn't do this, but while I'm testing things out locally and iterating on development I'd like to be able to invoke a Periodic task directly (rather than waiting for it to run on whatever Cron schedule I've set).
Given the example above:
>>> from exampleapp import app
>>> app.periodic_task.defer()
2022-05-11 12:26:06,444 [DEBUG] procrastinate.jobs :: About to defer job exampleapp.app.periodic_task[None]()
2022-05-11 12:26:06,449 [INFO] procrastinate.jobs :: Deferred job exampleapp.app.periodic_task[87]()
87
>>> app.periodic_task.defer(15)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: Task.defer() takes 1 positional argument but 2 were given
The invocation for job number 87 fails with the same RuntimeError given above.
Expected
Without using project-specific jargon, what I'd like is to enqueue task B from within periodic task A. I do not need to wait for the result of B, and I would like control to return to A once B is enqueued.
For context, I am coming from a place where I've run a large application using Celery and Rabbit, so I may simply be applying Celery paradigms to Procrastinate incorrectly. Please feel free to let me know if that's the case.
Comments
This project is neat and I decided to use it for a small side project I'm working on. I especially liked that, for a small side project, I would not be required to set up a separate task queue for running jobs asynchronously and on a schedule. Thanks for the work here!
Damn, that's the sync / async compatibility biting us again :/
As a "small" step, I'd say defining your task like this will likely solve the issue:
@app.periodic(cron="* * * * *")
@app.task
async def periodic_task(timestamp: Optional[int] = None):
    await other_task.defer_async()
But I agree it's not ideal :/
Thanks for the reply @ewjoachim. I'd already figured out the Optional[int] trick with timestamp, which worked reasonably well, though I was still running into the later issue with the async/sync mismatch.
For now, I just ended up splitting up my logic into a "private" function that I can call both from a non-periodic task for testing and periodic task. I don't think I can go full async yet since I've got some other dependencies which don't support it, but thank you for the tip on a possible workaround.
Oh sorry if it wasn't clear. My comment was solely about the async/await stuff; the Optional[int] is copied/pasted from your own snippet above :)
I could be wrong, but I think it shouldn't be too impactful if you declare your function as async but don't actually do async stuff in it. That's roughly what happens when procrastinate launches your sync task anyway. The consequence is that the event loop will not get control back until the end of your task, but if you're only doing sync things, I think the event loop doesn't actually need control back in the meantime.
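To illustrate that last point with plain asyncio (none of this is procrastinate code; ticker and sync_work_in_async_task are made-up names), here is a small sketch of an async function whose body is entirely synchronous. A second coroutine that is ready to run only gets the loop back once the blocking body has finished.

```python
import asyncio
import time

events = []


async def ticker():
    # Another coroutine sharing the loop; it yields once, then resumes.
    events.append("ticker start")
    await asyncio.sleep(0)
    events.append("ticker resumed")


async def sync_work_in_async_task():
    # Declared async, but never awaits: the loop is blocked throughout.
    events.append("task start")
    time.sleep(0.05)  # stand-in for a blocking, non-async dependency
    events.append("task end")


async def main():
    background = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    await sync_work_in_async_task()
    await background


asyncio.run(main())
print(events)
```

The ticker cannot resume between "task start" and "task end", which is harmless as long as nothing else on the loop needs to make progress during the task.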