pipelines icon indicating copy to clipboard operation
pipelines copied to clipboard

[sdk] Can't use pipeline's input object in dsl.Condition

Open ittus opened this issue 2 years ago • 1 comments

Environment

  • How do you deploy Kubeflow Pipelines (KFP)? Full Kubeflow deployment

  • KFP version: 1.8.1

  • KFP SDK version: 1.8.12

Steps to reproduce

@dsl.pipeline(name="Test pipeline", description=__doc__)
def test_pipeline(
    output_config: Optional[dict] = {
        "s3_bucket": "my-s3-bucket",
        "s3_prefix": "my-s3-prefix"
    }
) -> None:
    with dsl.Condition(output_config is not None):
    (
        # run a container op
    )

Then there is an error when create the pipeline

    if isinstance(group.condition.operand1, dsl.PipelineParam):
AttributeError: 'bool' object has no attribute 'operand1'

If I change the condition to

 with dsl.Condition(output_config != None)

then when running, there is another error

This step is in Error state with this message: Invalid 'when' expression '"{"s3_bucket": "my-s3-bucket", "s3_prefix": "my-s3-prefix}" != "None"': Cannot transition token types from STRING [{] to VARIABLE [s3_bucket]

Expected result

  • Able to use dsl.Condition with pipeline's input

Materials and reference

Labels

/area sdk


Impacted by this bug? Give it a 👍.

ittus avatar Aug 28 '22 07:08 ittus

Thanks for reporting this @ittus. This is a known bug.

connor-mccarthy avatar Sep 01 '22 22:09 connor-mccarthy

Any update on this? We'd like to use the dsl.Condition to trigger some alert ops and can't whilst this is open.

sumanassarmathg avatar Nov 14 '22 23:11 sumanassarmathg

This has not yet been fixed in KFP v1.

This was fixed at least as early as kfp==2.0.0b1. The following runs without issue:

import kfp

assert kfp.__version__ == '2.0.0b1'

from kfp import compiler
from kfp import dsl


@dsl.component
def identity(string: str) -> str:
    return string


@dsl.pipeline()
def my_pipeline(string: str = 'string'):
    with dsl.Condition(string == 'string'):
        op1 = identity(string=string)


ir_file = __file__.replace('.py', '.yaml')
compiler.Compiler().compile(pipeline_func=my_pipeline, package_path=ir_file)

connor-mccarthy avatar Nov 17 '22 22:11 connor-mccarthy

That may run, however anything beyond a trivial equality (like checking if a file needs to be decompressed by checking a file extension) fails due to PipelineParameterChannel attribute issues. This seems related to #8626 and essentially prevents me from using kubeflow pipelines in a production setting.

MatthewRalston avatar Dec 23 '22 03:12 MatthewRalston

@MatthewRalston, thanks for your response. If I understand what you're describing correctly, that's independent from the use of pipeline input in dsl.Condition.

But yes, only a limited number of operators are permitted in dsl.Condition. Arbitrary user code is not permitted in dsl.Condition (or the pipeline body more generally) since the pipeline body defines the orchestration and will not actually be run at pipeline runtime.

connor-mccarthy avatar Dec 27 '22 16:12 connor-mccarthy

I am also getting the same error message as long as the condition is not based on the PipelineParam, e.g.

a_local_variable=5
with dsl.Condition(a_local_variable == 5):
    pass

AttributeError: 'bool' object has no attribute 'operand1'

xsqian avatar Feb 01 '23 02:02 xsqian

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Mar 03 '24 07:03 github-actions[bot]

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

github-actions[bot] avatar Apr 11 '24 07:04 github-actions[bot]