
aggregate or separately process the output artifacts of parallel pods

clumsy456 opened this issue 4 years ago • 17 comments

Summary

If a dag task includes a withItems/withParam parameter, we are not able to resolve its output artifacts, as shown in #1625. I suggest a syntax to aggregate, or separately process, the output artifacts of parallel pods.

Use Cases

There are two situations in which the output artifacts of parallel pods need to be consumed.

separately process

        |---> Pod1_1 ---|---> Pod2_1
begin --|---> Pod1_2 ---|---> Pod2_2
        |---> Pod1_3 ---|---> Pod2_3
  • Task1 includes withItems/withParam parameters, so 3 pods are actually executed.
  • Task2 will also create 3 pods, and each pod will process the corresponding output artifact of task1 one-to-one.

I suggest that task2 include a withParam parameter referring to the outputs of task1, so that the number of pods of task2 equals that of task1.

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # process only one path
      - name: deal-path1
        template: deal-one-path
        arguments:
          artifacts:
          - name: path
            from: "{{item}}"
        withParam: "{{tasks.gen.outputs.artifacts.path1}}"
      # process multi path
      - name: deal-paths
        template: deal-two-path
        arguments:
          artifacts:
          - name: path1
            from: "{{item.path1}}"
          - name: path2
            from: "{{item.path2}}"
        withParam: "{{tasks.gen.outputs.artifacts}}"
      # process param and path
      - name: deal-param-path
        template: deal-param-path
        arguments:
          parameters:
          - name: param
            value: "{{item.parameters.param}}"
          artifacts:
          - name: path
            from: "{{item.artifacts.path1}}"
        withParam: "{{tasks.gen.outputs}}"

  - name: gen-artifact
    inputs:
      parameters:
      - name: message
        ...
    outputs:
      parameters:
      - name: param
        ...
      artifacts:
      - name: path1
        ...
      - name: path2
        ...
  - name: deal-one-path
    inputs:
      artifacts:
      - name: path
        ...
  - name: deal-two-path
    inputs:
      artifacts:
      - name: path1
        ...
      - name: path2
        ...
  - name: deal-param-path
    inputs:
      parameters:
      - name: param
        ...
      artifacts:
      - name: path
        ...
  • if withParam is {{tasks.A.outputs.artifacts.path}}, we can use {{item}} to refer to the output artifact;
  • if withParam is {{tasks.A.outputs.artifacts}}, we can use {{item.path1}} and {{item.path2}} to refer to multiple output artifacts of one pod;
  • if withParam is {{tasks.A.outputs}}, we can use {{item.parameters.param}} and {{item.artifacts.path}} to refer to an output parameter and an output artifact of one pod.
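For comparison, output *parameters* (as distinct from artifacts) of a fanned-out task are already aggregated by Argo into a JSON list that a later task can fan out over. A rough sketch of that existing behaviour — task and template names here are illustrative, not part of this proposal:

```yaml
# Sketch of existing parameter aggregation (assumed behaviour, names illustrative):
# each gen pod emits an output parameter named "param"; Argo aggregates the
# values of all pods into a JSON list, and a later task fans out over it.
- name: gen
  template: gen-artifact
  arguments:
    parameters:
    - name: message
      value: "{{item}}"
  withItems: ["1", "2", "3"]
- name: consume
  template: deal-param            # hypothetical consuming template
  arguments:
    parameters:
    - name: param
      value: "{{item.param}}"     # one entry per gen pod
  withParam: "{{tasks.gen.outputs.parameters}}"  # aggregated JSON list
```

If each gen pod reported, say, the S3 key of its artifact as an output parameter, this could approximate the "separately process" case without new syntax.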

aggregate

        |---> Pod1_1 ---|
begin --|---> Pod1_2 ---|---> Pod2
        |---> Pod1_3 ---|
  • Task1 includes withItems/withParam parameters, so 3 pods are actually executed.
  • Task2 will aggregate all the output artifacts of the 3 pods in a single pod.

I suggest adding fromMulti and pathMulti fields to tasks.arguments.artifacts and templates.inputs.artifacts, respectively.

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # aggregate
      - name: aggregate
        template: aggregate
        arguments:
          artifacts:
          - name: paths
            fromMulti: "{{tasks.gen.outputs.artifacts.path}}"

  - name: gen-artifact
    inputs:
      parameters:
      - name: message
        ...
    outputs:
      artifacts:
      - name: path
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: paths
        pathMulti: /tmp/{{index}}_path
        ...

pathMulti is just a path template with an {{index}} placeholder. The actual paths in the aggregation pod will be /tmp/0_path, /tmp/1_path and /tmp/2_path.
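If pathMulti were implemented as proposed, the aggregation container could simply iterate the materialized files, e.g. (a sketch — the `<dir>/<index>_path` layout is the proposal's hypothetical convention, not an existing feature):

```shell
# Iterate the fanned-in artifacts that the proposed pathMulti field would
# materialize at <dir>/<index>_path (hypothetical layout, not implemented).
dir=${AGG_DIR:-/tmp}
for f in "$dir"/*_path; do
  echo "aggregating $f"
  cat "$f" >> "$dir"/combined
done
```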


Message from the maintainers:

Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

clumsy456 avatar Sep 27 '21 13:09 clumsy456

I think this is one of the intended uses of key-only artifacts.

Could you achieve this by writing each output to a unique key within bucket storage? The aggregating/grouping pod can then just list the keys within the bucket to find the needed files:

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
        outputs:
          artifacts:
          - name: output
            path: /output
            s3:
              key: "{{workflow.name}}/{{item}}"
      # aggregate
      - name: aggregate
        template: aggregate

  - name: gen-artifact
    inputs:
        ...
    outputs:
      artifacts:
      - name: output
        path: /output
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: input
        path: /inputs
        s3:
          key: "{{workflow.name}}"

The use of key: "{{workflow.name}}/{{item}}" for generation and then key: "{{workflow.name}}" for aggregation is the crucial part.

alexec avatar Nov 04 '21 02:11 alexec

> I think this is one of the intended uses of key-only artifacts. Could you achieve this by writing each output to a unique key within bucket storage […]

Thanks for your reply!

  • As far as I know, there is no outputs field in the task struct. I'm not sure if you meant to put s3.key into templates[gen-artifact].outputs.artifacts.
  • In our usual use, we tend to use templateRef in the dag-type template and put the container-type template into a separate WorkflowTemplate, so that different workflows can reuse the same container and its inputs and outputs. If we use s3.key with {{workflow.name}} in a container-type template, we cannot achieve the decoupling described above.
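One way to keep the WorkflowTemplate decoupled while still using key-only artifacts might be to pass the key prefix in as an input parameter, so the reusable template never hard-codes {{workflow.name}}. A sketch (parameter names are illustrative, and I have not verified this against templateRef semantics):

```yaml
# Hypothetical reusable template: the S3 prefix arrives as an input
# parameter, so the same WorkflowTemplate works for any calling workflow.
- name: gen-artifact
  inputs:
    parameters:
    - name: message
    - name: key-prefix        # caller passes e.g. "{{workflow.name}}"
  outputs:
    artifacts:
    - name: output
      path: /output
      s3:
        key: "{{inputs.parameters.key-prefix}}/{{inputs.parameters.message}}"
```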

jixinchi avatar Nov 04 '21 02:11 jixinchi

Sorry for replying to you with my company account. "jixinchi" is my company account and "clumsy456" is my personal account. I should have used my company account to raise this issue.

jixinchi avatar Nov 04 '21 02:11 jixinchi

You are correct, it is in the template:


spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # aggregate
      - name: aggregate
        template: aggregate

  - name: gen-artifact
    inputs:
        ...

    outputs:
      artifacts:
      - name: output
        path: /output
        s3:
          key: "{{workflow.name}}/{{item}}"
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: input
        path: /inputs
        s3:
          key: "{{workflow.name}}"

alexec avatar Nov 10 '21 17:11 alexec

I'm not sure whether the {{workflow.name}} and {{item}} tags work when using templateRef. I'll try it later.

What we discussed above is only about the "aggregate" case. Do you have any comments on the "separately process" case?

jixinchi avatar Nov 17 '21 03:11 jixinchi

I think most patterns should be supported using key-only artifacts.

alexec avatar Nov 17 '21 17:11 alexec

> You are correct, it is in the template: […]

I've tried this workflow. The {{item}} tag does not work outside the dag task: gen-artifact literally outputs a file named "{{item}}".

Key-only artifacts only support cases where you already know, before running the workflow, how many artifacts will be generated and how to use them. They are not suitable for withParam and withItems cases.

jixinchi avatar Nov 18 '21 07:11 jixinchi

Do you want to set up a 30-minute chat?

https://bit.ly/book-30m-with-argo-team

alexec avatar Nov 18 '21 16:11 alexec

I am from China, and it's 00:10 here now... Also, I'm sorry that my English is not good enough to talk through technical problems. Maybe text communication would be more effective.

jixinchi avatar Nov 18 '21 16:11 jixinchi

My email is [email protected]. Or you can find me on slack, named Xinchi Ji.

jixinchi avatar Nov 19 '21 06:11 jixinchi

What about using a data template with S3? That allows you to drop a number of artifacts into a bucket, then use withItems to fan out and process them.

https://argoproj.github.io/argo-workflows/data-sourcing-and-transformation/
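Roughly, a data template lists the artifact keys under an existing location, optionally filters them with an expression, and its aggregated output can then drive a withParam fan-out. A sketch along the lines of the linked doc (bucket name and filter are illustrative):

```yaml
# Sketch of a data template (assumed syntax per the linked doc):
# list the keys under a prefix, keep only the .txt files.
- name: list-inputs
  data:
    source:
      artifactPaths:            # enumerate keys under an artifact location
        name: inputs
        s3:
          bucket: my-bucket     # illustrative bucket
          key: "{{workflow.name}}"
    transformation:
    - expression: "filter(data, {# endsWith \".txt\"})"  # optional filter
```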

alexec avatar Nov 23 '21 01:11 alexec

A data template is really a good method for similar problems. However, the expression language is too limited for our business. We need to run Python or R scripts in a container to process the artifacts, not just do simple text processing.

jixinchi avatar Nov 23 '21 06:11 jixinchi

I think that is not what data template does. It basically allows you to list artifacts, and then start a new process for each artifact.

alexec avatar Nov 23 '21 16:11 alexec

As shown in the doc you provided, a data template can only process each artifact with the methods available in the expression language. Therefore, the data template cannot support our business.

jixinchi avatar Nov 24 '21 09:11 jixinchi

@clumsy456 Hi, I used the approach you mentioned, but I sometimes get an error.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-passing-
spec:
  entrypoint: artifact-example
  templates:
  - name: artifact-example
    steps:
    - - name: generate-artifact
        template: whalesay
    - - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"
  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
      volumeMounts:
        - name: out
          mountPath: /tmp
    volumes:
      - name: out
        emptyDir: { }
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["cat /tmp/message"]


chenbodeng719 avatar Jun 29 '22 08:06 chenbodeng719

> @clumsy456 Hi, I used the approach you mentioned, but I sometimes get an error. […]
The above is just my proposal; it has not been implemented...

clumsy456 avatar Jun 29 '22 12:06 clumsy456

Is there a workaround for this problem?

The issue summary indicates: "If a dag/task includes withItems/withParam parameter, we are not able to resolve its output artifacts"

Does that mean dag/tasks don't work with withItems / withParam, but steps do?

I see https://github.com/argoproj/argo-workflows/pull/6899 has been closed without merging, so I'm wondering what I can do to get withItems / withParam loops to work with artifact passing.

davidghiurco avatar Jul 29 '22 02:07 davidghiurco