
Support for task orchestration

Open jokull opened this issue 3 years ago • 7 comments

A common feature of task processing libraries is orchestration that allows you to

  • define tasks that may happen in parallel (similar to asyncio.gather)
  • and which task or tasks will receive the results of that group and start executing once it has finished

Dramatiq calls it composition with pipelines

Celery calls it canvas with chords and groups
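
For reference, a minimal sketch of what that looks like in Celery, purely to illustrate the pattern being asked for (the task names and broker URL here are made up):

# illustrative Celery-only sketch: a group of tasks runs in parallel and a chord
# callback receives the list of their results once the whole group has finished
from celery import Celery, chord, group

app = Celery('demo', broker='redis://localhost:6379/0')

@app.task
def analyze(post_id):
    return f'analysis of {post_id}'

@app.task
def index(post_id):
    return f'index entry for {post_id}'

@app.task
def notify(results):
    # receives the results of both tasks above once the group has finished
    print('group finished:', results)

# queue everything up front; the broker handles the orchestration
chord(group(analyze.s(42), index.s(42)))(notify.s())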

A use case might be:

  1. Upon accepting a new blog post from a user
  2. Perform natural language processing
  3. Update a search index based on keywords
  4. Notify users monitoring certain keywords that a new blog post has been created

These all need to happen in sequence and rely on the previous workload. The return value of each task would probably be forwarded to each subsequent job. I assume this could be achieved by scheduling jobs within jobs. However, being able to "string tasks together upfront" gives a little more flexibility and declarative feel to the initial scheduling.
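
As a rough sketch of what that "scheduling jobs within jobs" might look like for this blog-post case (the job names are hypothetical; the ctx['redis'] / enqueue_job pattern is arq's):

# hypothetical arq jobs chained by enqueuing the next step from inside each job,
# forwarding each step's output to the one that follows
from arq import ArqRedis


async def process_blog_post(ctx, post_id: int):
    keywords = ['python', 'asyncio']  # stand-in for real NLP
    redis: ArqRedis = ctx['redis']
    await redis.enqueue_job('update_search_index', post_id, keywords)
    return keywords


async def update_search_index(ctx, post_id: int, keywords: list):
    redis: ArqRedis = ctx['redis']
    await redis.enqueue_job('notify_watchers', post_id, keywords)
    return f'indexed post {post_id}'


async def notify_watchers(ctx, post_id: int, keywords: list):
    return f'notified watchers of {keywords} about post {post_id}'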

This example doesn't include parallel tasks. An example of that use case might be:

  1. On a user signup
    1. Calculate spam detection of user signup
    2. Calculate follow suggestions
  2. If the user passes the spam detector and follow suggestions have been found, notify users of a potential known contact

The idea is to be able to define these tasks up front and queue all of them at once.

Referenced Twitter response

jokull · Apr 28 '21 08:04

This can all be achieved now. I've added an example covering all the cases you asked about below; let me know if anything in it is unclear.

I'm sure we should at least add a few examples like this to the docs; PR welcome, otherwise I'll do it.

However, I agree that that's not the whole story; some way to declaratively describe workflows would be useful.

I think "Declarative Workflows" would be a good name for this since:

  • it does what it says on the tin
  • it doesn't reuse the term "pipeline" which is already used by aioredis
  • it distinguishes from imperative workflows like those I've demonstrated below

How would declarative workflows work?

I would very much like to implement them without making the existing logic in workers.py any more complicated (my internal cyclomatic complexity limit has already been exceeded 😖).

I would therefore propose we add some default jobs to a worker, one of which could be workflow; we could then add a method to ArqRedis to call a workflow.

I've added a very rough first idea of how the interface might work at the bottom, please let me know what you think.


Examples of the current solution (imperative workflows)

import asyncio
from arq import create_pool, ArqRedis
from arq.connections import RedisSettings


async def do_stuff(ctx, time: int, value: str):
    print('doing stuff...')
    await asyncio.sleep(time)
    # you could of course call more nested jobs here
    return f'stuff done {value}'


async def join_things(ctx, *things: str):
    print('join things together...')
    await asyncio.sleep(1)
    return '<>'.join(things)


async def nested_job(ctx):
    """
    This job just calls another job (do_stuff) and ends without waiting for do_stuff to finish
    """
    redis: ArqRedis = ctx['redis']
    await redis.enqueue_job('do_stuff', 3, 'called from nested_job')
    return 'do_stuff called, not waiting'


async def parallel(ctx):
    """
    Enqueue two other jobs, and wait for them both to complete before continuing
    """
    redis: ArqRedis = ctx['redis']
    # call do_stuff twice, but don't wait for the results yet
    j1 = await redis.enqueue_job('do_stuff', 3, 'job 1')
    j2 = await redis.enqueue_job('do_stuff', 2, 'job 2')
    r1, r2 = await asyncio.gather(j1.result(), j2.result())

    j3 = await redis.enqueue_job('join_things', r1, r2)
    r3 = await j3.result()

    return f'things joined together: {r3}'


async def main():
    redis = await create_pool(RedisSettings())
    print('creating job...')

    await redis.enqueue_job('nested_job')
    await redis.enqueue_job('parallel')
    # we could also do this with asyncio.gather:
    # await asyncio.gather(
    #     redis.enqueue_job('nested_job'),
    #     redis.enqueue_job('parallel'),
    # )
    # but note, this is virtually the same as the above case since enqueue_job only waits for the
    # job to be in redis, not for it to complete


class WorkerSettings:
    functions = [do_stuff, nested_job, join_things, parallel]


if __name__ == '__main__':
    asyncio.run(main())

Proposed Declarative Workflows interface

(Note: this is just a rough idea of how the interface might work, not a firm idea, or a promise I'll build it)

This should represent the same workflow as the parallel job above; in theory you should be able to chain together an arbitrary number of steps, each with an arbitrary number of parallel jobs.

from dataclasses import dataclass
from typing import Optional, Sequence, Tuple, Union

from arq import ArqRedis


@dataclass
class ResultReference:
    id: str


@dataclass
class WorkflowJob:
    function: str
    args: Tuple[Union[ResultReference, str, int], ...]  # technically should be Any, but ResultReference has a specific meaning
    id: Optional[str] = None


class NewArqRedis(ArqRedis):
    async def workflow(self, *steps: Sequence[WorkflowJob]):
        ...
    
    
async def usage(redis: NewArqRedis):
    await redis.workflow(
        (
            WorkflowJob('do_stuff', (3, 'job 1'), 'j1'),
            WorkflowJob('do_stuff', (2, 'job 2'), 'j2'),
        ),
        (
            WorkflowJob('join_things', (ResultReference('j1'), ResultReference('j2'))),
        )
    )
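
And a very rough sketch of how the workflow default job itself might run those steps on the worker side; this is illustrative only (serialization of the step definitions is hand-waved, and none of this exists in arq today):

# illustrative worker-side job: run each step's WorkflowJob entries in parallel,
# then substitute ResultReference arguments with results from earlier steps
import asyncio


async def workflow(ctx, *steps):
    redis = ctx['redis']
    results = {}  # WorkflowJob.id -> result of that job

    def resolve(arg):
        # swap ResultReference placeholders for the real results they point at
        return results[arg.id] if isinstance(arg, ResultReference) else arg

    for step in steps:
        jobs = [
            await redis.enqueue_job(wj.function, *[resolve(a) for a in wj.args])
            for wj in step
        ]
        step_results = await asyncio.gather(*(j.result() for j in jobs))
        for wj, result in zip(step, step_results):
            if wj.id is not None:
                results[wj.id] = result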

samuelcolvin · Apr 28 '21 11:04

Amazing! The declarative API looks super nice.

One little comment:

This style, r1, r2 = await asyncio.gather(j1.result(), j2.result()), is not actually the same as orchestrating task results, inputs and execution. It's great when you actually want to await on the "main" loop, for example in a web request, but it's another thing to be able to queue tasks that are executed and awaited in a certain way by a broker/hub. Does that make sense?

jokull · Apr 28 '21 12:04

You mean analogous to Celery chord?

So, something like this?

async def parallel(ctx):
    """
    Enqueue two other jobs, and wait for them both to complete before continuing
    """
    redis: ArqRedis = ctx['redis']
    # call do_stuff twice, but don't wait for the results yet
    j1 = await redis.enqueue_job('do_stuff', 3, 'job 1')
    j2 = await redis.enqueue_job('do_stuff', 2, 'job 2')
    
    await redis.chord(j1, j2, callback="join_things")

Kludex · Apr 28 '21 12:04

@jokull r1, r2 = await asyncio.gather(j1.result(), j2.result()) is happening inside a job.

parallel is kind of like a management job that handles the orchestration of other jobs.

samuelcolvin · Apr 28 '21 13:04

Of course! My bad. That makes perfect sense. In this case, the declarative style is just a "nice to have". It looks like all the top-level requirements for my project at @tryinch are met with this imperative style. Declarative might even just be unnecessary complexity. A section in the docs about Task Orchestration might be a good next step?

jokull · Apr 28 '21 13:04

yes, it's definitely just sugar coating, but I'd be happy to add it, especially if it doesn't make workers.py any more complex.

samuelcolvin · Apr 28 '21 13:04

@samuelcolvin first of all, thanks for the great tool!

Maybe I'm missing something, but is there a way to run tasks the way Celery's chain does? In other words, task linking: the linked task is called when the previous task has completed.

For now, the only way I can see to implement this is to create a helper job that accepts a list of jobs to run sequentially and waits for the result of each one.

from arq import ArqRedis


async def chain_jobs(ctx: dict, *jobs: dict):
    redis: ArqRedis = ctx['redis']
    for job in jobs:
        # enqueue the next job and wait for it to finish before moving on
        async_result = await redis.enqueue_job(**job)
        await async_result.result()
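
For illustration, the helper could then be enqueued like this (reusing do_stuff from the earlier example; each dict is expanded into enqueue_job, so the steps' arguments have to be passed as keyword arguments):

# illustrative usage of the chain_jobs helper above
from arq import create_pool
from arq.connections import RedisSettings


async def enqueue_chain():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job(
        'chain_jobs',
        {'function': 'do_stuff', 'time': 1, 'value': 'first'},
        {'function': 'do_stuff', 'time': 1, 'value': 'second'},
    )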

NikitaZakharov · May 19 '23 09:05