
Documentation for Task.replace for Dynamic Tasks

Open costrouc opened this issue 9 years ago • 5 comments

I have been using Celery heavily, but one feature I have really needed (along with many others, judging by Stack Overflow) is dynamic tasks. Task.replace looks like it will enable this functionality and much more. I have been experimenting with Celery 4.0 and was looking for ways to implement certain types of dynamic tasks. Would it be worth documenting example usage of Task.replace?

I would be happy to help document the proper way to implement dynamic tasks within a canvas workflow, as I would really like to contribute to the project.


from celery import Celery, group

app = Celery()
app.config_from_object(CeleryConfig) # Get config from somewhere

@app.task
def add_task(*args):
    return sum(args)

@app.task
def add_group_task(args):
    return sum(args)

Example replace methods.

@app.task(bind=True)
def replace_chain_task(self):
    sig = add_task.s(1, 2) | add_task.s(3, 4)
    raise self.replace(sig)

@app.task(bind=True)
def replace_chord_task(self, n):
    sig = group(add_task.s(i) for i in range(n)) | add_group_task.s()
    raise self.replace(sig)

Example 1: Replace task with chain of tasks

>>> (add_task.s(5, 6) | replace_chain_task.s() | add_task.s(10)).delay()
31       # 5+6+1+2+3+4+10

Example 2: Replace task with chord of tasks

>>> (add_task.s(1, 4) | replace_chord_task.s() | add_task.s(1)).delay()
11       # (0 + 1 + 2 + 3 + 4) + 1

Example 3: Replace tasks within group of tasks with chain

>>> (group(replace_chain_task.s() for i in range(4)) | add_group_task.s()).delay()
40      # (1+2+3+4) + (1+2+3+4) + (1+2+3+4) + (1+2+3+4)

TODO: the following examples are not working, and I am not sure how to implement them.

Example 4: Replace tasks within group of tasks with chord (NOT WORKING)

>>> (group(replace_chord_task.s(i) for i in range(5)) | add_group_task.s()).delay()
IndexError: 0  # Should return 10: sum of sum(range(n)) for n in range(5) = 0 + 0 + 1 + 3 + 6
               # Looking at the worker, it appears to almost complete

Example 5: Replace task with a group (not a chord). Is it possible? I could not get this to work:

@app.task(bind=True)
def replace_group_task(self, n):
    sig = group(add_task.s(i) for i in range(n))
    raise self.replace(sig)

I would love to come up with additional examples as well. Task.replace looks like it will make Celery significantly more powerful.

costrouc avatar Sep 08 '16 21:09 costrouc

Note that, according to the documentation: "the best practice is to always use raise self.replace(...) to convey to the reader that the task won't continue after being replaced."
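To illustrate why that `raise` is purely for readability: Task.replace itself ends by raising `Ignore`, so control never returns to the calling task either way. Here is a minimal pure-Python model of that control flow (stub classes, not the real Celery internals, but consistent with the documented behaviour):

```python
class Ignore(Exception):
    """Stand-in for celery.exceptions.Ignore."""

def replace(sig):
    # Models Task.replace: it delays the new signature, then raises
    # Ignore itself, so execution never returns to the caller.
    raise Ignore("Replaced by new task")

def task_body():
    raise replace("new_sig")  # the `raise` documents intent for the reader
    return "unreachable"      # never executed

try:
    task_body()
except Ignore as exc:
    outcome = str(exc)
```

The `raise` keyword on the call site is therefore dead code in practice, but it tells the reader that nothing after that line will run.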

bruno-rino avatar Sep 09 '16 10:09 bruno-rino

A working way to replace with a single group is:

@app.task(bind=True)
def replace_group_task(self, n):
    subtasks = [add_task.s(i) for i in range(n)]
    return self.replace(group(*subtasks))

Celery's test cases include these kinds of tasks. Check out https://github.com/celery/celery/blob/master/t/integration/tasks.py#L83

I cannot emphasize enough that this functionality is core to implementing what many people would expect map to do: run tasks in parallel that consume the results of a previous task. I second the call for further documentation. I would love to be pointed towards contribution guidelines so I can do this myself.
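For what it's worth, the map-style semantics described above can be modelled in plain Python, without a broker, to show what a replace-with-group computes (the helper names are hypothetical; the real version would use `self.replace(group(...))` as in the snippet above):

```python
def producer():
    # Stands in for a task whose result feeds the next step.
    return list(range(10))

def consumer(item):
    # Stands in for the task applied to each element in parallel.
    return item * item

def map_via_group(items):
    # Models replace-with-group: one consumer signature per item,
    # with results collected in declaration order, like a GroupResult.
    return [consumer(i) for i in items]

results = map_via_group(producer())
```

In the real Celery version, `map_via_group` would be a bound task that builds `group(consumer.s(i) for i in items)` and replaces itself with it.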

jeffreybrowning avatar Feb 13 '20 22:02 jeffreybrowning

@jeffreybrowning I agree. PRs are welcome.

Here's our contribution guide: https://github.com/celery/celery/blob/master/CONTRIBUTING.rst

thedrow avatar Feb 16 '20 13:02 thedrow

I have a use case for this that I've been busting my head trying to implement...

I am running a complex workflow comprising several tasks that can run in chords, in parallel, etc. One of the tasks in this workflow needs to be split into chunks, based on the length of the workflow results passed to it as its first argument, if those results exceed a certain length.

Something like:

from celery import chain, group

from utils import my_internal_task, flatten, deduplicate, get_chunks

CHUNK_SIZE = 10
CHUNK_TARGET_SIZE = 1000

@app.task
def forward_results(results):
    results = flatten(results)
    results = deduplicate(results, attr='_uuid')
    return results

@app.task(bind=True)
def run_command(self, results, *args, **kwargs):
    if len(results) > CHUNK_TARGET_SIZE:
        chunks = get_chunks(results, CHUNK_SIZE)
        chunked_tasks = [run_command.si(chunk, *args, **kwargs) for chunk in chunks]
        return self.replace(chain(group(*chunked_tasks), forward_results.s()))
    results = my_internal_task(results, *args, **kwargs)
    return results

Note that the rest of the workflow will depend on the results of that task / group of tasks.

Any pointers on how to achieve this ?
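The chunking and recombination steps can at least be exercised without a broker. The helpers below are hypothetical stand-ins for get_chunks, flatten, and the per-chunk work (names and behaviour assumed, not Celery API):

```python
CHUNK_SIZE = 10

def get_chunks(results, chunk_size):
    # Split the input list into consecutive chunks of at most chunk_size.
    return [results[i:i + chunk_size] for i in range(0, len(results), chunk_size)]

def flatten(list_of_lists):
    # forward_results first flattens the group's list-of-lists result.
    return [item for sub in list_of_lists for item in sub]

def process_chunk(chunk):
    # Stands in for run_command's per-chunk work (my_internal_task).
    return [item * 2 for item in chunk]

data = list(range(25))
chunks = get_chunks(data, CHUNK_SIZE)         # 3 chunks: 10 + 10 + 5 items
chunked = [process_chunk(c) for c in chunks]  # models the group of .si() tasks
recombined = flatten(chunked)                 # models the forward_results callback
```

The open question is whether the chord callback reliably receives the per-chunk results in declaration order so that `flatten` reassembles them correctly.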

ocervell avatar Oct 10 '23 15:10 ocervell

Same here. Is this solved? I always felt that self.replace(workflow) is not the way to go. Right now I am also seeing a problem where the chord callback receives group results in execution order, not in calling order (weird behavior, since the pseudo-code for unlock_chord shows the callback being applied with the group's join() method).
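For reference, a pure-Python model of the ordering that join()-style collection implies (stub result class, not the real AsyncResult): join iterates the results in the order the signatures were declared, regardless of when each task finished, so a backend honouring that should hand the callback results in calling order. Whether every backend actually does is exactly the question here.

```python
class FakeResult:
    def __init__(self, value, finished_at):
        self.value = value
        self.finished_at = finished_at  # execution order; irrelevant to join

    def get(self):
        return self.value

def join(results):
    # join() walks the results in calling (declaration) order,
    # not in the order the tasks happened to complete.
    return [r.get() for r in results]

# Tasks declared as a, b, c but finishing in the order c, a, b.
header = [FakeResult("a", 2), FakeResult("b", 3), FakeResult("c", 1)]
callback_args = join(header)
```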

lucioladen avatar Mar 06 '24 16:03 lucioladen