
dyrygent not passing output of task 1 as input to task 2 in a chain

Open mingfang opened this issue 2 years ago • 5 comments

Given a task called echo like this

@app.task
def echo(*args, **kwargs):
    print('echo {} {}'.format(args, kwargs))
    return args

And then a workflow like this

celery_workflow = chain(echo.s(1), echo.s(2))

The output of the standard Celery workflow is correct and looks like this

print(celery_workflow.apply_async())
[2022-09-06 03:12:28,534: WARNING/ForkPoolWorker-32] echo (1,) {}
[2022-09-06 03:12:28,550: INFO/MainProcess] Task worker2.echo[a88d5d4a-c531-46c0-9f50-34ef207d54e0] received
[2022-09-06 03:12:28,551: WARNING/ForkPoolWorker-1] echo ([1], 2) {}
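For reference, the chaining behavior in the log above can be mimicked with plain functions (a simplified sketch, no broker or worker involved): Celery prepends the previous task's return value to the next signature's arguments. The returned tuple shows up as a list because the default JSON serialization turns tuples into lists.

```python
def echo(*args, **kwargs):
    print('echo {} {}'.format(args, kwargs))
    # list(args) mimics the tuple->list conversion done by JSON serialization
    return list(args)

# chain(echo.s(1), echo.s(2)) behaves roughly like:
first = echo(1)          # echo (1,) {}
second = echo(first, 2)  # echo ([1], 2) {}
```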

But the output of the dyrygent version is wrong, like this:

workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)
print(workflow.apply_async())
[2022-09-06 03:17:44,812: WARNING/ForkPoolWorker-1] echo (1,) {}
[2022-09-06 03:17:44,813: INFO/MainProcess] Task workflow-processor[c068d1df-f6a7-418f-8dd9-c152e3bbbc18] received
[2022-09-06 03:17:44,817: INFO/ForkPoolWorker-1] Task worker2.echo[664d33a1-c7f8-4b93-aef6-99a43d602576] succeeded in 0.0048081259010359645s: (1,)
[2022-09-06 03:17:44,906: INFO/MainProcess] Events of group {task} enabled by remote.
[2022-09-06 03:17:48,061: INFO/ForkPoolWorker-32] Task 664d33a1-c7f8-4b93-aef6-99a43d602576 is done, success
[2022-09-06 03:17:48,062: INFO/ForkPoolWorker-32] Scheduling execution of task 7913e555-38e4-4052-b1e9-0be756d4f9ba with options {}
[2022-09-06 03:17:48,064: INFO/ForkPoolWorker-32] Tick done, took 0.003532ms, workflow running, waiting for 1 tasks
[2022-09-06 03:17:48,065: INFO/MainProcess] Task worker2.echo[7913e555-38e4-4052-b1e9-0be756d4f9ba] received
[2022-09-06 03:17:48,066: WARNING/ForkPoolWorker-1] echo (2,) {}

Is this a bug or a feature?

mingfang avatar Sep 06 '22 03:09 mingfang

Hello, this is expected behavior. I never needed this feature of Celery and no one has ever requested it. Personally, I find the injection of the preceding task's result as an argument of the following task a little bit confusing.

It should be fairly easy to implement behind some special param so as not to break backwards compatibility. However, I'm not sure how Celery behaves when multiple tasks precede a task, e.g. group() | task.

Does it pass all the results as a list?

brabiega avatar Sep 06 '22 11:09 brabiega

@brabiega Yes, for a chord, all the header results are passed to the final task. e.g.

celery_workflow = chord([echo.s(i) for i in range(10)], echo.s())
[2022-09-06 11:58:03,310: INFO/MainProcess] Task worker2.echo[4863db4b-014e-4dbb-ae65-2218849b684b] received
[2022-09-06 11:58:03,311: WARNING/ForkPoolWorker-32] echo (0,) {}
[2022-09-06 11:58:03,314: INFO/ForkPoolWorker-32] Task worker2.echo[4863db4b-014e-4dbb-ae65-2218849b684b] succeeded in 0.0024735210463404655s: (0,)
[2022-09-06 11:58:03,314: INFO/MainProcess] Task worker2.echo[96d18c26-786d-4f8d-993c-e115ea126e53] received
[2022-09-06 11:58:03,317: WARNING/ForkPoolWorker-32] echo (1,) {}
[2022-09-06 11:58:03,321: INFO/ForkPoolWorker-32] Task worker2.echo[96d18c26-786d-4f8d-993c-e115ea126e53] succeeded in 0.005151060991920531s: (1,)
[2022-09-06 11:58:03,321: INFO/MainProcess] Task worker2.echo[d8b1340b-e7cc-4898-ab85-c6fdcdbcc408] received
[2022-09-06 11:58:03,323: WARNING/ForkPoolWorker-32] echo ([[0], [1]],) {}
[2022-09-06 11:58:03,324: INFO/ForkPoolWorker-32] Task worker2.echo[d8b1340b-e7cc-4898-ab85-c6fdcdbcc408] succeeded in 0.0014346520183607936s: ([[0], [1]],)
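The chord semantics in that log can be mimicked the same way (a simplified sketch, no broker involved): all header results are collected into a single list, which is passed as one positional argument to the body task.

```python
def echo(*args, **kwargs):
    print('echo {} {}'.format(args, kwargs))
    # list(args) mimics the tuple->list conversion done by JSON serialization
    return list(args)

# chord([echo.s(i) for i in range(2)], echo.s()) behaves roughly like:
header = [echo(i) for i in range(2)]  # header results collected in order
body = echo(header)                   # echo ([[0], [1]],) {}
```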

mingfang avatar Sep 06 '22 11:09 mingfang

I'll see what I can do once I have some spare time.

brabiega avatar Sep 12 '22 10:09 brabiega

Can you show how you would pass data from one task to another? I want to use dyrygent but not being able to pass data between tasks via return data_for_downstream_task is a big blocker.

ClaytonSmith avatar Mar 10 '23 18:03 ClaytonSmith

The easiest way would be to pass list(signature.dependencies) here - this will pass all the task_ids that a given task depends on. Then you can fetch the results from the Celery result backend.

https://github.com/ovh/celery-dyrygent/blob/89b19568c3079ab92124d7086db6cd9236285c22/celery_dyrygent/workflows/workflow.py#L408
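A minimal sketch of that workaround pattern: instead of having results injected, each task receives the ids of the tasks it depends on and fetches their results itself. A plain dict stands in for the Celery result backend here so the sketch runs on its own; in real code you would use `celery.result.AsyncResult(task_id).get()`. The `run_task` helper and `dependency_ids` parameter are illustrative names, not part of dyrygent's API.

```python
result_backend = {}  # task_id -> stored result (stand-in for the real backend)

def run_task(task_id, func, *args, dependency_ids=()):
    # Fetch each upstream result by task id, as AsyncResult(...).get() would.
    upstream = [result_backend[dep] for dep in dependency_ids]
    result = func(upstream, *args)
    result_backend[task_id] = result  # store the result, as the backend would
    return result

def echo(upstream, *args):
    print('echo upstream={} args={}'.format(upstream, args))
    return list(args)

run_task('t1', echo, 1)                          # no dependencies
run_task('t2', echo, 2, dependency_ids=('t1',))  # sees t1's result as upstream
```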

brabiega avatar Mar 10 '23 22:03 brabiega