pipelines icon indicating copy to clipboard operation
pipelines copied to clipboard

feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718

Open gmfrasca opened this issue 9 months ago • 24 comments

Description of your changes: Fixes #8718

Adds the Parallelism item to a DAG template if specified by a task's IteratorPolicy (ie ParallelFor w/ a parallelism limit).

Checklist:

gmfrasca avatar May 07 '24 19:05 gmfrasca

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment Approvers can cancel approval by writing /approve cancel in a comment

google-oss-prow[bot] avatar May 07 '24 19:05 google-oss-prow[bot]

looks like a CI infra issue?

/retest just to check

gmfrasca avatar May 07 '24 20:05 gmfrasca

@gmfrasca: The /retest command does not accept any targets. The following commands are available to trigger required jobs:

  • /test kfp-kubernetes-test-python310
  • /test kfp-kubernetes-test-python311
  • /test kfp-kubernetes-test-python312
  • /test kfp-kubernetes-test-python38
  • /test kfp-kubernetes-test-python39
  • /test kubeflow-pipeline-backend-test
  • /test kubeflow-pipeline-frontend-test
  • /test kubeflow-pipeline-mkp-snapshot-test
  • /test kubeflow-pipeline-mkp-test
  • /test kubeflow-pipelines-backend-visualization
  • /test kubeflow-pipelines-component-yaml
  • /test kubeflow-pipelines-components-google-cloud-python38
  • /test kubeflow-pipelines-integration-v2
  • /test kubeflow-pipelines-manifests
  • /test kubeflow-pipelines-sdk-docformatter
  • /test kubeflow-pipelines-sdk-execution-tests
  • /test kubeflow-pipelines-sdk-isort
  • /test kubeflow-pipelines-sdk-python310
  • /test kubeflow-pipelines-sdk-python311
  • /test kubeflow-pipelines-sdk-python312
  • /test kubeflow-pipelines-sdk-python38
  • /test kubeflow-pipelines-sdk-python39
  • /test kubeflow-pipelines-sdk-yapf
  • /test test-kfp-runtime-code-python310
  • /test test-kfp-runtime-code-python311
  • /test test-kfp-runtime-code-python312
  • /test test-kfp-runtime-code-python38
  • /test test-kfp-runtime-code-python39
  • /test test-run-all-gcpc-modules
  • /test test-upgrade-kfp-sdk

The following commands are available to trigger optional jobs:

  • /test kfp-kubernetes-execution-tests
  • /test kubeflow-pipeline-e2e-test
  • /test kubeflow-pipeline-upgrade-test
  • /test kubeflow-pipeline-upgrade-test-v2
  • /test kubeflow-pipelines-samples-v2

Use /test all to run the following jobs that were automatically triggered:

  • kubeflow-pipeline-backend-test
  • kubeflow-pipeline-e2e-test
  • kubeflow-pipeline-upgrade-test
  • kubeflow-pipeline-upgrade-test-v2
  • kubeflow-pipelines-samples-v2

In response to this:

looks like a CI infra issue?

/retest just to check

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

google-oss-prow[bot] avatar May 07 '24 20:05 google-oss-prow[bot]

/retest

gmfrasca avatar May 07 '24 20:05 gmfrasca

tested with this pipeline:

parallelfor.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
        model: Input[Model],
        epoch: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')

Worked successfully:

~ $ kubectl -n ${kfp_ns} get workflow tutorial-data-passing-drsfz -o yaml | yq '.spec.templates[-2]'
dag:
  tasks:
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
          - name: task
            value: '{"componentRef":{"name":"comp-for-loop-2"},"dependentTasks":["preprocess"],"inputs":{"artifacts":{"pipelinechannel--preprocess-output_model":{"taskOutputArtifact":{"outputArtifactKey":"output_model","producerTask":"preprocess"}}}},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[1, 5, 10, 25]"}},"taskInfo":{"name":"for-loop-2"}}'
      depends: preprocess.Succeeded
      name: for-loop-2-driver
      template: system-dag-driver
    - arguments:
        parameters:
          - name: parent-dag-id
            value: '{{tasks.for-loop-2-driver.outputs.parameters.execution-id}}'
          - name: iteration-index
            value: '{{item}}'
      depends: for-loop-2-driver.Succeeded
      name: for-loop-2-iterations
      template: comp-for-loop-2-for-loop-2
      withSequence:
        count: '{{tasks.for-loop-2-driver.outputs.parameters.iteration-count}}'
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-preprocess}}'
          - name: task
            value: '{"cachingOptions":{},"componentRef":{"name":"comp-preprocess"},"inputs":{"parameters":{"message":{"runtimeValue":{"constant":"dataset"}}}},"taskInfo":{"name":"preprocess"}}'
          - name: container
            value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-preprocess}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
      name: preprocess-driver
      template: system-container-driver
    - arguments:
        parameters:
          - name: pod-spec-patch
            value: '{{tasks.preprocess-driver.outputs.parameters.pod-spec-patch}}'
          - default: "false"
            name: cached-decision
            value: '{{tasks.preprocess-driver.outputs.parameters.cached-decision}}'
      depends: preprocess-driver.Succeeded
      name: preprocess
      template: system-container-executor
inputs:
  parameters:
    - name: parent-dag-id
metadata:
  annotations:
    sidecar.istio.io/inject: "false"
name: root
outputs: {}
parallelism: 2

Note the:

parallelism: 2

The UI feedback on this could be better:

image

Currently all 4 iterations show executing at the same time (they also never really show the done checkmark even once they finish). But this problem existed prior to this, and seems out of scope for this task, and should be in a follow up issue.

However I confirmed the pods are scheduled 2 at a time in this example (since parallelism = 2 in this example).

HumairAK avatar May 09 '24 15:05 HumairAK

Hrmm testing it with the above pipeline amended to:

@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

It looks like the workflow will use parallelism = 4 for both

HumairAK avatar May 09 '24 17:05 HumairAK

as @HumairAK noted it looks like there's a problem when there are multiple ParallelFor components in a single DAG, as the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign various parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks, (ie, root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path to this may be. will need a bit to investigate.

gmfrasca avatar May 09 '24 21:05 gmfrasca

as @HumairAK noted it looks like there's a problem when there are multiple ParallelFor components in a single DAG, as the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign various parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks, (ie, root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path to this may be. will need a bit to investigate.

Hi @gmfrasca, I think it's really important to have a limit that applies to the individual for loop, not the entire DAG. Have you considered using Argo's implementation of loop parallelism? Using that might even simplify the DAG. However, implementing this could lead to issues with other backends.

hsteude avatar May 10 '24 12:05 hsteude

Hey @hsteude - so this implementation actually already leverages the Argo loop parallelism mechanism. The issue here is that the current compiled architecture of a pipeline aggregates all KFP pipeline steps into sequential tasks of a top-level root DAG Template, but the finest granularity you can specify that limit is at the Template level, not an individual DAGTask. Essentially, we do not have the concept of parallelism on a per-step basis to use in this current state.

The workaround/DAG re-architecture I mentioned above would bump out each of these steps to call their own intermediate Template, each time with its own DAG and iterator, and this template would simply call the component Template itself. With that, we could then specify individual parallelism limits for individual steps, since they are now encapsulated in a Template, at the cost of introducing another layer of abstraction/templating

gmfrasca avatar May 10 '24 17:05 gmfrasca

@gmfrasca: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
kubeflow-pipelines-samples-v2 978065002c89cc3e8de88aa4790ceee1cf8e98ab link false /test kubeflow-pipelines-samples-v2
kubeflow-pipeline-upgrade-test 978065002c89cc3e8de88aa4790ceee1cf8e98ab link false /test kubeflow-pipeline-upgrade-test
kubeflow-pipeline-e2e-test 978065002c89cc3e8de88aa4790ceee1cf8e98ab link false /test kubeflow-pipeline-e2e-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

google-oss-prow[bot] avatar May 13 '24 22:05 google-oss-prow[bot]

/test kubeflow-pipelines-samples-v2

gmfrasca avatar May 23 '24 18:05 gmfrasca

@gmfrasca: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
kubeflow-pipeline-e2e-test 978065002c89cc3e8de88aa4790ceee1cf8e98ab link false /test kubeflow-pipeline-e2e-test
kubeflow-pipelines-samples-v2 fb8a3ffb68ec1824564b75800580c0f1914e7067 link false /test kubeflow-pipelines-samples-v2
kubeflow-pipeline-upgrade-test fb8a3ffb68ec1824564b75800580c0f1914e7067 link false /test kubeflow-pipeline-upgrade-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

google-oss-prow[bot] avatar Jun 27 '24 16:06 google-oss-prow[bot]

/lgtm

rimolive avatar Jun 28 '24 10:06 rimolive

I reviewed this PR a month ago and this PR is still opened. I think if would be good having one more lgtm to ensure this PR is still good to merge.

rimolive avatar Jul 31 '24 19:07 rimolive

/hold

does hold work here? I'd like to review this before it's merged :smile:

gregsheremeta avatar Jul 31 '24 20:07 gregsheremeta

they also never really show the done checkmark even once they finish

I've seen this in other pipelines too -- ones without loops. Might be anything with a sub-DAG.

gregsheremeta avatar Jul 31 '24 20:07 gregsheremeta

I'm caught off guard a bit because it looks like ParallelFor was mostly already working, except for the limit? And this PR fixes the limit and that's about it, right?

Is #8718 the correct Issue to fix, then? Should it be tweaked to call out that only parallelization limit was left? Or should 8718 be closed and a new Issue opened?

gregsheremeta avatar Jul 31 '24 22:07 gregsheremeta

While running this PR in an attempt to validate it and understand it better, I found that I can reliably cause an apiserver panic. The same pipeline works fine on both upstream master of https://github.com/kubeflow/pipelines and https://github.com/opendatahub-io/data-science-pipelines.

test pipeline (adapted from what Humair posted above):

from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, Output, Model

@dsl.component(base_image="python:3.12")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    line = "useful_data"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))

@dsl.component(base_image="python:3.12")
def train(
        model: Input[Model],
        counter: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"train loop counter: {counter}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)

@dsl.pipeline(pipeline_root='', name='pl-1')
def parallel_limit_test_pipeline():
    preprocess_task = preprocess(message="hello-world").set_caching_options(enable_caching=False)
    
    with dsl.ParallelFor(items=[1, 2, 3, 4], parallelism=2) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)

if __name__ == '__main__':
    compiler.Compiler().compile(parallel_limit_test_pipeline, __file__ + '.yaml')

compiled version of that pipeline:

# PIPELINE DEFINITION
# Name: pl-1
components:
  comp-for-loop-2:
    dag:
      tasks:
        train:
          cachingOptions: {}
          componentRef:
            name: comp-train
          inputs:
            artifacts:
              model:
                componentInputArtifact: pipelinechannel--preprocess-output_model
            parameters:
              counter:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: train
    inputDefinitions:
      artifacts:
        pipelinechannel--preprocess-output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: NUMBER_INTEGER
  comp-preprocess:
    executorLabel: exec-preprocess
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
  comp-train:
    executorLabel: exec-train
    inputDefinitions:
      artifacts:
        model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        counter:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      artifacts:
        trained_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-preprocess:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - preprocess
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import str\n\ndef preprocess(\n        message: Input[str],\n\
          \        output_model: Output[Model]\n):\n    line = \"useful_data\"\n \
          \   print(f\"Message: {message}\")\n    with open(output_model.path, 'w')\
          \ as output_file:\n        output_file.write('line: {}'.format(line))\n\n"
        image: python:3.12
    exec-train:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import int\n\ndef train(\n        model: Input[Model],\n\
          \        counter: Input[int],\n        trained_model: Output[Model],\n):\n\
          \    import random\n    line = \"some_model\"\n    print(f\"train loop counter:\
          \ {counter}\")\n    with open(trained_model.path, 'w') as output_file:\n\
          \        output_file.write('line: {}'.format(line))\n    trained_model.metadata['accuracy']\
          \ = random.uniform(0, 1)\n\n"
        image: python:3.12
pipelineInfo:
  name: pl-1
root:
  dag:
    tasks:
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        dependentTasks:
        - preprocess
        inputs:
          artifacts:
            pipelinechannel--preprocess-output_model:
              taskOutputArtifact:
                outputArtifactKey: output_model
                producerTask: preprocess
        iteratorPolicy:
          parallelismLimit: 2
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '[1, 2, 3, 4]'
        taskInfo:
          name: for-loop-2
      preprocess:
        cachingOptions: {}
        componentRef:
          name: comp-preprocess
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: hello-world
        taskInfo:
          name: preprocess
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0

The pipeline uploads fine and renders fine. However, when I run it, I get the following error in the UI:

Run creation failed
{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): 
templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 
sorting failed: invalid dependency preprocess","code":13,"message":"Failed to create a new run: InternalServerError: 
Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop 
templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess","details":
[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}

I can see the following in the apiserver logs:


I0801 17:54:36.656761       1 interceptor.go:29] /kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun handler starting
I0801 17:54:36.670814       1 error.go:278] templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess
InternalServerError: Failed to validate workflow for ()
github.com/kubeflow/pipelines/backend/src/common/util.NewInternalServerError
	/opt/app-root/src/backend/src/common/util/error.go:144
github.com/kubeflow/pipelines/backend/src/apiserver/resource.(*ResourceManager).CreateRun
	/opt/app-root/src/backend/src/apiserver/resource/resource_manager.go:516
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).createRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:131
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:500
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650
Failed to create a new run
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrap
	/opt/app-root/src/backend/src/common/util/error.go:271
github.com/kubeflow/pipelines/backend/src/common/util.Wrap
	/opt/app-root/src/backend/src/common/util/error.go:350
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:502
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650
/kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun call failed
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrapf
	/opt/app-root/src/backend/src/common/util/error.go:266
github.com/kubeflow/pipelines/backend/src/common/util.Wrapf
	/opt/app-root/src/backend/src/common/util/error.go:337
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:32
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650

removing the parallelism=2 like so doesn't help:

    with dsl.ParallelFor(items=[1, 2, 3, 4]) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)

removing the loop entirely does help, i.e. no more panic:

    train(model=preprocess_task.outputs['output_model'], counter=1).set_caching_options(enable_caching=False)

leaving the hold in place :)

gregsheremeta avatar Aug 01 '24 18:08 gregsheremeta

I was able to replicate the problem - this occurs when a ParallelFor task has a dependantTask before it, and the new extra layer of abstraction meant the DAG Driver was not pointed at the root task list and therefore could not detect the external dependencies. Also, the 'iterator' task in the abstracted DAG itself was using the Argo depends functionality which also only works with tasks within the same DAG; since the root task also leverages this it is safe to remove from the sub-DAG templates.

Pushing an update that should address the problem @gregsheremeta found. An in-depth re-review would be greatly appreciated given the complexity of the problem

gmfrasca avatar Sep 06 '24 23:09 gmfrasca

New changes are detected. LGTM label has been removed.

google-oss-prow[bot] avatar Sep 06 '24 23:09 google-oss-prow[bot]

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: Once this PR has been reviewed and has the lgtm label, please ask for approval from rimolive. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment Approvers can cancel approval by writing /approve cancel in a comment

google-oss-prow[bot] avatar Sep 06 '24 23:09 google-oss-prow[bot]

/retest

gmfrasca avatar Sep 11 '24 15:09 gmfrasca

/rerun-all

rimolive avatar Sep 12 '24 22:09 rimolive

ready for re-review per KFP community meeting last week

[Giulio] Parallellism limits PR/ Re-review Updated to handle template dependency and DAG parameter passing correctly In-depth review requested, particularly if anyone has expertise in the DAG/template generation from argocompiler, in case any other functionality was not accounted for

gregsheremeta avatar Sep 19 '24 12:09 gregsheremeta

@gmfrasca this pipeline failed for the 2nd loop:

pipeline.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
        model: Input[Model],
        epoch: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')

With the error in a dag driver pod:

F1004 15:35:39.425156 18 main.go:79] KFP driver: driver.DAG(pipelineName=tutorial-data-passing, runID=45604c64-db2d-4936-8c4b-39fda3827564, task="for-loop-4", component="comp-for-loop-4", dagExecutionID=164, componentSpec) failed: failed to resolve inputs: failed to resolve input artifact pipelinechannel--preprocess-output_model with spec task_output_artifact:{producer_task:"preprocess" output_artifact_key:"output_model"}: failed to get executions in DAG(executionID=164): two tasks have the same task name "for-loop-2", id1=166 id2=168

Seems like there is are multiple executions writing the same task name This pipeline worked on the code in the master branch, can you take another look?

HumairAK avatar Oct 04 '24 16:10 HumairAK

I'm thinking we should make some of these pipelines into integration tests to ensure we're catching these cases

HumairAK avatar Oct 04 '24 18:10 HumairAK