feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718
Description of your changes: Fixes #8718. Adds the `parallelism` item to a DAG template if specified by a task's IteratorPolicy (i.e., a ParallelFor with a parallelism limit).
Checklist:
- [x] The title for your pull request (PR) should follow our title convention. Learn more about the pull request title convention used in this repository.
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by: Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.
The full list of commands accepted by this bot can be found here.
Approvers can indicate their approval by writing `/approve` in a comment.
Approvers can cancel approval by writing `/approve cancel` in a comment.
looks like a CI infra issue?
/retest just to check
@gmfrasca: The `/retest` command does not accept any targets.
The following commands are available to trigger required jobs:
- /test kfp-kubernetes-test-python310
- /test kfp-kubernetes-test-python311
- /test kfp-kubernetes-test-python312
- /test kfp-kubernetes-test-python38
- /test kfp-kubernetes-test-python39
- /test kubeflow-pipeline-backend-test
- /test kubeflow-pipeline-frontend-test
- /test kubeflow-pipeline-mkp-snapshot-test
- /test kubeflow-pipeline-mkp-test
- /test kubeflow-pipelines-backend-visualization
- /test kubeflow-pipelines-component-yaml
- /test kubeflow-pipelines-components-google-cloud-python38
- /test kubeflow-pipelines-integration-v2
- /test kubeflow-pipelines-manifests
- /test kubeflow-pipelines-sdk-docformatter
- /test kubeflow-pipelines-sdk-execution-tests
- /test kubeflow-pipelines-sdk-isort
- /test kubeflow-pipelines-sdk-python310
- /test kubeflow-pipelines-sdk-python311
- /test kubeflow-pipelines-sdk-python312
- /test kubeflow-pipelines-sdk-python38
- /test kubeflow-pipelines-sdk-python39
- /test kubeflow-pipelines-sdk-yapf
- /test test-kfp-runtime-code-python310
- /test test-kfp-runtime-code-python311
- /test test-kfp-runtime-code-python312
- /test test-kfp-runtime-code-python38
- /test test-kfp-runtime-code-python39
- /test test-run-all-gcpc-modules
- /test test-upgrade-kfp-sdk
The following commands are available to trigger optional jobs:
- /test kfp-kubernetes-execution-tests
- /test kubeflow-pipeline-e2e-test
- /test kubeflow-pipeline-upgrade-test
- /test kubeflow-pipeline-upgrade-test-v2
- /test kubeflow-pipelines-samples-v2
Use `/test all` to run the following jobs that were automatically triggered:
- kubeflow-pipeline-backend-test
- kubeflow-pipeline-e2e-test
- kubeflow-pipeline-upgrade-test
- kubeflow-pipeline-upgrade-test-v2
- kubeflow-pipelines-samples-v2
In response to this:

> looks like a CI infra issue?
> /retest just to check

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
/retest
tested with this pipeline:
parallelfor.py
```python
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
    message: Input[str],
    output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
    model: Input[Model],
    epoch: Input[int],
    trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')
```
Worked successfully:
```shell
~ $ kubectl -n ${kfp_ns} get workflow tutorial-data-passing-drsfz -o yaml | yq '.spec.templates[-2]'
```

```yaml
dag:
  tasks:
  - arguments:
      parameters:
      - name: component
        value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}'
      - name: parent-dag-id
        value: '{{inputs.parameters.parent-dag-id}}'
      - name: task
        value: '{"componentRef":{"name":"comp-for-loop-2"},"dependentTasks":["preprocess"],"inputs":{"artifacts":{"pipelinechannel--preprocess-output_model":{"taskOutputArtifact":{"outputArtifactKey":"output_model","producerTask":"preprocess"}}}},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[1, 5, 10, 25]"}},"taskInfo":{"name":"for-loop-2"}}'
    depends: preprocess.Succeeded
    name: for-loop-2-driver
    template: system-dag-driver
  - arguments:
      parameters:
      - name: parent-dag-id
        value: '{{tasks.for-loop-2-driver.outputs.parameters.execution-id}}'
      - name: iteration-index
        value: '{{item}}'
    depends: for-loop-2-driver.Succeeded
    name: for-loop-2-iterations
    template: comp-for-loop-2-for-loop-2
    withSequence:
      count: '{{tasks.for-loop-2-driver.outputs.parameters.iteration-count}}'
  - arguments:
      parameters:
      - name: component
        value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-preprocess}}'
      - name: task
        value: '{"cachingOptions":{},"componentRef":{"name":"comp-preprocess"},"inputs":{"parameters":{"message":{"runtimeValue":{"constant":"dataset"}}}},"taskInfo":{"name":"preprocess"}}'
      - name: container
        value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-preprocess}}'
      - name: parent-dag-id
        value: '{{inputs.parameters.parent-dag-id}}'
    name: preprocess-driver
    template: system-container-driver
  - arguments:
      parameters:
      - name: pod-spec-patch
        value: '{{tasks.preprocess-driver.outputs.parameters.pod-spec-patch}}'
      - default: "false"
        name: cached-decision
        value: '{{tasks.preprocess-driver.outputs.parameters.cached-decision}}'
    depends: preprocess-driver.Succeeded
    name: preprocess
    template: system-container-executor
inputs:
  parameters:
  - name: parent-dag-id
metadata:
  annotations:
    sidecar.istio.io/inject: "false"
name: root
outputs: {}
parallelism: 2
```
Note the `parallelism: 2`.
The UI feedback on this could be better: currently all 4 iterations show as executing at the same time (and they never really show the done checkmark, even once they finish). But this problem existed prior to this change, seems out of scope for this task, and should be tracked in a follow-up issue.
However, I confirmed the pods are scheduled 2 at a time in this example (since parallelism = 2).
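As an aside for readers, the scheduling behavior confirmed above can be mimicked with a small stdlib-only analogy. This is not KFP or Argo code; `train` here is just a stand-in thread worker, and the bounded semaphore of size 2 plays the role of `parallelism: 2`:

```python
import threading
import time

# A semaphore of size 2 admits at most two "iterations" at once,
# analogous to the workflow scheduling at most two train pods.
limit = threading.BoundedSemaphore(2)
lock = threading.Lock()
running = 0
peak = 0

def train(epoch):
    global running, peak
    with limit:  # blocks while two iterations are already in flight
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # simulate the training pod's work
        with lock:
            running -= 1

threads = [threading.Thread(target=train, args=(e,)) for e in [1, 5, 10, 25]]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrent iterations:", peak)
```

The semaphore guarantees the peak never exceeds 2, which matches the two-at-a-time pod scheduling observed in the cluster.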
Hrmm testing it with the above pipeline amended to:
```python
@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)
```
It looks like the workflow will use `parallelism = 4` for both.
As @HumairAK noted, it looks like there's a problem when there are multiple ParallelFor components in a single DAG: the `parallelism` mechanism applies to the entire DAG, and as it stands there's no way to assign different parallelismLimits to individual tasks within a DAG template.
The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks (i.e., the root DAG calls a new "XYZ-iterator" DAG, which contains the `withSequence` iterator and the parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path may be. Will need a bit to investigate.
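The template-level scoping of the limit can be sketched with a toy stdlib model (no KFP/Argo code here; the worker pool and item lists are illustrative). A single pool of 4 workers plays the role of the one `parallelism` field on the shared DAG template, so the loop that requested a limit of 2 is still governed by the template-wide cap of 4:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0
peak = 0

def iteration(item):
    # Track how many iterations (from either loop) run at once.
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulate a training pod
    with lock:
        running -= 1

loop_a = [1, 5, 10, 25]   # requested parallelism=2
loop_b = [6, 12, 24, 48]  # requested parallelism=4

# One template-level limit for the whole DAG: both loops share it.
with ThreadPoolExecutor(max_workers=4) as template_level_limit:
    list(template_level_limit.map(iteration, loop_a + loop_b))

print("peak concurrency across both loops:", peak)
```

With one shared cap there is no way to hold `loop_a` to 2 while allowing `loop_b` up to 4, which is exactly the limitation being described.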
Hi @gmfrasca, I think it's really important to have a limit that applies to the individual for loop, not the entire DAG. Have you considered using Argo's implementation of loop parallelism? Using that might even simplify the DAG. However, implementing this could lead to issues with other backends.
Hey @hsteude - this implementation actually already leverages the Argo loop parallelism mechanism. The issue here is that the current compiled architecture of a pipeline aggregates all KFP pipeline steps into sequential `task`s of a top-level `root` DAG template, but the finest granularity at which you can specify that limit is the Template level, not an individual DAGTask. Essentially, we do not have the concept of per-step parallelism to use in the current state.
The workaround/DAG re-architecture I mentioned above would bump out each of these steps to call its own intermediate Template, each with its own DAG and iterator, and this template would simply call the component Template itself. With that, we could then specify individual parallelism limits for individual steps, since they are now encapsulated in a Template, at the cost of introducing another layer of abstraction/templating.
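The re-architecture described above can be sketched in miniature. This is a hypothetical illustration of the restructuring idea, not actual argocompiler output; the `wrap_iterators` helper and the `*-iterator` template naming are invented for the sketch:

```python
def wrap_iterators(root_tasks):
    """Move each iterator task into its own sub-template so each loop's
    parallelism limit becomes a template-level field of its own."""
    templates = {"root": {"dag": {"tasks": []}}}
    for task in root_tasks:
        if "iteratorPolicy" in task:
            # Hypothetical naming scheme for the intermediate template.
            sub_name = f"{task['name']}-iterator"
            templates[sub_name] = {
                # The per-loop limit now lives on its own template.
                "parallelism": task["iteratorPolicy"]["parallelismLimit"],
                "dag": {"tasks": [
                    {k: v for k, v in task.items() if k != "iteratorPolicy"}
                ]},
            }
            # The root DAG just calls the intermediate template.
            templates["root"]["dag"]["tasks"].append(
                {"name": task["name"], "template": sub_name})
        else:
            templates["root"]["dag"]["tasks"].append(task)
    return templates

tasks = [
    {"name": "for-loop-2", "iteratorPolicy": {"parallelismLimit": 2}},
    {"name": "for-loop-4", "iteratorPolicy": {"parallelismLimit": 4}},
    {"name": "preprocess"},
]
print(wrap_iterators(tasks))
```

After the transformation each loop carries its own `parallelism` value, at the cost of one extra template layer per iterator - the trade-off discussed above.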
@gmfrasca: The following tests failed, say `/retest` to rerun all failed tests or `/retest-required` to rerun all mandatory failed tests:
Test name | Commit | Details | Required | Rerun command |
---|---|---|---|---|
kubeflow-pipelines-samples-v2 | 978065002c89cc3e8de88aa4790ceee1cf8e98ab | link | false | /test kubeflow-pipelines-samples-v2 |
kubeflow-pipeline-upgrade-test | 978065002c89cc3e8de88aa4790ceee1cf8e98ab | link | false | /test kubeflow-pipeline-upgrade-test |
kubeflow-pipeline-e2e-test | 978065002c89cc3e8de88aa4790ceee1cf8e98ab | link | false | /test kubeflow-pipeline-e2e-test |
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
/test kubeflow-pipelines-samples-v2
@gmfrasca: The following tests failed, say `/retest` to rerun all failed tests or `/retest-required` to rerun all mandatory failed tests:
Test name | Commit | Details | Required | Rerun command |
---|---|---|---|---|
kubeflow-pipeline-e2e-test | 978065002c89cc3e8de88aa4790ceee1cf8e98ab | link | false | /test kubeflow-pipeline-e2e-test |
kubeflow-pipelines-samples-v2 | fb8a3ffb68ec1824564b75800580c0f1914e7067 | link | false | /test kubeflow-pipelines-samples-v2 |
kubeflow-pipeline-upgrade-test | fb8a3ffb68ec1824564b75800580c0f1914e7067 | link | false | /test kubeflow-pipeline-upgrade-test |
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
/lgtm
I reviewed this PR a month ago and it is still open. I think it would be good to have one more `lgtm` to ensure this PR is still good to merge.
/hold
does hold work here? I'd like to review this before it's merged :smile:
> they also never really show the done checkmark even once they finish
I've seen this in other pipelines too -- ones without loops. Might be anything with a sub-DAG.
I'm caught off guard a bit because it looks like ParallelFor was mostly already working, except for the limit? And this PR fixes the limit and that's about it, right?
Is #8718 the correct Issue to fix, then? Should it be tweaked to call out that only parallelization limit was left? Or should 8718 be closed and a new Issue opened?
While running this PR in an attempt to validate it and understand it better, I found that I can reliably cause an apiserver panic. The same pipeline works fine on both upstream master of https://github.com/kubeflow/pipelines and https://github.com/opendatahub-io/data-science-pipelines.
test pipeline (adapted from what Humair posted above):
```python
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, Output, Model


@dsl.component(base_image="python:3.12")
def preprocess(
    message: Input[str],
    output_model: Output[Model]
):
    line = "useful_data"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))


@dsl.component(base_image="python:3.12")
def train(
    model: Input[Model],
    counter: Input[int],
    trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"train loop counter: {counter}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='pl-1')
def parallel_limit_test_pipeline():
    preprocess_task = preprocess(message="hello-world").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 2, 3, 4], parallelism=2) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(parallel_limit_test_pipeline, __file__ + '.yaml')
```
compiled version of that pipeline:
```yaml
# PIPELINE DEFINITION
# Name: pl-1
components:
  comp-for-loop-2:
    dag:
      tasks:
        train:
          cachingOptions: {}
          componentRef:
            name: comp-train
          inputs:
            artifacts:
              model:
                componentInputArtifact: pipelinechannel--preprocess-output_model
            parameters:
              counter:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: train
    inputDefinitions:
      artifacts:
        pipelinechannel--preprocess-output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: NUMBER_INTEGER
  comp-preprocess:
    executorLabel: exec-preprocess
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
  comp-train:
    executorLabel: exec-train
    inputDefinitions:
      artifacts:
        model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        counter:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      artifacts:
        trained_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-preprocess:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - preprocess
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)
          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import str\n\ndef preprocess(\n message: Input[str],\n\
          \ output_model: Output[Model]\n):\n line = \"useful_data\"\n \
          \ print(f\"Message: {message}\")\n with open(output_model.path, 'w')\
          \ as output_file:\n output_file.write('line: {}'.format(line))\n\n"
        image: python:3.12
    exec-train:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)
          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import int\n\ndef train(\n model: Input[Model],\n\
          \ counter: Input[int],\n trained_model: Output[Model],\n):\n\
          \ import random\n line = \"some_model\"\n print(f\"train loop counter:\
          \ {counter}\")\n with open(trained_model.path, 'w') as output_file:\n\
          \ output_file.write('line: {}'.format(line))\n trained_model.metadata['accuracy']\
          \ = random.uniform(0, 1)\n\n"
        image: python:3.12
pipelineInfo:
  name: pl-1
root:
  dag:
    tasks:
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        dependentTasks:
        - preprocess
        inputs:
          artifacts:
            pipelinechannel--preprocess-output_model:
              taskOutputArtifact:
                outputArtifactKey: output_model
                producerTask: preprocess
        iteratorPolicy:
          parallelismLimit: 2
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '[1, 2, 3, 4]'
        taskInfo:
          name: for-loop-2
      preprocess:
        cachingOptions: {}
        componentRef:
          name: comp-preprocess
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: hello-world
        taskInfo:
          name: preprocess
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
```
The pipeline uploads fine and renders fine. However, when I run it, I get the following error in the UI:
Run creation failed
```json
{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess","code":13,"message":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess","details":[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}
```
I can see the following in the apiserver logs:
```
I0801 17:54:36.656761 1 interceptor.go:29] /kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun handler starting
I0801 17:54:36.670814 1 error.go:278] templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess
InternalServerError: Failed to validate workflow for ()
github.com/kubeflow/pipelines/backend/src/common/util.NewInternalServerError
    /opt/app-root/src/backend/src/common/util/error.go:144
github.com/kubeflow/pipelines/backend/src/apiserver/resource.(*ResourceManager).CreateRun
    /opt/app-root/src/backend/src/apiserver/resource/resource_manager.go:516
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).createRun
    /opt/app-root/src/backend/src/apiserver/server/run_server.go:131
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
    /opt/app-root/src/backend/src/apiserver/server/run_server.go:500
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
    /opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
    /opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
    /opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
    /usr/lib/golang/src/runtime/asm_amd64.s:1650
Failed to create a new run
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrap
    /opt/app-root/src/backend/src/common/util/error.go:271
github.com/kubeflow/pipelines/backend/src/common/util.Wrap
    /opt/app-root/src/backend/src/common/util/error.go:350
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
    /opt/app-root/src/backend/src/apiserver/server/run_server.go:502
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
    /opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
    /opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
    /opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
    /usr/lib/golang/src/runtime/asm_amd64.s:1650
/kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun call failed
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrapf
    /opt/app-root/src/backend/src/common/util/error.go:266
github.com/kubeflow/pipelines/backend/src/common/util.Wrapf
    /opt/app-root/src/backend/src/common/util/error.go:337
main.apiServerInterceptor
    /opt/app-root/src/backend/src/apiserver/interceptor.go:32
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
    /opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
    /opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
    /usr/lib/golang/src/runtime/asm_amd64.s:1650
```
Removing the `parallelism=2` like so doesn't help:

```python
    with dsl.ParallelFor(items=[1, 2, 3, 4]) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)
```
Removing the loop entirely does help, i.e. no more panic:

```python
    train(model=preprocess_task.outputs['output_model'], counter=1).set_caching_options(enable_caching=False)
```
leaving the hold in place :)
I was able to replicate the problem - this occurs when a ParallelFor task has a dependentTask before it, and the new extra layer of abstraction meant the DAG driver was not pointed at the root task list and therefore could not detect the external dependencies. Also, the 'iterator' task in the abstracted DAG itself was using the Argo `depends` functionality, which also only works with tasks within the same DAG; since the root task also leverages this, it is safe to remove from the sub-DAG templates.
Pushing an update that should address the problem @gregsheremeta found. An in-depth re-review would be greatly appreciated given the complexity of the problem
New changes are detected. LGTM label has been removed.
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by: Once this PR has been reviewed and has the lgtm label, please ask for approval from rimolive. For more information see the Kubernetes Code Review Process.
The full list of commands accepted by this bot can be found here.
Approvers can indicate their approval by writing `/approve` in a comment.
Approvers can cancel approval by writing `/approve cancel` in a comment.
/retest
/rerun-all
ready for re-review per KFP community meeting last week
[Giulio] Parallelism limits PR / re-review: updated to handle template dependency and DAG parameter passing correctly. In-depth review requested, particularly if anyone has expertise in the DAG/template generation from argocompiler, in case any other functionality was not accounted for.
@gmfrasca this pipeline failed for the 2nd loop:
pipeline.py
```python
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
    message: Input[str],
    output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
    model: Input[Model],
    epoch: Input[int],
    trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')
```
With the error in a dag driver pod:
```
F1004 15:35:39.425156 18 main.go:79] KFP driver: driver.DAG(pipelineName=tutorial-data-passing, runID=45604c64-db2d-4936-8c4b-39fda3827564, task="for-loop-4", component="comp-for-loop-4", dagExecutionID=164, componentSpec) failed: failed to resolve inputs: failed to resolve input artifact pipelinechannel--preprocess-output_model with spec task_output_artifact:{producer_task:"preprocess" output_artifact_key:"output_model"}: failed to get executions in DAG(executionID=164): two tasks have the same task name "for-loop-2", id1=166 id2=168
```
It seems like there are multiple executions writing the same task name. This pipeline worked on the code in the master branch - can you take another look?
I'm thinking we should make some of these pipelines into integration tests to ensure we're catching these cases.
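A minimal sketch of the kind of assertion such an integration test could make, checking that the compiled IR carries a per-loop limit. To stay self-contained, the `ir` dict below is a hand-built fragment shaped like the `root.dag.tasks` section of the compiled spec shown earlier in the thread, and `loop_limits` is a hypothetical helper, not KFP API:

```python
# Fragment mimicking the root.dag.tasks section of a compiled pipeline spec.
ir = {
    "root": {"dag": {"tasks": {
        "for-loop-2": {"iteratorPolicy": {"parallelismLimit": 2}},
        "preprocess": {},
    }}}
}

def loop_limits(spec):
    """Collect parallelismLimit per iterator task from a compiled-spec dict."""
    tasks = spec["root"]["dag"]["tasks"]
    return {name: t["iteratorPolicy"]["parallelismLimit"]
            for name, t in tasks.items() if "iteratorPolicy" in t}

# The regression check: the loop task must carry its limit in the IR.
assert loop_limits(ir) == {"for-loop-2": 2}
```

In a real integration test this fragment would instead come from compiling one of the pipelines above and loading the resulting YAML, then submitting the run and asserting it completes.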