
[backend] `set_retry` for pipelines does not work

Open ianbenlolo opened this issue 1 year ago • 15 comments

Environment

  • How did you deploy Kubeflow Pipelines (KFP)? Python --> VertexAI Pipeline.
  • KFP version: KFP version 2.7.0.
  • KFP SDK version:
kfp                              2.7.0
kfp-pipeline-spec                0.3.0
kfp-server-api                   2.0.0

Steps to reproduce

In the docs here it says "Pipelines can themselves be used as components in other pipelines, just as you would use any other single-step component in a pipeline".

I was testing this out to see whether a pipeline within a pipeline can be retried, but I can't get it to work. Here is what I've tried (based on this).

from kfp import compiler
from kfp import dsl

@dsl.component
def print_op1(msg: str) -> str:
    print(msg)
    return msg

@dsl.container_component
def print_op2(msg: str):
    return dsl.ContainerSpec(
        image='alpine',
        command=['echo', msg],
    )

@dsl.component
def fail_job():
    raise ValueError('This job failed')

@dsl.pipeline
def inner_pipeline(msg: str):
    task = print_op1(msg=msg)

    fail_job().after(task).set_retry(num_retries=2)

    print_op2(msg=task.output)


@dsl.pipeline(name='pipeline-in-pipeline')
def my_pipeline():
    op1_out = print_op1(msg='Hello')
    inner_out = inner_pipeline(msg='world').set_retry(num_retries=10).after(op1_out)
    print_op1(msg='bye').after(inner_out)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))

The fail_job task will retry, but the pipeline-in-pipeline does not. Am I wrong in my thinking?

Expected result

The pipeline-in-pipeline should retry as well.
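
For reference, a hedged sketch of the compiled spec shape one would expect the backend to honor: the same retryPolicy block that KFP emits for component tasks, attached to the root-DAG task that references the nested pipeline's component. The names below are assumed from the example above, not taken from actual compiler output:

```yaml
# Assumed shape, mirroring the retryPolicy that KFP emits for component tasks.
root:
  dag:
    tasks:
      inner-pipeline:
        componentRef:
          name: comp-inner-pipeline   # nested pipeline compiled as a component
        retryPolicy:                  # expected, but not honored for sub-DAGs
          backoffDuration: 0s
          backoffFactor: 2.0
          backoffMaxDuration: 3600s
          maxRetryCount: 10
        taskInfo:
          name: inner-pipeline
```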

This is related to my discussion here but making an issue for visibility.


Impacted by this bug? Give it a 👍.

ianbenlolo avatar Oct 11 '24 15:10 ianbenlolo

@ianbenlolo set_retry is not working even without nested pipelines; see #9950. I've also tried it without nested pipelines, and it doesn't retry even once with this spec:

      tasks:
        fail-job:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-fail-job
          dependentTasks:
          - print-op1
          retryPolicy:
            backoffDuration: 0s
            backoffFactor: 2.0
            backoffMaxDuration: 3600s
            maxRetryCount: 2
          taskInfo:
            name: fail-job

I didn't try with Vertex AI, but I guess set_retry is not yet supported by Vertex AI either; see https://issuetracker.google.com/issues/226569351

Faakhir30 avatar Oct 13 '24 11:10 Faakhir30

@Faakhir30 Please see my comment in the original thread. It works for me on Vertex AI; the issue is that pipelines-in-pipelines do not retry.

ianbenlolo avatar Oct 16 '24 15:10 ianbenlolo

I am also encountering the same issue where retries are not functioning as expected.

The environment details are as follows:

How did you deploy Kubeflow Pipelines (KFP)?
Using a custom deployment on AWS EKS.

KFP version:
Kubeflow version: 1.9.0

KFP SDK version:

  • kfp: 2.9.0
  • kfp-pipeline-spec: 0.4.0
  • kfp-server-api: 2.3.0

Here is the code:

import os
import kfp
from kfp.v2 import dsl
from kfp import kubernetes

@dsl.component(base_image="python:3.8")
def test1() -> None:
    import sys
    print("complete")
    sys.exit(1)

@dsl.pipeline(
    name="TestRetry",
    description="test retry"
)
def test_pipeline():
    test_task = test1()
    test_task.set_retry(
        num_retries=3,
        backoff_duration="60s",
        backoff_factor=2,
        backoff_max_duration="3600s"
    )
    test_task.set_caching_options(enable_caching=False)

print("Compiling pipeline for cloud execution...")
kfp.v2.compiler.Compiler().compile(test_pipeline, "./test_retry.yaml")

The compiled result is as follows:

# PIPELINE DEFINITION
# Name: testretry
# Description: test retry
components:
  comp-test1:
    executorLabel: exec-test1
deploymentSpec:
  executors:
    exec-test1:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - test1
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef test1() -> None:\n    import sys\n    print(\"complete\")\n \
          \   sys.exit(1)\n\n"
        image: python:3.8
pipelineInfo:
  description: test retry
  name: testretry
root:
  dag:
    tasks:
      test1:
        cachingOptions: {}
        componentRef:
          name: comp-test1
        retryPolicy:
          backoffDuration: 60s
          backoffFactor: 2.0
          backoffMaxDuration: 3600s
          maxRetryCount: 3
        taskInfo:
          name: test1
schemaVersion: 2.1.0
sdkVersion: kfp-2.9.0
  • After the retry time has elapsed following a failed execution: (screenshot)

  • Status on the list screen immediately after pressing the retry button: (screenshot)

  • Details screen immediately after pressing the retry button: (screenshot)

ishisakok-nttd avatar Nov 14 '24 12:11 ishisakok-nttd

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jan 14 '25 07:01 github-actions[bot]

/remove-lifecycle stale

hbelmiro avatar Jan 14 '25 14:01 hbelmiro

should be fixed by https://github.com/kubeflow/pipelines/pull/11585 /close

juliusvonkohout avatar Feb 25 '25 11:02 juliusvonkohout

@juliusvonkohout: Closing this issue.

In response to this:

should be fixed by https://github.com/kubeflow/pipelines/pull/11585 /close

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

google-oss-prow[bot] avatar Feb 25 '25 11:02 google-oss-prow[bot]

This issue is about set_retry at the pipeline level. I mentioned it in my PR by mistake; the PR actually fixes set_retry at the component level.

ntny avatar Feb 25 '25 19:02 ntny

Then let's reopen for https://github.com/kubeflow/pipelines/pull/11673

juliusvonkohout avatar Feb 26 '25 12:02 juliusvonkohout

@juliusvonkohout I mistakenly mentioned this task in the MR and have removed the mention. This issue is about set_retry at the pipeline level, while I fixed set_retry at the execution level. I apologize for the confusion.

ntny avatar Feb 26 '25 21:02 ntny

I also just confirmed this was only fixed at the component level. The following works:

component_level_works.yaml

from kfp import compiler
from kfp import dsl

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def print_op1(msg: str) -> str:
    print(msg)
    return msg

@dsl.container_component
def print_op2(msg: str):
    return dsl.ContainerSpec(
        image='quay.io/hukhan/alpine',
        command=['echo', msg],
    )

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def fail_job():
    raise ValueError('This job failed')

@dsl.pipeline
def my_pipeline(msg: str):
    task = print_op1(msg=msg)

    fail_job().after(task).set_retry(num_retries=2)

    print_op2(msg=task.output)


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))

But this does not:

pipeline_level_does_not_work.yaml
from kfp import compiler
from kfp import dsl

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def print_op1(msg: str) -> str:
    print(msg)
    return msg

@dsl.container_component
def print_op2(msg: str):
    return dsl.ContainerSpec(
        image='quay.io/hukhan/alpine',
        command=['echo', msg],
    )

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def fail_job():
    raise ValueError('This job failed')

@dsl.pipeline
def inner_pipeline(msg: str):
    task = print_op1(msg=msg)

    fail_job().after(task).set_retry(num_retries=2)

    print_op2(msg=task.output)


@dsl.pipeline(name='pipeline-in-pipeline')
def my_pipeline():
    op1_out = print_op1(msg='Hello')
    inner_out = inner_pipeline(msg='world').set_retry(num_retries=3).after(op1_out)
    print_op1(msg='bye').after(inner_out)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))

Note that in the latter case, the component will also not retry within the nested DAG, and no retryStrategy is set in the associated workflow.
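
The missing retryPolicy can be spotted directly in the compiled spec. Below is a minimal sketch that walks every DAG in a pipeline-spec dict (the root DAG plus any nested-pipeline components) and reports tasks without a retry policy. The spec dict is a hand-written stand-in modeled on the compiled output quoted earlier in this thread; in practice you would load the compiled YAML (e.g. with yaml.safe_load) instead:

```python
def tasks_missing_retry(spec: dict) -> list:
    """Return 'dag_name/task_name' for every DAG task without a retryPolicy."""
    # Collect every DAG: the root DAG plus any component that is itself a DAG
    # (which is how a nested pipeline appears in the compiled IR).
    dags = {'root': spec.get('root', {}).get('dag', {})}
    for comp_name, comp in spec.get('components', {}).items():
        if 'dag' in comp:
            dags[comp_name] = comp['dag']
    missing = []
    for dag_name, dag in dags.items():
        for task_name, task in dag.get('tasks', {}).items():
            if 'retryPolicy' not in task:
                missing.append(f'{dag_name}/{task_name}')
    return missing


# Hand-written stand-in for a compiled pipeline-in-pipeline spec: the inner
# fail-job task carries a retryPolicy, but the nested-pipeline task does not.
spec = {
    'root': {'dag': {'tasks': {
        'inner-pipeline': {'componentRef': {'name': 'comp-inner-pipeline'}},
        'print-op1': {'componentRef': {'name': 'comp-print-op1'}},
    }}},
    'components': {
        'comp-inner-pipeline': {'dag': {'tasks': {
            'fail-job': {'componentRef': {'name': 'comp-fail-job'},
                         'retryPolicy': {'maxRetryCount': 2}},
        }}},
        'comp-fail-job': {'executorLabel': 'exec-fail-job'},
    },
}

print(tasks_missing_retry(spec))
```

Running this against a real compiled YAML would show whether the retry settings were dropped at compile time or only ignored by the backend at run time.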

HumairAK avatar Apr 16 '25 14:04 HumairAK

I'm going to look at the nested DAG, but I'm not sure when yet.

ntny avatar Apr 17 '25 10:04 ntny

Hey @ntny! I have started work on nested-DAG support already, though I have yet to assign myself. Would you be open to reviewing the PR once I have it ready?

alyssacgoins avatar Apr 17 '25 18:04 alyssacgoins

Hi @alyssacgoins yes, definitely! I’d be happy to take a look

Component-level tests were enabled today in https://github.com/kubeflow/pipelines/pull/11836; it might be useful to include that fix if you've already branched out.

ntny avatar Apr 17 '25 18:04 ntny

Hi @ntny here is the PR if interested: https://github.com/kubeflow/pipelines/pull/11908

alyssacgoins avatar Jun 02 '25 21:06 alyssacgoins