[backend] Pipeline channel placeholders in formatted string cannot be substituted correctly
Environment
- How did you deploy Kubeflow Pipelines (KFP)? Kubeflow manifests standalone deployment (ref)
- KFP version: 2.2.0
- KFP SDK version: kfp 2.8.0 kfp-kubernetes 1.2.0 kfp-pipeline-spec 0.3.0 kfp-server-api 2.0.5
Steps to reproduce
When a pipeline channel placeholder is used in a formatted string that is then passed as another task's input, the downstream PipelineTask often gets a non-substituted version of the input value. See the example for details. (You may have to run the pipeline a few times to trigger this issue.)
Source code
from kfp import dsl, compiler


@dsl.component(base_image='python:3.10-slim-bookworm')
def print_op(message: str) -> str:
    print(message)
    return message


@dsl.pipeline(name='example-pipeline')
def pipeline(name: str):
    for i in range(1, 6):
        message = f'Hello {name} {i}'
        print_op(message=message)


if __name__ == "__main__":
    pipeline_yaml_path = __file__.replace(".py", ".yaml")
    compiler.Compiler().compile(pipeline, package_path=pipeline_yaml_path)
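For readers wondering why a placeholder ends up in the message at all: inside the pipeline function, `name` is not a real string at compile time, so the f-string can only embed placeholder text that the backend has to substitute later. The following is a rough sketch of that effect. `FakeChannel` is a hypothetical stand-in written for this illustration, not the SDK's actual class, and the real SDK may go through a different intermediate representation before emitting the final placeholder.

```python
class FakeChannel:
    """Hypothetical stand-in for a pipeline parameter (not the kfp class)."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        # Mimics the placeholder text that appears in the compiled pipeline.
        return "{{$.inputs.parameters['pipelinechannel--" + self.name + "']}}"


# At compile time the pipeline argument is a placeholder object, not a str.
name = FakeChannel('name')

# An f-string formats the object via str(), so the placeholder text is baked
# into what looks like an ordinary constant string.
messages = [f'Hello {name} {i}' for i in range(1, 6)]

for message in messages:
    print(message)
```

Each printed line has the same shape as the unsubstituted value reported in this issue, which suggests the failure is in the run-time substitution step rather than in the f-string itself.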
Pipeline run results
- Input
  - name: Kubeflow Pipeline
- Output:
# pipeline print-op
Hello Kubeflow Pipeline 1
[KFP Executor 2024-07-22 07:46:45,388 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:45.400565 22 launcher_v2.go:705] ExecutorOutput: {
"parameterValues": {
"Output": "Hello Kubeflow Pipeline 1"
}
}
# pipeline print-op-2
Hello Kubeflow Pipeline 2
[KFP Executor 2024-07-22 07:46:45,542 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:45.553621 22 launcher_v2.go:705] ExecutorOutput: {
"parameterValues": {
"Output": "Hello Kubeflow Pipeline 2"
}
}
# pipeline print-op-3
Hello {{$.inputs.parameters['pipelinechannel--name']}} 3
[KFP Executor 2024-07-22 07:46:42,415 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:42.426454 24 launcher_v2.go:705] ExecutorOutput: {
"parameterValues": {
"Output": "Hello {{$.inputs.parameters['pipelinechannel--name']}} 3"
}
}
# pipeline print-op-4
Hello Kubeflow Pipeline 4
[KFP Executor 2024-07-22 07:46:45,782 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:45.794531 24 launcher_v2.go:705] ExecutorOutput: {
"parameterValues": {
"Output": "Hello Kubeflow Pipeline 4"
}
}
# pipeline print-op-5
Hello Kubeflow Pipeline 5
[KFP Executor 2024-07-22 07:46:42,380 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0722 07:46:42.396266 23 launcher_v2.go:705] ExecutorOutput: {
"parameterValues": {
"Output": "Hello Kubeflow Pipeline 5"
}
}
Expected result
The downstream task should always receive input values with all pipeline placeholders substituted.
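To make the expected behaviour concrete, here is a minimal sketch of the substitution a task input should go through before the component sees it. This is not the backend's implementation; `resolve_placeholders` and `has_unresolved_placeholder` are hypothetical helpers, and the regular expression only covers the `{{$.inputs.parameters['...']}}` form that shows up in this report.

```python
import re

# Matches placeholders of the form {{$.inputs.parameters['some-key']}}.
PLACEHOLDER = re.compile(r"\{\{\$\.inputs\.parameters\['([^']+)'\]\}\}")


def resolve_placeholders(constant: str, parameters: dict) -> str:
    """Replace every input-parameter placeholder with its resolved value."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in parameters:
            raise KeyError(f'no value for placeholder parameter {key!r}')
        return str(parameters[key])

    return PLACEHOLDER.sub(substitute, constant)


def has_unresolved_placeholder(value: str) -> bool:
    """Return True if the value still contains a raw placeholder."""
    return PLACEHOLDER.search(value) is not None


raw = "Hello {{$.inputs.parameters['pipelinechannel--name']}} 3"
resolved = resolve_placeholders(raw, {'pipelinechannel--name': 'Kubeflow Pipeline'})
print(resolved)
```

With the inputs from this report, every task should end up with a value for which `has_unresolved_placeholder` returns False, whereas the `print-op-3` output above would still return True.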
Materials and Reference
Generated YAML
# PIPELINE DEFINITION
# Name: example-pipeline
# Inputs:
# name: str
components:
  comp-print-op:
    executorLabel: exec-print-op
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-print-op-2:
    executorLabel: exec-print-op-2
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-print-op-3:
    executorLabel: exec-print-op-3
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-print-op-4:
    executorLabel: exec-print-op-4
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-print-op-5:
    executorLabel: exec-print-op-5
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
deploymentSpec:
executors:
exec-print-op:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str) -> str:\n print(message)\n return\
\ message\n\n"
image: python:3.10-slim-bookworm
exec-print-op-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str) -> str:\n print(message)\n return\
\ message\n\n"
image: python:3.10-slim-bookworm
exec-print-op-3:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str) -> str:\n print(message)\n return\
\ message\n\n"
image: python:3.10-slim-bookworm
exec-print-op-4:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str) -> str:\n print(message)\n return\
\ message\n\n"
image: python:3.10-slim-bookworm
exec-print-op-5:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str) -> str:\n print(message)\n return\
\ message\n\n"
image: python:3.10-slim-bookworm
pipelineInfo:
  name: example-pipeline
root:
  dag:
    tasks:
      print-op:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-print-op
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 1
            pipelinechannel--name:
              componentInputParameter: name
        taskInfo:
          name: print-op
      print-op-2:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-print-op-2
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 2
            pipelinechannel--name:
              componentInputParameter: name
        taskInfo:
          name: print-op-2
      print-op-3:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-print-op-3
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 3
            pipelinechannel--name:
              componentInputParameter: name
        taskInfo:
          name: print-op-3
      print-op-4:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-print-op-4
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 4
            pipelinechannel--name:
              componentInputParameter: name
        taskInfo:
          name: print-op-4
      print-op-5:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-print-op-5
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 5
            pipelinechannel--name:
              componentInputParameter: name
        taskInfo:
          name: print-op-5
  inputDefinitions:
    parameters:
      name:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.8.0
Impacted by this bug? Give it a 👍.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Also hitting this issue -- curious if anyone finds a workaround
My current workaround is to create an extra pipeline task just for substituting the placeholder values, and to use the output of that task as the other tasks' input. This way the placeholders are substituted correctly, but it costs an additional pod that is created just for this simple string-formatting task :(
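A minimal sketch of what this extra-task workaround could look like, assuming the same `print_op` component as in the reproduction. `format_message`, `build_pipeline`, and the pipeline name are hypothetical names chosen for illustration, and `kfp` is imported lazily here only so the plain formatting helper can be tried without the SDK installed.

```python
def format_message(name: str, index: int) -> str:
    """Runs inside its own task, where `name` is already a real string."""
    return f'Hello {name} {index}'


def print_op(message: str) -> str:
    print(message)
    return message


def build_pipeline():
    """Wire a formatting task in front of each print task (needs kfp)."""
    # Lazy import: only required when the pipeline is actually built.
    from kfp import dsl

    image = 'python:3.10-slim-bookworm'
    format_task_op = dsl.component(format_message, base_image=image)
    print_task_op = dsl.component(print_op, base_image=image)

    @dsl.pipeline(name='example-pipeline-workaround')
    def pipeline(name: str):
        for i in range(1, 6):
            # The string is built at run time by a task, so no placeholder
            # is embedded in a constant for the backend to substitute.
            formatted = format_task_op(name=name, index=i)
            print_task_op(message=formatted.output)

    return pipeline
```

The returned pipeline can be compiled the same way as in the reproduction, e.g. `compiler.Compiler().compile(build_pipeline(), package_path=...)`. The trade-off is the one described above: each formatted string needs one more pod.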
Yea, I worked around it by actually inspecting the argo template and forcing the correct replacement myself 🥲 doesn't require an extra pod, but is probably brittle to updates
Have the same issue. For some reason it works fine for longer strings.
This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.
Hi, I would like to check: will this be fixed in the next release? I'm still encountering a similar problem with KFP SDK 2.13.0 and backend version 2.5.0, i.e. the parameters are not substituted correctly when using a Python formatted string. The problem is not consistent: sometimes the parameters are substituted correctly and sometimes they are not. This is quite critical for us, as we use a reusable notification component that accepts a formatted string built in Python. This worked fine when we were using KFP v1.
Here are the results of running the pipeline above:
First run (works as expected):
Second run (does not work as expected):
/reopen
@tkobatake: You can't reopen an issue/PR unless you authored it or you are a collaborator.
In response to this:
/reopen
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/reopen
@HumairAK: Reopened this issue.