
[backend] Pipeline channel placeholders in formatted string cannot be substituted correctly

Open JasonNS1425 opened this issue 1 year ago • 3 comments

Environment

  • How did you deploy Kubeflow Pipelines (KFP)? Kubeflow manifests standalone deployment (ref)
  • KFP version: 2.2.0
  • KFP SDK version: kfp 2.8.0, kfp-kubernetes 1.2.0, kfp-pipeline-spec 0.3.0, kfp-server-api 2.0.5

Steps to reproduce

When a pipeline channel placeholder is used in a formatted string and the result is passed as input to another task, the downstream PipelineTask often receives the value with the placeholder left unsubstituted. See the example below for details. (You may have to run the pipeline a few times to trigger the issue.)

Source code

from kfp import dsl, compiler


@dsl.component(base_image='python:3.10-slim-bookworm')
def print_op(message: str) -> str:
    print(message)
    return message

@dsl.pipeline(name='example-pipeline')
def pipeline(name: str):
    for i in range(1, 6):
        message = f'Hello {name} {i}'
        print_op(message=message)

if __name__ == "__main__":
    pipeline_yaml_path = __file__.replace(".py", ".yaml")
    compiler.Compiler().compile(pipeline, package_path=pipeline_yaml_path)

Pipeline run results

  • Input
    • name: Kubeflow Pipeline
  • Output:
# pipeline print-op
Hello Kubeflow Pipeline 1
[KFP Executor 2024-07-22 07:46:45,388 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:45.400565      22 launcher_v2.go:705] ExecutorOutput: {
  "parameterValues": {
    "Output": "Hello Kubeflow Pipeline 1"
  }
}

# pipeline print-op-2
Hello Kubeflow Pipeline 2
[KFP Executor 2024-07-22 07:46:45,542 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:45.553621      22 launcher_v2.go:705] ExecutorOutput: {
  "parameterValues": {
    "Output": "Hello Kubeflow Pipeline 2"
  }
}

# pipeline print-op-3
Hello {{$.inputs.parameters['pipelinechannel--name']}} 3
[KFP Executor 2024-07-22 07:46:42,415 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:42.426454      24 launcher_v2.go:705] ExecutorOutput: {
  "parameterValues": {
    "Output": "Hello {{$.inputs.parameters['pipelinechannel--name']}} 3"
  }
}

# pipeline print-op-4
Hello Kubeflow Pipeline 4
[KFP Executor 2024-07-22 07:46:45,782 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:45.794531      24 launcher_v2.go:705] ExecutorOutput: {
  "parameterValues": {
    "Output": "Hello Kubeflow Pipeline 4"
  }
}

# pipeline print-op-5
Hello Kubeflow Pipeline 5
[KFP Executor 2024-07-22 07:46:42,380 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:42.396266      23 launcher_v2.go:705] ExecutorOutput: {
  "parameterValues": {
    "Output": "Hello Kubeflow Pipeline 5"
  }
}

Expected result

The downstream task should always receive input values with all pipeline placeholders substituted.
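
To make the expected behavior concrete, here is a minimal sketch of the substitution a downstream task should observe. It is not KFP's actual resolver: `resolve_placeholders` and the regular expression are hypothetical, modeled only on the placeholder syntax that appears in this issue's output and compiled YAML.

```python
import re
from typing import Mapping

# Placeholder syntax as it appears in this issue's compiled pipeline spec:
#   {{$.inputs.parameters['<parameter-key>']}}
PLACEHOLDER = re.compile(r"\{\{\$\.inputs\.parameters\['([^']+)'\]\}\}")


def resolve_placeholders(template: str, parameters: Mapping[str, str]) -> str:
    """Replace every input-parameter placeholder in `template`.

    Hypothetical helper for illustration; not part of the KFP SDK or backend.
    Raises KeyError if a placeholder names a parameter that was not supplied.
    """
    def _substitute(match: re.Match) -> str:
        return str(parameters[match.group(1)])

    return PLACEHOLDER.sub(_substitute, template)


# The constant compiled for print-op-3, resolved with the run's input.
constant = "Hello {{$.inputs.parameters['pipelinechannel--name']}} 3"
resolved = resolve_placeholders(
    constant, {"pipelinechannel--name": "Kubeflow Pipeline"}
)
print(resolved)  # prints: Hello Kubeflow Pipeline 3
```

With the run input `name: Kubeflow Pipeline`, every task should receive the fully resolved form, as print-op, print-op-2, print-op-4 and print-op-5 did in the run above.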

Materials and Reference

Generated YAML
 # PIPELINE DEFINITION
 # Name: example-pipeline
 # Inputs:
 #    name: str
 components:
   comp-print-op:
     executorLabel: exec-print-op
     inputDefinitions:
       parameters:
         message:
           parameterType: STRING
     outputDefinitions:
       parameters:
         Output:
           parameterType: STRING
   comp-print-op-2:
     executorLabel: exec-print-op-2
     inputDefinitions:
       parameters:
         message:
           parameterType: STRING
     outputDefinitions:
       parameters:
         Output:
           parameterType: STRING
   comp-print-op-3:
     executorLabel: exec-print-op-3
     inputDefinitions:
       parameters:
         message:
           parameterType: STRING
     outputDefinitions:
       parameters:
         Output:
           parameterType: STRING
   comp-print-op-4:
     executorLabel: exec-print-op-4
     inputDefinitions:
       parameters:
         message:
           parameterType: STRING
     outputDefinitions:
       parameters:
         Output:
           parameterType: STRING
   comp-print-op-5:
     executorLabel: exec-print-op-5
     inputDefinitions:
       parameters:
         message:
           parameterType: STRING
     outputDefinitions:
       parameters:
         Output:
           parameterType: STRING
 deploymentSpec:
   executors:
     exec-print-op:
       container:
         args:
         - --executor_input
         - '{{$}}'
         - --function_to_execute
         - print_op
         command:
         - sh
         - -c
         - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
           \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
           \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
           \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
           $0\" \"$@\"\n"
         - sh
         - -ec
         - 'program_path=$(mktemp -d)
 
 
           printf "%s" "$0" > "$program_path/ephemeral_component.py"
 
           _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
 
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
           \ message\n\n"
         image: python:3.10-slim-bookworm
     exec-print-op-2:
       container:
         args:
         - --executor_input
         - '{{$}}'
         - --function_to_execute
         - print_op
         command:
         - sh
         - -c
         - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
           \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
           \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
           \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
           $0\" \"$@\"\n"
         - sh
         - -ec
         - 'program_path=$(mktemp -d)
 
 
           printf "%s" "$0" > "$program_path/ephemeral_component.py"
 
           _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
 
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
           \ message\n\n"
         image: python:3.10-slim-bookworm
     exec-print-op-3:
       container:
         args:
         - --executor_input
         - '{{$}}'
         - --function_to_execute
         - print_op
         command:
         - sh
         - -c
         - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
           \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
           \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
           \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
           $0\" \"$@\"\n"
         - sh
         - -ec
         - 'program_path=$(mktemp -d)
 
 
           printf "%s" "$0" > "$program_path/ephemeral_component.py"
 
           _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
 
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
           \ message\n\n"
         image: python:3.10-slim-bookworm
     exec-print-op-4:
       container:
         args:
         - --executor_input
         - '{{$}}'
         - --function_to_execute
         - print_op
         command:
         - sh
         - -c
         - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
           \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
           \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
           \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
           $0\" \"$@\"\n"
         - sh
         - -ec
         - 'program_path=$(mktemp -d)
 
 
           printf "%s" "$0" > "$program_path/ephemeral_component.py"
 
           _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
 
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
           \ message\n\n"
         image: python:3.10-slim-bookworm
     exec-print-op-5:
       container:
         args:
         - --executor_input
         - '{{$}}'
         - --function_to_execute
         - print_op
         command:
         - sh
         - -c
         - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
           \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
           \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
           \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
           $0\" \"$@\"\n"
         - sh
         - -ec
         - 'program_path=$(mktemp -d)
 
 
           printf "%s" "$0" > "$program_path/ephemeral_component.py"
 
           _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
 
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
           \ message\n\n"
         image: python:3.10-slim-bookworm
 pipelineInfo:
   name: example-pipeline
 root:
   dag:
     tasks:
       print-op:
         cachingOptions:
           enableCache: true
         componentRef:
           name: comp-print-op
         inputs:
           parameters:
             message:
               runtimeValue:
                 constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 1
             pipelinechannel--name:
               componentInputParameter: name
         taskInfo:
           name: print-op
       print-op-2:
         cachingOptions:
           enableCache: true
         componentRef:
           name: comp-print-op-2
         inputs:
           parameters:
             message:
               runtimeValue:
                 constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 2
             pipelinechannel--name:
               componentInputParameter: name
         taskInfo:
           name: print-op-2
       print-op-3:
         cachingOptions:
           enableCache: true
         componentRef:
           name: comp-print-op-3
         inputs:
           parameters:
             message:
               runtimeValue:
                 constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 3
             pipelinechannel--name:
               componentInputParameter: name
         taskInfo:
           name: print-op-3
       print-op-4:
         cachingOptions:
           enableCache: true
         componentRef:
           name: comp-print-op-4
         inputs:
           parameters:
             message:
               runtimeValue:
                 constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 4
             pipelinechannel--name:
               componentInputParameter: name
         taskInfo:
           name: print-op-4
       print-op-5:
         cachingOptions:
           enableCache: true
         componentRef:
           name: comp-print-op-5
         inputs:
           parameters:
             message:
               runtimeValue:
                 constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 5
             pipelinechannel--name:
               componentInputParameter: name
         taskInfo:
           name: print-op-5
   inputDefinitions:
     parameters:
       name:
         parameterType: STRING
 schemaVersion: 2.1.0
 sdkVersion: kfp-2.8.0
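
As the spec shows, each `message` input is compiled to a `runtimeValue.constant` that still embeds the placeholder, so resolution is left to the backend at run time. The sketch below lists such inputs in a compiled spec that has already been parsed into a dict. The helper name `tasks_with_embedded_placeholders` and the prefix check are assumptions made for illustration, and the inline dict mirrors only two tasks from the YAML above.

```python
from typing import Any, Dict, List

# Prefix of the placeholder syntax seen in the compiled spec of this issue.
PLACEHOLDER_PREFIX = "{{$.inputs.parameters["


def tasks_with_embedded_placeholders(spec: Dict[str, Any]) -> List[str]:
    """Return "<task>.<parameter>" for each constant input embedding a placeholder.

    `spec` is the compiled pipeline spec as a plain dict, for example the
    result of parsing the generated YAML with a YAML library of your choice.
    """
    hits: List[str] = []
    tasks = spec.get("root", {}).get("dag", {}).get("tasks", {})
    for task_name, task in tasks.items():
        parameters = task.get("inputs", {}).get("parameters", {})
        for parameter_name, parameter in parameters.items():
            constant = parameter.get("runtimeValue", {}).get("constant")
            if isinstance(constant, str) and PLACEHOLDER_PREFIX in constant:
                hits.append(f"{task_name}.{parameter_name}")
    return hits


# Fragment mirroring two tasks from the generated YAML.
spec = {
    "root": {"dag": {"tasks": {
        "print-op": {"inputs": {"parameters": {
            "message": {"runtimeValue": {
                "constant": "Hello {{$.inputs.parameters['pipelinechannel--name']}} 1"}},
            "pipelinechannel--name": {"componentInputParameter": "name"},
        }}},
        "print-op-2": {"inputs": {"parameters": {
            "message": {"runtimeValue": {
                "constant": "Hello {{$.inputs.parameters['pipelinechannel--name']}} 2"}},
            "pipelinechannel--name": {"componentInputParameter": "name"},
        }}},
    }}}
}

affected = tasks_with_embedded_placeholders(spec)
print(affected)  # prints: ['print-op.message', 'print-op-2.message']
```

Inputs reported by such a scan are the ones exposed to the intermittent behavior described in this issue. Inputs wired through `componentInputParameter` are not flagged.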

Impacted by this bug? Give it a 👍.

JasonNS1425 avatar Jul 24 '24 10:07 JasonNS1425

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Sep 23 '24 07:09 github-actions[bot]

Also hitting this issue -- curious if anyone finds a workaround

jaredwillard12 avatar Oct 01 '24 15:10 jaredwillard12

My current workaround is to create an extra pipeline task just for substituting the placeholder values, and to use the output of that task as the other tasks' input. This way the placeholders are substituted correctly, but it costs an additional pod created just for this simple string-formatting task :(

JasonNS1425 avatar Oct 14 '24 02:10 JasonNS1425

Yea, I worked around it by actually inspecting the argo template and forcing the correct replacement myself 🥲 doesn't require an extra pod, but is probably brittle to updates

jaredwillard12 avatar Oct 21 '24 15:10 jaredwillard12

I have the same issue. For some reason it works fine for longer strings. (two images attached)

lethnis avatar Nov 18 '24 04:11 lethnis

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jan 17 '25 07:01 github-actions[bot]

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

github-actions[bot] avatar Feb 08 '25 07:02 github-actions[bot]

Hi, I would like to check: will this be fixed in the next release? I'm still encountering a similar problem with KFP SDK 2.13.0 and backend version 2.5.0, i.e. the output parameters are not substituted correctly when using Python string formatting. The problem is not consistent: sometimes the parameters are substituted correctly and sometimes they are not. This is quite critical for us, as we use a reusable notification component that accepts a formatted Python string. This worked fine when we were using KFP v1.

Here's the results running the pipeline above:

First run (works as expected): Image

Second run (does not work as expected): Image
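
Since the failure is intermittent, one possible mitigation (not a fix) is to make a component fail fast when it receives an unresolved placeholder, so a bad run is visible instead of silently sending a broken message. The sketch below is an assumption-laden illustration: `require_resolved` and the `{{$.` marker are hypothetical, chosen from the leaked text seen earlier in this issue, and a legitimate message containing that marker would be rejected too.

```python
# Marker taken from the unresolved text seen in this issue, e.g.
#   Hello {{$.inputs.parameters['pipelinechannel--name']}} 3
PLACEHOLDER_MARKER = "{{$."


def require_resolved(value: str, parameter_name: str = "message") -> str:
    """Return `value` unchanged, or raise if it still contains a placeholder.

    Hypothetical guard for illustration; not part of the KFP SDK.
    """
    if PLACEHOLDER_MARKER in value:
        raise ValueError(
            f"input {parameter_name!r} contains an unresolved placeholder: {value!r}"
        )
    return value


for candidate in (
    "Hello Kubeflow Pipeline 3",
    "Hello {{$.inputs.parameters['pipelinechannel--name']}} 3",
):
    try:
        print("ok:", require_resolved(candidate))
    except ValueError as error:
        print("rejected:", error)
```

In a lightweight Python component the check would need to be inlined in the component function body, because only that function's source is shipped to the container.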

devina-ekawati avatar Jul 30 '25 05:07 devina-ekawati

/reopen

tkobatake avatar Aug 04 '25 02:08 tkobatake

@tkobatake: You can't reopen an issue/PR unless you authored it or you are a collaborator.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

google-oss-prow[bot] avatar Aug 04 '25 02:08 google-oss-prow[bot]

/reopen

HumairAK avatar Aug 04 '25 12:08 HumairAK

@HumairAK: Reopened this issue.

In response to this:

/reopen


google-oss-prow[bot] avatar Aug 04 '25 12:08 google-oss-prow[bot]