celery-dyrygent
dyrygent not passing output of task 1 as input to task 2 in a chain
Given a task called echo, like this:
@app.task
def echo(*args, **kwargs):
print('echo {} {}'.format(args, kwargs))
return args
And then a workflow like this:
celery_workflow = chain(echo.s(1), echo.s(2))
The output of the standard Celery workflow is correct and looks like this:
print(celery_workflow.apply_async())
[2022-09-06 03:12:28,534: WARNING/ForkPoolWorker-32] echo (1,) {}
[2022-09-06 03:12:28,550: INFO/MainProcess] Task worker2.echo[a88d5d4a-c531-46c0-9f50-34ef207d54e0] received
[2022-09-06 03:12:28,551: WARNING/ForkPoolWorker-1] echo ([1], 2) {}
But the output of the dyrygent version is wrong, like this:
workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)
print(workflow.apply_async())
[2022-09-06 03:17:44,812: WARNING/ForkPoolWorker-1] echo (1,) {}
[2022-09-06 03:17:44,813: INFO/MainProcess] Task workflow-processor[c068d1df-f6a7-418f-8dd9-c152e3bbbc18] received
[2022-09-06 03:17:44,817: INFO/ForkPoolWorker-1] Task worker2.echo[664d33a1-c7f8-4b93-aef6-99a43d602576] succeeded in 0.0048081259010359645s: (1,)
[2022-09-06 03:17:44,906: INFO/MainProcess] Events of group {task} enabled by remote.
[2022-09-06 03:17:48,061: INFO/ForkPoolWorker-32] Task 664d33a1-c7f8-4b93-aef6-99a43d602576 is done, success
[2022-09-06 03:17:48,062: INFO/ForkPoolWorker-32] Scheduling execution of task 7913e555-38e4-4052-b1e9-0be756d4f9ba with options {}
[2022-09-06 03:17:48,064: INFO/ForkPoolWorker-32] Tick done, took 0.003532ms, workflow running, waiting for 1 tasks
[2022-09-06 03:17:48,065: INFO/MainProcess] Task worker2.echo[7913e555-38e4-4052-b1e9-0be756d4f9ba] received
[2022-09-06 03:17:48,066: WARNING/ForkPoolWorker-1] echo (2,) {}
Is this a bug or a feature?
Hello, this is expected behavior. I never needed this feature of Celery and no one has ever requested it. Personally, I find injecting the preceding task's result as an argument of the following task a little bit confusing.
It should be fairly easy to implement behind some special param so as not to break backwards compatibility.
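For illustration only, such an opt-in might look something like the sketch below; the propagate_results flag is purely hypothetical and does not exist in celery-dyrygent today.
workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)
# Hypothetical opt-in (not a real celery-dyrygent option): when enabled, each
# task would receive its parents' return values, mirroring how a plain Celery
# chain prepends the previous result to the next signature's arguments.
workflow.apply_async(propagate_results=True)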
However, I'm not sure how Celery behaves when multiple tasks precede a task,
e.g. group() | task.
Does it pass all the results as a list?
@brabiega Yes, for a chord, all of the header results are passed to the final task, e.g.
celery_workflow = chord([echo.s(i) for i in range(10)], echo.s())
[2022-09-06 11:58:03,310: INFO/MainProcess] Task worker2.echo[4863db4b-014e-4dbb-ae65-2218849b684b] received
[2022-09-06 11:58:03,311: WARNING/ForkPoolWorker-32] echo (0,) {}
[2022-09-06 11:58:03,314: INFO/ForkPoolWorker-32] Task worker2.echo[4863db4b-014e-4dbb-ae65-2218849b684b] succeeded in 0.0024735210463404655s: (0,)
[2022-09-06 11:58:03,314: INFO/MainProcess] Task worker2.echo[96d18c26-786d-4f8d-993c-e115ea126e53] received
[2022-09-06 11:58:03,317: WARNING/ForkPoolWorker-32] echo (1,) {}
[2022-09-06 11:58:03,321: INFO/ForkPoolWorker-32] Task worker2.echo[96d18c26-786d-4f8d-993c-e115ea126e53] succeeded in 0.005151060991920531s: (1,)
[2022-09-06 11:58:03,321: INFO/MainProcess] Task worker2.echo[d8b1340b-e7cc-4898-ab85-c6fdcdbcc408] received
[2022-09-06 11:58:03,323: WARNING/ForkPoolWorker-32] echo ([[0], [1]],) {}
[2022-09-06 11:58:03,324: INFO/ForkPoolWorker-32] Task worker2.echo[d8b1340b-e7cc-4898-ab85-c6fdcdbcc408] succeeded in 0.0014346520183607936s: ([[0], [1]],)
I'll see what I can do once I have some spare time.
Can you show how you would pass data from one task to another? I want to use dyrygent, but not being able to pass data between tasks via return data_for_downstream_task is a big blocker.
The easiest way would be to pass list(signature.dependencies) here:
https://github.com/ovh/celery-dyrygent/blob/89b19568c3079ab92124d7086db6cd9236285c22/celery_dyrygent/workflows/workflow.py#L408
That would pass along all the task_ids which the given task depends on. Then you can fetch the results from the Celery result backend.
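For example, here is a rough sketch of the consumer side, assuming the scheduling code is patched to forward those dependency task ids into the task as a keyword argument; the dependency_ids name and the broker/backend URLs below are placeholders, not part of celery-dyrygent.
from celery import Celery
from celery.result import AsyncResult

app = Celery('worker2', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def echo(*args, dependency_ids=None, **kwargs):
    # Look up each upstream task's return value in the Celery result backend.
    # disable_sync_subtasks=False is needed to call .get() from inside a task.
    upstream = [
        AsyncResult(task_id, app=app).get(disable_sync_subtasks=False)
        for task_id in (dependency_ids or [])
    ]
    print('echo {} {} upstream={}'.format(args, kwargs, upstream))
    return args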