Creating a nested parallel DAG
Hi,
I am trying to understand the parallelism syntax for Hera, but right now I cannot tell whether nested parallelism can work. What I mean is the following: suppose I have a generator task and then multiple consumer tasks that must run one after the other within each parallel branch. For example, the following.
def generate() -> None:
    import json
    import sys

    chunk_number_list = list(range(0, 10))
    json.dump([{"chunk_number": chunk_number} for chunk_number in chunk_number_list], sys.stdout)


def consumeA(chunk_number) -> None:
    print(f"A {chunk_number}")


def consumeB(chunk_number) -> None:
    print(f"B {chunk_number}")


generate_task = Task(
    name="generate",
    func=generate,
    image=image,
)
consume_taskA = Task(
    name="consumeA",
    func=consumeA,
    input_from=InputFrom(name="generate", parameters=["chunk_number"]),
    image=image,
)
consume_taskB = Task(
    name="consumeB",
    func=consumeB,
    input_from=InputFrom(name="generate", parameters=["chunk_number"]),
    image=image,
)

generate_task.next(consume_taskA)
consume_taskA.next(consume_taskB)

workflow.add_tasks(generate_task, consume_taskA, consume_taskB)
What I want is for the workflow, after the generate task, to fan out and then run A and B serially within each parallel branch. However, what actually happens is that it first fans out for A, waits until all of those complete, and then fans out again for B.
Please tell me if the question is not clear or you need any other information. Thanks for the help.
Hello @vikramsg! That is something that is possible in Argo Workflows, since fanning out over InputFrom uses a single template, and a single template can point to an independent DAG, which can represent a series of templates/tasks to execute on the fanned-out output of a task. However, this is not something that is currently supported by Hera. I think it would require something like a TaskChain concept, which would be almost a pseudo-workflow representation. With that, users could do something like
def f1(...): ...
def f2(...): ...
def f3(...): ...
generate = Task(...)
consume = TaskChain([f1, f2, f3], input_from=InputFrom(generate.name, ...))
generate >> consume
and that would translate into the structure you would like to use. Does that sound like something that would solve the problem for you?
Hi @flaviuvadan, thanks for looking at this. Yes, I think that would solve the issue.
Going to investigate this as part of the 4.0.0 release, after 3.6.0 (3.6.0 is getting quite loaded).
I think the 'TaskChain' approach is great, but it should be more versatile. Instead of receiving a function list, it should receive a task list and generate a templated DAG.
Coming in https://github.com/argoproj-labs/hera-workflows/pull/253!
Thanks. Is there an example of how this works?
Yes! This one: https://github.com/argoproj-labs/hera-workflows/blob/main/examples/parallel_dag.py
Sorry for raising this issue again, but this link does not work. https://github.com/argoproj-labs/hera-workflows/blob/main/examples/parallel_dag.py
Is the example present under a different name?
Will this work something like this: I create a DAG with B >> C >> D, where B has a with_param that it gets from A, allowing it to fan out?
Hey @vikramsg! This one should work! https://hera.readthedocs.io/en/stable/examples/workflows/upstream/parallelism_nested_dag/?h=parallel
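Roughly, mapping your original generate/consumeA/consumeB example onto that pattern could look like the sketch below. This is not the linked example verbatim, and the names (nested-fan-out, consume-chain, consume_a, consume_b) are illustrative; it assumes the current hera.workflows API, where a DAG template can take inputs and can be called with with_param from another DAG. The inner DAG holds the serial A-then-B chain, and the outer DAG fans that whole chain out over the items produced by the generator.

from hera.workflows import DAG, Parameter, Workflow, script, models as m


@script(outputs=Parameter(name="items", value_from=m.ValueFrom(path="/tmp/items.json")))
def generate():
    import json

    # Write a JSON list of items; it becomes the "items" output parameter.
    with open("/tmp/items.json", "w") as f:
        json.dump([{"chunk_number": i} for i in range(10)], f)


@script()
def consume_a(chunk_number: int):
    print(f"A {chunk_number}")


@script()
def consume_b(chunk_number: int):
    print(f"B {chunk_number}")


with Workflow(name="nested-fan-out", entrypoint="main") as w:
    # Inner DAG: the serial chain that runs for a single chunk.
    with DAG(name="consume-chain", inputs=Parameter(name="chunk_number")) as chain:
        a = consume_a(name="a", arguments={"chunk_number": "{{inputs.parameters.chunk_number}}"})
        b = consume_b(name="b", arguments={"chunk_number": "{{inputs.parameters.chunk_number}}"})
        a >> b

    # Outer DAG: generate once, then fan the whole inner chain out over the items.
    with DAG(name="main"):
        g = generate(name="generate")
        c = chain(
            name="consume",
            with_param="{{tasks.generate.outputs.parameters.items}}",
            arguments={"chunk_number": "{{item.chunk_number}}"},
        )
        g >> c

print(w.to_yaml())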
Hi @flaviuvadan,
Thanks. What is confusing to me is how to fan out the outer DAG. For example, the following is exactly what I want, and it is done with Steps: one of the steps is itself a Steps template and it supports withParam. Can we do this in Hera?
https://stackoverflow.com/a/70972819
Here's an example of how I think that could work out!
from hera.workflows import Workflow, Steps, script, Parameter, Artifact, models as m


@script(outputs=Parameter(name="items", value_from=m.ValueFrom(path="/tmp/items.json")))
def A():
    with open("/tmp/items.json", "w") as f:
        f.write("[1, 2, 3]")


@script(outputs=Artifact(name="file", path="/tmp/file"))
def B(item: int):
    with open("/tmp/file", "w") as f:
        f.write(str(item))


@script(inputs=Artifact(name="file", path="/tmp/file"))
def C():
    with open("/tmp/file", "r") as f:
        print(f.read())


@script()
def D():
    print(42)


with Workflow(name="test", entrypoint="main") as w:
    with Steps(name="B_C", inputs=Parameter(name="item")) as BC:
        B(arguments={"item": "{{inputs.parameters.item}}"})
        C(arguments={"file": "{{steps.B.outputs.artifacts.file}}"})

    with Steps(name="main") as main:
        A()
        BC(with_param="{{steps.A.outputs.parameters.items}}", arguments={"item": "{{item}}"})
        D()

print(w.to_yaml())
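The idea is that B_C acts as the nested template here: main fans it out over the items produced by A via with_param, so B and C should run serially within each fanned-out branch, and D should run once after the whole fan-out completes, since steps in a Steps template run sequentially by default.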