Aggregate or separately process the output artifacts of parallel pods
Summary
If a DAG task includes a withItems/withParam parameter, its output artifacts cannot be resolved, as shown in #1625. I propose a syntax to aggregate or separately process the output artifacts of parallel pods.
Use Cases
There are two situations in which the output artifacts of parallel pods are used.
separately process
```
        |---> Pod1_1 ---|---> Pod2_1
begin --|---> Pod1_2 ---|---> Pod2_2
        |---> Pod1_3 ---|---> Pod2_3
```
- Task1 includes withItems/withParam parameters, so 3 pods are actually executed.
- Task2 will also create 3 pods, and each pod will process the corresponding output artifact of task1 one-to-one.
I suggest that task2 include a withParam parameter referring to the output of task1, so that the number of pods of task2 equals that of task1.
```yaml
spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # process only one path
      - name: deal-path1
        template: deal-one-path
        arguments:
          artifacts:
          - name: path
            from: "{{item}}"
        withParam: "{{tasks.gen.outputs.artifacts.path1}}"
      # process multiple paths
      - name: deal-paths
        template: deal-two-path
        arguments:
          artifacts:
          - name: path1
            from: "{{item.path1}}"
          - name: path2
            from: "{{item.path2}}"
        withParam: "{{tasks.gen.outputs.artifacts}}"
      # process a parameter and a path
      - name: deal-param-path
        template: deal-param-path
        arguments:
          parameters:
          - name: param
            value: "{{item.parameters.param}}"
          artifacts:
          - name: path
            from: "{{item.artifacts.path1}}"
        withParam: "{{tasks.gen.outputs}}"
  - name: gen-artifact
    inputs:
      parameters:
      - name: message
        ...
    outputs:
      parameters:
      - name: param
        ...
      artifacts:
      - name: path1
        ...
      - name: path2
        ...
  - name: deal-one-path
    inputs:
      artifacts:
      - name: path
        ...
  - name: deal-two-path
    inputs:
      artifacts:
      - name: path1
        ...
      - name: path2
        ...
  - name: deal-param-path
    inputs:
      parameters:
      - name: param
        ...
      artifacts:
      - name: path
        ...
```
- if `withParam` is `{{tasks.A.outputs.artifacts.path}}`, we can use `{{item}}` to refer to the output artifact;
- if `withParam` is `{{tasks.A.outputs.artifacts}}`, we can use `{{item.path1}}` and `{{item.path2}}` to refer to multiple output artifacts of one pod;
- if `withParam` is `{{tasks.A.outputs}}`, we can use `{{item.parameters.param}}` and `{{item.artifacts.path}}` to refer to an output parameter and an output artifact of one pod.
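To make the three reference forms above concrete, here is a minimal Python sketch of how such `{{item...}}` references could be resolved against the per-pod item payloads. The resolver and the payload shapes are hypothetical (this proposal was never implemented in Argo Workflows); they only illustrate the intended semantics.

```python
# Hypothetical resolver for the proposed {{item...}} references.
# Not part of Argo Workflows; item payload shapes are invented for illustration.

def resolve(reference: str, item):
    """Resolve a {{item}} or {{item.<field>...}} reference against one item."""
    path = reference.strip("{} ").split(".")
    assert path[0] == "item", "only item references are handled here"
    value = item
    for field in path[1:]:
        value = value[field]  # walk nested fields, e.g. item.parameters.param
    return value

# withParam: "{{tasks.A.outputs.artifacts.path}}" -> each item is one artifact
assert resolve("{{item}}", "s3://bucket/pod1/path") == "s3://bucket/pod1/path"

# withParam: "{{tasks.A.outputs.artifacts}}" -> item maps artifact names to locations
item = {"path1": "s3://bucket/pod1/path1", "path2": "s3://bucket/pod1/path2"}
assert resolve("{{item.path1}}", item) == "s3://bucket/pod1/path1"

# withParam: "{{tasks.A.outputs}}" -> item carries both parameters and artifacts
item = {"parameters": {"param": "42"}, "artifacts": {"path": "s3://bucket/pod1/path"}}
assert resolve("{{item.parameters.param}}", item) == "42"
assert resolve("{{item.artifacts.path}}", item) == "s3://bucket/pod1/path"
```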
aggregate
```
        |---> Pod1_1 ---|
begin --|---> Pod1_2 ---|---> Pod2
        |---> Pod1_3 ---|
```
- Task1 includes withItems/withParam parameters, so 3 pods are actually executed.
- Task2 aggregates the output artifacts of all 3 pods in a single pod.
I suggest adding `fromMulti` and `pathMulti` fields to `tasks.arguments.artifacts` and `templates.inputs.artifacts` respectively.
```yaml
spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # aggregate
      - name: aggregate
        template: aggregate
        arguments:
          artifacts:
          - name: paths
            fromMulti: "{{tasks.gen.outputs.artifacts.path}}"
  - name: gen-artifact
    inputs:
      parameters:
      - name: message
        ...
    outputs:
      artifacts:
      - name: path
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: paths
        pathMulti: /tmp/{{index}}_path
    ...
```
`pathMulti` is just a template containing `{{index}}`. The actual paths in the aggregation pod will be /tmp/0_path, /tmp/1_path and /tmp/2_path.
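The expansion described above can be sketched in a few lines of Python. The helper is hypothetical (the `pathMulti` field was never implemented); it only shows how `{{index}}` would be substituted once per parallel pod.

```python
# Hypothetical expansion of the proposed pathMulti template.
# Substitutes {{index}} with 0..pod_count-1 to get one input path per pod.

def expand_path_multi(template: str, pod_count: int) -> list[str]:
    return [template.replace("{{index}}", str(i)) for i in range(pod_count)]

paths = expand_path_multi("/tmp/{{index}}_path", 3)
assert paths == ["/tmp/0_path", "/tmp/1_path", "/tmp/2_path"]
```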
Message from the maintainers:
Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.
I think this is one of the intended uses of key-only artifacts.
Could you achieve this by writing each output to a unique key within bucket storage, and having the aggregating/grouping pod simply list the keys within the bucket to find the needed files:
```yaml
spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
        outputs:
          artifacts:
          - name: output
            path: /output
            s3:
              key: "{{workflow.name}}/{{item}}"
      # aggregate
      - name: aggregate
        template: aggregate
  - name: gen-artifact
    inputs:
      ...
    outputs:
      artifacts:
      - name: output
        path: /output
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: input
        path: /inputs
        s3:
          key: "{{workflow.name}}"
```
The use of `key: "{{workflow.name}}/{{item}}"` for generation and then `key: "{{workflow.name}}"` for aggregation is the key.
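The fan-out/fan-in key layout suggested above can be modeled with an in-memory dict standing in for the S3 bucket. The workflow name and items below are invented for the example; the point is only that per-item keys share a common prefix, so the aggregate step can recover all of them with a prefix listing.

```python
# Sketch of the key-only-artifact layout, with a dict as a mock S3 bucket.
bucket: dict[str, str] = {}
workflow_name = "my-workflow"  # stands in for {{workflow.name}}

# fan-out: each of the 3 pods writes its output under "<workflow>/<item>"
for item in ["1", "2", "3"]:
    bucket[f"{workflow_name}/{item}"] = f"output of pod {item}"

# fan-in: the aggregate pod lists every key under the workflow prefix
inputs = {k: v for k, v in bucket.items() if k.startswith(workflow_name + "/")}
assert sorted(inputs) == ["my-workflow/1", "my-workflow/2", "my-workflow/3"]
```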
Thanks for your reply!
- As far as I know, there is no `outputs` field in the `task` struct. I'm not sure if you meant to put `s3.key` into `templates[gen-artifact].outputs.artifacts`.
- In our usual use, we tend to use `templateRef` in the DAG-type template and put the container-type template into a separate `WorkflowTemplate`, so that different workflows can share the same container and its inputs and outputs. If we use `s3.key` and `{{workflow.name}}` in a container-type template, we cannot achieve the decoupling described above.
Sorry for replying to you with my company account. "jixinchi" is my company account and "clumsy456" is my personal account. I should have used my company account to raise this issue.
You are correct, it is in the template:
```yaml
spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # aggregate
      - name: aggregate
        template: aggregate
  - name: gen-artifact
    inputs:
      ...
    outputs:
      artifacts:
      - name: output
        path: /output
        s3:
          key: "{{workflow.name}}/{{item}}"
    ...
  - name: aggregate
    inputs:
      artifacts:
      - name: input
        path: /inputs
        s3:
          key: "{{workflow.name}}"
```
I'm not sure if the `{{workflow.name}}` and `{{item}}` tags work when using `templateRef`. I'll try it later.
What we discussed above is only about the "aggregate" case. Do you have any comments on the "separately process" case?
I think most patterns should be supported using key-only artifacts.
I've tried this workflow. The `{{item}}` tag does not work outside the DAG task: `gen-artifact` outputs a file literally named "{{item}}".
Key-only artifacts only support cases where you already know, before running the workflow, how many artifacts will be generated and how to use them. They are not suitable for `withParam` and `withItems` cases.
Do you want to set-up a 30m to chat?
https://bit.ly/book-30m-with-argo-team
I am from China, and it's 00:10 now... In addition, I'm sorry that my English is not good enough to speak or talk about technical problems. Maybe text communication is more effective.
My email is [email protected]. Or you can find me on slack, named Xinchi Ji.
What about using data template with S3? That allows you to drop a number of artifacts into a bucket, then use withItems to fan-out process them.
https://argoproj.github.io/argo-workflows/data-sourcing-and-transformation/
Data templates are really a good method for similar problems. However, the expressions they support are too limited for our business: we need to run Python or R scripts in a container to process the artifacts, not just do simple text processing.
I think that is not what data template does. It basically allows you to list artifacts, and then start a new process for each artifact.
As shown in the doc you provided, a data template can only process each artifact with the methods available in the expression. Therefore, data templates cannot support our business.
@clumsy456 Hi, I used the way you mentioned. But I get the error sometimes.
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-passing-
spec:
  entrypoint: artifact-example
  templates:
  - name: artifact-example
    steps:
    - - name: generate-artifact
        template: whalesay
    - - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"
  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
      volumeMounts:
      - name: out
        mountPath: /tmp
    volumes:
    - name: out
      emptyDir: {}
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt
  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["cat /tmp/message"]
```
Above is just my proposal, not implemented...
Is there a workaround for this problem?
The issue summary indicates: "If a dag/task includes withItems/withParam parameter, we are not able to resolve its output artifacts"
Does that mean dag/tasks don't work with withItems / withParam, but steps do?
I see https://github.com/argoproj/argo-workflows/pull/6899 has been closed without merging, so I'm wondering what I can do to get withItems / withParam loops to work with artifact passing.