arq
Support for task orchestration
A common feature of task processing libraries is orchestration, which allows you to
- define tasks that may happen in parallel (similar to `asyncio.gather`)
- define which task or tasks will receive the payload of those tasks and start executing when that group of tasks has finished
Dramatiq calls it composition with pipelines
Celery calls it canvas with chords and groups
A use case might be:
- Upon accepting a new blog post from a user
- Perform natural language processing
- Update a search index based on keywords
- Notify users monitoring certain keywords that a new blog post has been created
These all need to happen in sequence, and each step relies on the result of the previous one. The return value of each task would probably be forwarded to the next job. I assume this could be achieved by scheduling jobs within jobs. However, being able to "string tasks together upfront" gives a little more flexibility and a more declarative feel to the initial scheduling.
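To make the data flow concrete, here is a minimal in-process sketch of that pipeline using plain `asyncio`, not arq: each step receives the previous step's return value. The step functions (`extract_keywords`, `update_search_index`, `notify_watchers`) and the helper `run_chain` are hypothetical stand-ins, and the "NLP" is only a word filter.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


# Hypothetical stand-ins for the real NLP / indexing / notification work.
async def extract_keywords(post: str) -> List[str]:
    # naive "NLP": keep distinct lowercase words longer than 4 characters
    return sorted({w.lower().strip('.,!?') for w in post.split() if len(w) > 4})


async def update_search_index(keywords: List[str]) -> List[str]:
    # pretend to update the index, then pass the keywords on to the next step
    return keywords


async def notify_watchers(keywords: List[str]) -> str:
    return f'notified watchers of: {", ".join(keywords)}'


async def run_chain(first_arg: Any, *steps: Callable[[Any], Awaitable[Any]]) -> Any:
    """Run steps in sequence, feeding each step's return value into the next."""
    value = first_arg
    for step in steps:
        value = await step(value)
    return value


if __name__ == '__main__':
    print(asyncio.run(run_chain(
        'Arq makes background tasks simple',
        extract_keywords, update_search_index, notify_watchers,
    )))
```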
This example doesn't include parallel tasks. An example of that use case might be:
- On a user signup
- Calculate spam detection of user signup
- Calculate follow suggestions
- If the user passes the spam detector, and once follow suggestions have been found, notify users of a potential known contact
The idea is to be able to define these tasks up front and queue all of them at once.
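For reference, the intended behaviour of the signup case (a parallel group followed by a conditional step) can be sketched in-process with plain `asyncio`. This is again not arq; `spam_score`, `follow_suggestions`, `notify_known_contacts` and the 0.5 threshold are hypothetical.

```python
import asyncio
from typing import List


# Hypothetical stand-ins for the real checks.
async def spam_score(email: str) -> float:
    await asyncio.sleep(0.01)
    return 0.9 if email.endswith('@spam.example') else 0.1


async def follow_suggestions(email: str) -> List[str]:
    await asyncio.sleep(0.01)
    return ['alice', 'bob']


async def notify_known_contacts(email: str, contacts: List[str]) -> List[str]:
    return [f'{contact}: {email} just signed up' for contact in contacts]


async def on_signup(email: str, spam_threshold: float = 0.5) -> List[str]:
    # both checks run concurrently; gather waits until both have finished
    score, contacts = await asyncio.gather(spam_score(email), follow_suggestions(email))
    if score >= spam_threshold:
        return []  # failed the spam check: notify nobody
    return await notify_known_contacts(email, contacts)


if __name__ == '__main__':
    print(asyncio.run(on_signup('sam@example.com')))
```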
This can all be achieved now; I've added an example of all the cases you asked about below. Let me know if anything in it is unclear.
I'm sure we should at least add a few examples like this to the docs; PRs welcome, otherwise I'll do it.
However, I agree that that's not the whole story; some way to declaratively describe workflows would be useful.
I think "Declarative Workflows" would be a good name for this since:
- it does what it says on the tin
- it doesn't reuse the term "pipeline" which is already used by aioredis
- it distinguishes from imperative workflows like those I've demonstrated below
How would declarative workflows work?
I would very much like to implement them without making the existing logic in `workers.py` any more complicated (my internal cyclomatic complexity limit has already been exceeded 😖).
I would therefore propose that we add some default jobs to the worker, one of which could be `workflow`; we could then add a method to `ArqRedis` to call a workflow.
I've added a very rough first idea of how the interface might work at the bottom, please let me know what you think.
Examples of current solution (imperative workflows)
```python
import asyncio

from arq import create_pool, ArqRedis
from arq.connections import RedisSettings


async def do_stuff(ctx, time: int, value: str):
    print('doing stuff...')
    await asyncio.sleep(time)
    # you could of course call more nested jobs here
    return f'stuff done {value}'


async def join_things(ctx, *things: str):
    print('join things together...')
    await asyncio.sleep(1)
    return '<>'.join(things)


async def nested_job(ctx):
    """
    This job just enqueues another job (do_stuff) and ends without waiting for do_stuff to finish.
    """
    redis: ArqRedis = ctx['redis']
    await redis.enqueue_job('do_stuff', 3, 'called from nested_job')
    return 'do_stuff called, not waiting'


async def parallel(ctx):
    """
    Enqueue two other jobs, and wait for them both to complete before continuing
    """
    redis: ArqRedis = ctx['redis']
    # call do_stuff, but don't wait for the result
    j1 = await redis.enqueue_job('do_stuff', 3, 'job 1')
    j2 = await redis.enqueue_job('do_stuff', 2, 'job 2')
    r1, r2 = await asyncio.gather(j1.result(), j2.result())
    j3 = await redis.enqueue_job('join_things', r1, r2)
    r3 = await j3.result()
    return f'things joined together: {r3}'


async def main():
    redis = await create_pool(RedisSettings())
    print('creating job...')
    await redis.enqueue_job('nested_job')
    await redis.enqueue_job('parallel')

    # we could also do this with asyncio.gather:
    # await asyncio.gather(
    #     redis.enqueue_job('nested_job'),
    #     redis.enqueue_job('parallel'),
    # )
    # but note, this is virtually the same as the above case since enqueue_job only waits for the
    # job to be in redis, not for it to complete


class WorkerSettings:
    functions = [do_stuff, nested_job, join_things, parallel]


if __name__ == '__main__':
    asyncio.run(main())
```
Proposed Declarative Workflows interface
(Note: this is just a rough idea of how the interface might work, not a firm idea, or a promise I'll build it)
This should represent the same workflow as the `parallel` job above; in theory you should be able to chain together an arbitrary number of steps, each with an arbitrary number of parallel jobs.
```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple, Union

from arq import ArqRedis


@dataclass
class ResultReference:
    id: str


@dataclass
class WorkflowJob:
    function: str
    args: Tuple[Union[ResultReference, str, int], ...]  # technically should be Any, but ResultReference has a specific meaning
    id: Optional[str] = None


class NewArqRedis(ArqRedis):
    async def workflow(self, *steps: Iterable[WorkflowJob]):
        ...


async def usage(redis: NewArqRedis):
    await redis.workflow(
        (
            WorkflowJob('do_stuff', (3, 'job 1'), 'j1'),
            WorkflowJob('do_stuff', (2, 'job 2'), 'j2'),
        ),
        (
            WorkflowJob('join_things', (ResultReference('j1'), ResultReference('j2'))),
        ),
    )
```
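One way `workflow` might be built on today's imperative API is sketched below. This is an assumption-laden sketch, not arq code: `run_workflow` is a hypothetical helper that accepts any object with an ArqRedis-style `enqueue_job(function, *args)` returning a job whose `result()` can be awaited, and it redefines the two proposed dataclasses so it runs on its own. Jobs within a step are enqueued together and awaited with `asyncio.gather`; `ResultReference` arguments are replaced by the result of the earlier job with that `id`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple


@dataclass
class ResultReference:
    id: str


@dataclass
class WorkflowJob:
    function: str
    args: Tuple[Any, ...]
    id: Optional[str] = None


async def run_workflow(redis: Any, *steps: Iterable[WorkflowJob]) -> List[Any]:
    """
    Hypothetical helper, not part of arq. Runs the jobs of each step in parallel and
    starts the next step only once every job in the current step has finished.
    Returns the results of the last step, in the order its jobs were listed.
    `redis` only needs ArqRedis-style enqueue_job() returning a job with result().
    """
    results_by_id: Dict[str, Any] = {}
    step_results: List[Any] = []
    for step in steps:
        jobs = list(step)
        enqueued = []
        for wf_job in jobs:
            # swap each ResultReference for the stored result of an earlier job
            # (an unknown id raises KeyError)
            args = [
                results_by_id[arg.id] if isinstance(arg, ResultReference) else arg
                for arg in wf_job.args
            ]
            enqueued.append(await redis.enqueue_job(wf_job.function, *args))
        # wait for every job in this step before moving on
        step_results = list(await asyncio.gather(*(job.result() for job in enqueued)))
        for wf_job, result in zip(jobs, step_results):
            if wf_job.id is not None:
                results_by_id[wf_job.id] = result
    return step_results
```

Because it only uses `enqueue_job` and `Job.result()`, the same function could presumably be wrapped in a default worker job called with `ctx['redis']`; like `parallel`, that job would hold a worker slot until the whole workflow finishes.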
Amazing! The declarative API looks super nice.
One little comment:
This style, `r1, r2 = await asyncio.gather(j1.result(), j2.result())`, is not actually the same as orchestrating task results, inputs and execution. It's great for when you actually want to await on the "main" loop, for example in a web request, but it's another thing to be able to queue tasks that are executed and awaited in a certain way by a broker/hub. Does that make sense?
You mean something analogous to a Celery chord?
So, something like this?
```python
async def parallel(ctx):
    """
    Enqueue two other jobs, and wait for them both to complete before continuing
    """
    redis: ArqRedis = ctx['redis']
    # call do_stuff, but don't wait for the result
    j1 = await redis.enqueue_job('do_stuff', 3, 'job 1')
    j2 = await redis.enqueue_job('do_stuff', 2, 'job 2')
    # proposed API: ArqRedis does not currently have a chord method
    await redis.chord(j1, j2, callback="join_things")
```
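As a hypothetical sketch, a helper with roughly those semantics can be written with calls arq already has, taking already-enqueued jobs like `j1` and `j2`. `chord` here is an illustrative function, not an arq method, and since it is still awaited inside the calling job it is the same imperative pattern as `parallel`, not broker-side orchestration.

```python
import asyncio
from typing import Any, Sequence


async def chord(redis: Any, jobs: Sequence[Any], callback: str) -> Any:
    """
    Hypothetical helper, not part of arq: wait for every job in `jobs`, then enqueue
    `callback` with their results as positional arguments and return its result.
    `redis` is expected to behave like ArqRedis (enqueue_job returning a Job).
    """
    # gather keeps the results in the same order as `jobs`
    results = await asyncio.gather(*(job.result() for job in jobs))
    callback_job = await redis.enqueue_job(callback, *results)
    return await callback_job.result()


# inside a job such as `parallel`:
#     j1 = await redis.enqueue_job('do_stuff', 3, 'job 1')
#     j2 = await redis.enqueue_job('do_stuff', 2, 'job 2')
#     return await chord(redis, [j1, j2], callback='join_things')
```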
@jokull `r1, r2 = await asyncio.gather(j1.result(), j2.result())` is happening inside a job. `parallel` is kind of like a management job that handles the orchestration of other jobs.
Of course! My bad. That makes perfect sense. In this case, the declarative style is just a "nice to have". It looks like all top-level requirements for my project at @tryinch are met with this imperative style. Declarative might even just be unnecessary complexity. A section in the docs about Task Orchestration might be a good next step?
Yes, it's definitely just sugar coating, but I'd be happy to add it, especially if it doesn't make `workers.py` any more complex.
@samuelcolvin first, thanks for the great tool!
Maybe I'm missing something, but is there a way to run tasks in a Celery chain-like way? In other words, task linking: the linked task is called when the previous task has completed.
For now, the only way I can see to implement this is to create a helper job that accepts a list of jobs to run sequentially and waits for the result of each one.
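```python
from arq import ArqRedis


async def chain_jobs(ctx: dict, *jobs: dict):
    redis: ArqRedis = ctx['redis']
    for job in jobs:
        # each dict holds keyword arguments for enqueue_job, e.g. {'function': 'do_stuff', 'time': 3, 'value': 'job 1'}
        async_result = await redis.enqueue_job(**job)
        # wait for this job to finish before enqueuing the next one
        await async_result.result()
```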
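Celery's chain also passes each task's return value on to the next task, which the helper above does not do. A variation that forwards results might look like this; `chain` is a hypothetical job name, each link is assumed to be a `(function_name, args)` tuple, and from the second link on, the previous result is prepended to that link's args.

```python
from typing import Any, Tuple


async def chain(ctx: dict, *links: Tuple[str, tuple]) -> Any:
    """
    Hypothetical chain job: run each (function, args) link in order, waiting for one
    to finish before enqueuing the next. The previous link's result is passed as the
    first positional argument to every link after the first, similar to Celery's chain.
    """
    redis = ctx['redis']  # the ArqRedis instance arq puts in the worker context
    result: Any = None
    for position, (function, args) in enumerate(links):
        call_args = args if position == 0 else (result, *args)
        job = await redis.enqueue_job(function, *call_args)
        result = await job.result()
    return result
```

Registered in `WorkerSettings.functions`, it could be enqueued with something like `await redis.enqueue_job('chain', ('do_stuff', (1, 'a')), ('join_things', ('b',)))`, which would run `do_stuff(1, 'a')` and then `join_things('stuff done a', 'b')`. As with `parallel`, the `chain` job occupies a worker slot while it waits, so a single worker needs `max_jobs` greater than one, and the whole chain has to finish within the worker's `job_timeout`.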