procrastinate
Worker refactor
Current situation
When the procrastinate worker starts, it launches n+2 coroutines, where n is the concurrency setting:
- n coroutines are "subworkers" that loop until the worker is asked to stop. Because the stop request may arrive at any time, the loop itself is a bit complicated, so as to ensure that we stop as soon as possible. On each iteration, a subworker waits for a listen/notify event, requests a job, and if it receives one, runs it, then loops again.
- 1 coroutine is the listen/notify coroutine that will use an asyncio.Event to communicate that new tasks are available.
- 1 periodic defer coroutine will defer periodic jobs.
- All those coroutines are handled by a horribly complicated construct (utils.run_tasks)
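Why this design issues so many fetch queries can be illustrated with a simplified sketch. It is not procrastinate's code: FakeDB, fetch_job and subworker are hypothetical names, the stop handling is left out, and it assumes all subworkers are idle and waiting on one shared asyncio.Event when a notification arrives. Since asyncio.Event.set() wakes every waiter, a single notification for a single job leads to n fetch queries.

```python
import asyncio
from typing import Optional

CONCURRENCY = 4  # n, the worker's concurrency setting


class FakeDB:
    """Hypothetical stand-in for the database; counts fetch queries."""

    def __init__(self, jobs: list[str]) -> None:
        self.jobs = list(jobs)
        self.fetch_queries = 0

    async def fetch_job(self) -> Optional[str]:
        self.fetch_queries += 1
        await asyncio.sleep(0)  # pretend to do I/O
        return self.jobs.pop(0) if self.jobs else None


async def subworker(db: FakeDB, new_job: asyncio.Event, done: list[str]) -> None:
    # One iteration of a (hypothetical) subworker loop, stop handling omitted:
    await new_job.wait()  # every idle subworker waits on the same event...
    job = await db.fetch_job()  # ...and each one then queries the database
    if job is not None:
        done.append(job)  # "run" the job


async def main() -> tuple[int, list[str]]:
    db = FakeDB(["job-1"])
    new_job = asyncio.Event()
    done: list[str] = []
    subworkers = [
        asyncio.create_task(subworker(db, new_job, done)) for _ in range(CONCURRENCY)
    ]
    await asyncio.sleep(0)  # let every subworker start waiting
    new_job.set()  # the listen/notify coroutine reports a single new job
    await asyncio.gather(*subworkers)
    return db.fetch_queries, done


queries, done = asyncio.run(main())
print(queries, done)  # 4 ['job-1']
```

Only one subworker gets the job, but all four have hit the database.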
Issues with the current situation
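Each subworker requests new jobs from the database on its own: since every idle subworker is woken by the same listen/notify event, a single notification can trigger up to n fetch queries. That's far too many SQL queries. Also, utils.run_tasks is a bit disgusting.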
Possible solutions
Instead of having n looping subworkers, we should have a single job spawner that figures out when a new job is available and then runs each job coroutine in its own asyncio task.
We should still respect the concurrency setting, probably using an asyncio.Semaphore.
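A minimal sketch of the single-spawner idea, assuming an in-memory FakeDB in place of the real SQL query. FakeDB, fetch_job, run_job, Stats and job_spawner are hypothetical names, not procrastinate's API. The spawner acquires a semaphore slot before each fetch, so it only queries the database when it can actually run a job, and each job's done callback releases the slot. Where a real worker would wait for the next listen/notify event, this sketch simply stops once the queue is empty.

```python
import asyncio
from typing import Optional


class FakeDB:
    """Hypothetical in-memory stand-in for the jobs table; counts fetch queries."""

    def __init__(self, jobs: list[str]) -> None:
        self.jobs = list(jobs)
        self.fetch_queries = 0

    async def fetch_job(self) -> Optional[str]:
        self.fetch_queries += 1
        await asyncio.sleep(0)  # pretend to do I/O
        return self.jobs.pop(0) if self.jobs else None


class Stats:
    """Tracks how many jobs run at once, to check the concurrency limit."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0
        self.done: list[str] = []


async def run_job(job: str, stats: Stats) -> None:
    stats.active += 1
    stats.peak = max(stats.peak, stats.active)
    await asyncio.sleep(0.01)  # pretend to do work
    stats.active -= 1
    stats.done.append(job)


async def job_spawner(db: FakeDB, concurrency: int, stats: Stats) -> None:
    semaphore = asyncio.Semaphore(concurrency)
    running: set[asyncio.Task] = set()

    def on_job_done(task: asyncio.Task) -> None:
        running.discard(task)
        semaphore.release()  # free the slot for the next job

    while True:
        await semaphore.acquire()  # only fetch when a slot is free
        job = await db.fetch_job()
        if job is None:
            semaphore.release()
            # A real worker would wait for the next listen/notify event here;
            # this sketch simply stops once the queue is empty.
            break
        task = asyncio.create_task(run_job(job, stats))
        running.add(task)
        task.add_done_callback(on_job_done)
    await asyncio.gather(*running)  # wait for the jobs still in flight


db = FakeDB([f"job-{i}" for i in range(10)])
stats = Stats()
asyncio.run(job_spawner(db, concurrency=3, stats=stats))
print(db.fetch_queries, stats.peak, len(stats.done))  # 11 3 10
```

With 10 queued jobs and a concurrency of 3, this makes 11 fetch queries (one per job plus the final empty one) and never runs more than 3 jobs at once.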
At this point I'm not 100% sure yet what graceful and especially ungraceful shutdown should look like. I guess the standard "shutdown signal" for a coroutine is task.cancel(), so the first shutdown request, which triggers a graceful shutdown, should use it. Note that this is already how App.run_worker_async() is implemented, but maybe it should be part of the worker itself. Then, I guess a second .cancel() should shut everything down, while still ensuring that cancelled jobs end up in a specific state. I think "failed" is the closest existing state, until withdrawn and interrupted tasks are handled properly (in another ticket).
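A sketch of this two-step shutdown, assuming it is driven by task.cancel() as suggested. worker, run_job, JOB_DURATIONS and the in-memory statuses dict are hypothetical; a real implementation would update job status in the database. The first cancel() stops the (stubbed) spawner and waits for running jobs; asyncio.wait() is used there because, unlike asyncio.gather(), cancelling it does not cancel the jobs it waits on. The second cancel() cancels the remaining jobs and marks them "failed".

```python
import asyncio

# Hypothetical job durations (seconds): job 0 is short, jobs 1 and 2 are long.
JOB_DURATIONS = {0: 0.02, 1: 10.0, 2: 10.0}


async def run_job(job_id: int, statuses: dict[int, str]) -> None:
    statuses[job_id] = "doing"
    await asyncio.sleep(JOB_DURATIONS[job_id])
    statuses[job_id] = "succeeded"


async def worker(statuses: dict[int, str]) -> None:
    running = {asyncio.create_task(run_job(i, statuses)) for i in JOB_DURATIONS}
    try:
        # Stand-in for the job-spawner loop; it only exits when cancelled.
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        pass  # 1st cancel(): graceful, stop spawning and let running jobs finish
    try:
        # asyncio.wait() (unlike gather()) does not cancel the jobs if we are cancelled.
        await asyncio.wait(running)
    except asyncio.CancelledError:
        # 2nd cancel(): ungraceful, cancel running jobs and record their state
        for task in running:
            task.cancel()
        await asyncio.gather(*running, return_exceptions=True)
        for job_id, status in statuses.items():
            if status == "doing":
                statuses[job_id] = "failed"  # closest existing state for now
        raise


async def main() -> dict[int, str]:
    statuses: dict[int, str] = {}
    worker_task = asyncio.create_task(worker(statuses))
    await asyncio.sleep(0.01)  # let the jobs start
    worker_task.cancel()  # 1st shutdown request: graceful
    await asyncio.sleep(0.05)  # job 0 finishes meanwhile
    worker_task.cancel()  # 2nd shutdown request: ungraceful
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
    return statuses


statuses = asyncio.run(main())
print(statuses)  # {0: 'succeeded', 1: 'failed', 2: 'failed'}
```

The short job completes during the graceful phase, while the two long ones are interrupted by the second request and left as "failed".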
For now, all of this is open to design discussion.
@onlyann has expressed interest in tackling this. (I can't assign you)
Closed by #1114