[feature] Parse Iterables for PipelineParameterChannels
Feature Area
/area sdk
What feature would you like to see?
During the build stage of the pipeline spec, the kfp.compiler.pipeline_spec_builder.build_task_spec_for_task method loops through a task's inputs and branches on the data type of each input. I would like additional logic that checks for PipelineParameterChannels/PipelineArtifactChannels inside the common Python iterables used as KFP inputs, i.e. list and dict. The current code only checks the type of the object it is looping over, rather than going one level deep, before sending the input to the to_protobuf_value method.
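A minimal sketch of the kind of check being proposed (the helper and its placement are illustrative only, not the actual compiler internals):

from kfp.dsl.pipeline_channel import (PipelineArtifactChannel,
                                      PipelineParameterChannel)

CHANNEL_TYPES = (PipelineParameterChannel, PipelineArtifactChannel)

def contains_channel(value) -> bool:
    # Illustrative: a channel counts whether it appears directly or one
    # level deep inside a list/dict passed as a component input.
    if isinstance(value, CHANNEL_TYPES):
        return True
    if isinstance(value, list):
        return any(isinstance(v, CHANNEL_TYPES) for v in value)
    if isinstance(value, dict):
        return any(isinstance(v, CHANNEL_TYPES) for v in value.values())
    return False

Inputs for which such a check returns True would then be wired up as task dependencies instead of being passed straight to to_protobuf_value as constants.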
What is the use case or pain point?
My team currently uses a custom mapping of our components to allow our end users to dynamically build pipeline definitions in v1. We store each component's output dictionary within another dictionary and reference those outputs as we iterate over the required components to run. This feature would help with dynamic pipeline definitions and also minimize the code changes/redesign needed to migrate to v2.
Is there a workaround currently?
I am currently unaware of any workaround.
Love this idea? Give it a 👍.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.
/reopen
@zazulam: Reopened this issue.
In response to this:
/reopen
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
I created some examples of what I've attempted in v2.
- Attempt using kfp Artifacts / List[Dataset]
from typing import List

import kfp
from kfp.dsl import Dataset, component, pipeline, Output, Input

@component(base_image="python:3.9")
def create_dataset_paths(name: str, out_dfs: Output[Dataset], input_dfs: List[Dataset] = None):
    if input_dfs:
        input_df_paths = {input_df.name: input_df.metadata for input_df in input_dfs}
        print(input_df_paths)
    dataset_paths = {
        'wine': 's3://my-bucket/datasets/wine_dataset.csv',
        'iris': 's3://my-bucket/datasets/iris_dataset.csv',
        'cancer': 's3://my-bucket/datasets/cancer_dataset.csv'
    }
    out_dfs.name = f'{name}_dfs'
    out_dfs.metadata = dataset_paths

@component(base_image="python:3.9")
def process_datasets(dataset_artifact: Input[Dataset]):
    dataset_paths = dataset_artifact.metadata
    for name, path in dataset_paths.items():
        print(f"Looking at {name} dataset at S3 path: {path}")

@pipeline(name="dynamic-pipeline-example")
def dynamic_pipeline():
    fruits = {
        'apple': ['banana', 'orange'],
        'banana': ['orange'],
        'orange': [],
    }
    sorted_fruits = dict(sorted(fruits.items(), key=lambda item: len(item[1])))
    output_pool = {}
    for fruit, children in sorted_fruits.items():
        if children:
            current_task = create_dataset_paths(name=fruit, input_dfs=[output_pool[child] for child in children])
        else:
            current_task = create_dataset_paths(name=fruit)
        output_pool[fruit] = current_task.outputs["out_dfs"]
        process_datasets(dataset_artifact=current_task.outputs["out_dfs"])

endpoint = 'http://localhost:80'
kfp_client = kfp.client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    dynamic_pipeline,
    arguments={},
)
That results in the following error:
Traceback (most recent call last):
File "/mnt/c/Users/zaz/Projects/Python/KFP/dataset_usage.py", line 30, in <module>
@pipeline(name="dynamic-pipeline-example")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
return component_factory.create_graph_component_from_func(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
return graph_component.GraphComponent(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/graph_component.py", line 58, in __init__
pipeline_outputs = pipeline_func(*args_list)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/Projects/Python/KFP/dataset_usage.py", line 41, in dynamic_pipeline
current_task = create_dataset_paths(name=fruit, input_dfs=[output_pool[child] for child in children])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/base_component.py", line 101, in __call__
return pipeline_task.PipelineTask(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_task.py", line 118, in __init__
type_utils.verify_type_compatibility(
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/types/type_utils.py", line 330, in verify_type_compatibility
raise InconsistentTypeException(error_text)
kfp.dsl.types.type_utils.InconsistentTypeException: Incompatible argument passed to the input 'input_dfs' of component 'create-dataset-paths': Argument type 'LIST'
is incompatible with the input type 'List[[email protected]]'
- Using a PipelineArtifactChannel for those components results in this error:
Traceback (most recent call last):
File "/mnt/c/Users/zaz/Projects/Python/KFP/artifact_collections.py", line 95, in <module>
@pipeline(name="dynamic-pipeline-example")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
return component_factory.create_graph_component_from_func(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
return graph_component.GraphComponent(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/graph_component.py", line 68, in __init__
pipeline_spec, platform_spec = builder.create_pipeline_spec(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1919, in create_pipeline_spec
build_spec_by_group(
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1272, in build_spec_by_group
subgroup_task_spec = build_task_spec_for_task(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 309, in build_task_spec_for_task
to_protobuf_value(input_value))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 78, in to_protobuf_value
values=[to_protobuf_value(v) for v in value]))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 78, in <listcomp>
values=[to_protobuf_value(v) for v in value]))
^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 80, in to_protobuf_value
raise ValueError('Value must be one of the following types: '
ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=create-dataset-paths;name=orange;type=Dataset;}}"
of type "<class 'kfp.dsl.pipeline_channel.PipelineArtifactChannel'>".
This ValueError is the same error I would receive when passing a dictionary input to a component, where the dict was built dynamically during client-side compilation with its values set to the outputs of previous components; in that case, however, the argument received was a PipelineParameterChannel.
When I modified kfp.compiler.pipeline_spec_builder.build_task_spec_for_task to check lists/dicts for PipelineParameterChannels and PipelineArtifactChannels, I was able to compile successfully and have a properly connected DAG render in the UI based on the component outputs fed in via that dictionary.
It seems to me that a List[Dataset] can only be obtained as a component output by having a component return an object of that type, rather than by declaring such a list within the pipeline at compilation time.
Is there an existing solution or method of collecting component outputs and feeding them into downstream components programmatically? My team uses this capability in v1 as we generate dynamic DAGs using the same pipeline definition function.
PipelineParameterChannel and PipelineArtifactChannel are compilation-time placeholders that will be replaced either with constant values or, more commonly, with runtime placeholders (e.g. {{$inputs.parameters[...]}}) that can be resolved only at runtime. So the type checking you're proposing is only applicable to the constant-value case.
I'm still a bit unclear on what real-world use case you're proposing to solve. Can you maybe give an example and show how you would achieve it in kfp v1?
Thanks for the response Chen! The main reason for all of this is that we have some teams running large pipelines who built the ability to use slightly different configurations that determine which components run and which do not. These teams have pipelines where the number of components/nodes can span from 5 to 120, and at that scale, modifying the pipeline definition each time and "rewiring" the components to skip or select which ones run is rough on the end user. That is the root issue: large-scale pipeline users need to manipulate their workflow without rewiring their DAG manually.

Since v1 did not have sub-DAGs, the team determined that they could reuse their existing yaml structure and essentially generate the pipeline definition dynamically, leveraging the placeholder values to connect the DAG for them at runtime without statically typing out each component instantiation in the pipeline function. The fact that they can funnel the outputs from varying components into a collection and then feed that collection into a downstream node in v1 is what enables generating the DAG structure this way.
However, that ability is not available in v2.
I'll share an example of how a team does this in v1:
Here is an example of the yaml structure that would be used.
operation_a:
  operator_name: operator_a
  required_args:
    arg1: value1
    arg2: value2
  operator_inputs: {}
operation_b:
  operator_name: operator_b
  required_args:
    arg1: value1
    arg2: value2
  operator_inputs:
    input_x:
      operation_c: output_key
operation_c:
  operator_name: operator_c
  required_args:
    arg1: value1
    arg2: value2
  operator_inputs:
    input_x:
      operation_d: output_key
operation_d:
  operator_name: operator_d
  required_args: {}
  operator_inputs:
    input_x:
      operation_a: output_key
operation_e:
  operator_name: operator_e
  required_args:
    arg1: value1
    arg2: value2
  operator_inputs:
    input_x:
      operation_d: output_key
      operation_b: output_key
Here is an example of parsing an OrderedDict, workflow_components, from that yaml after performing a topological sort on it.
import kfp
from kfp import Client

# workflow_components: topologically sorted OrderedDict parsed from the yaml above
# operator_component: the reusable component each operation maps to

@kfp.dsl.pipeline(name='operation flow')
def sample_pipeline():
    comp_outputs = {}
    for comp in workflow_components.keys():
        comp_details = workflow_components[comp]
        # simple example of checking for starting nodes in the dag
        if len(comp_details["operator_inputs"]) == 0:
            operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"])
        else:
            op_inputs = {}
            for key, value in comp_details["operator_inputs"].items():
                op = list(value.keys())[0]
                op_inputs[key] = comp_outputs[op]["output_key"]
            operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"], operator_inputs=op_inputs)
        operator_task.set_display_name(comp)
        comp_outputs[comp] = operator_task.outputs

kfp.compiler.Compiler().compile(sample_pipeline, 'sample_pipeline.yaml')
kfp_client = Client()
run = kfp_client.create_run_from_pipeline_func(
    sample_pipeline,
    arguments={},
)
Here is also a snippet of the DAG spec from the Argo Workflow manifest:
templates:
  - dag:
      tasks:
        - arguments: {}
          name: operator-comp
          template: operator-comp
        - arguments:
            parameters:
              - name: operator-comp-output_key
                value: '{{tasks.operator-comp.outputs.parameters.operator-comp-output_key}}'
          dependencies:
            - operator-comp
          name: operator-comp-2
          template: operator-comp-2
        - arguments:
            parameters:
              - name: operator-comp-2-output_key
                value: '{{tasks.operator-comp-2.outputs.parameters.operator-comp-2-output_key}}'
          dependencies:
            - operator-comp-2
          name: operator-comp-3
          template: operator-comp-3
        - arguments:
            parameters:
              - name: operator-comp-3-output_key
                value: '{{tasks.operator-comp-3.outputs.parameters.operator-comp-3-output_key}}'
          dependencies:
            - operator-comp-3
          name: operator-comp-4
          template: operator-comp-4
        - arguments:
            parameters:
              - name: operator-comp-2-output_key
                value: '{{tasks.operator-comp-2.outputs.parameters.operator-comp-2-output_key}}'
              - name: operator-comp-4-output_key
                value: '{{tasks.operator-comp-4.outputs.parameters.operator-comp-4-output_key}}'
          dependencies:
            - operator-comp-2
            - operator-comp-4
          name: operator-comp-5
          template: operator-comp-5
@zazulam your team has implemented a DSL on top of a DSL :expressionless:
:exploding_head: because here we have a yaml abstraction on top of python so we can write pipelines in yaml instead of python
and the python being abstracted is itself an abstraction on top of a way to write pipelines using yaml
DSLs all the way down 🤣. It's def unorthodox (our users / data scientists can be quite...creative), but it was supported in V1, is not supported in V2, is trivial to add support for, and the work is already complete.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
After diving into it more, there were changes to the PipelineSpec that needed to be made to appropriately handle the downstream usage of the component outputs. I'll be updating the associated PR soon.
The so-called dynamic pipeline is something of a misuse of the KFP DSL. A pipeline has a static definition by the time it is sent to the API. All of the "dynamic" logic shown in this sample (for loops, if/else conditions, etc.) happens locally, at pipeline compilation time.
@kfp.dsl.pipeline(name='operation flow')
def sample_pipeline():
    comp_outputs = {}
    for comp in workflow_components.keys():
        comp_details = workflow_components[comp]
        # simple example of checking for starting nodes in the dag
        if len(comp_details["operator_inputs"]) == 0:
            operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"])
        else:
            op_inputs = {}
            for key, value in comp_details["operator_inputs"].items():
                op = list(value.keys())[0]
                op_inputs[key] = comp_outputs[op]["output_key"]
            operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"], operator_inputs=op_inputs)
        operator_task.set_display_name(comp)
        comp_outputs[comp] = operator_task.outputs
I strongly suggest users avoid non-DSL Python code in pipeline definitions, as many users get confused about why their code didn't run as expected. Most, if not all, dynamic logic can, and probably should, be captured within components and executed remotely at runtime.
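As a rough, hypothetical illustration of that suggestion (the component below is not from this thread): the config-driven branching can live inside the component body, where it executes in the container at runtime rather than during local compilation.

from kfp import dsl

# Hypothetical component: the if/else on the operation config runs
# remotely at runtime, not in the pipeline function at compile time.
@dsl.component(base_image="python:3.9")
def run_operation(operation_name: str, required_args: dict, upstream_outputs: dict) -> dict:
    if not upstream_outputs:
        print(f"{operation_name}: starting node, args={required_args}")
    else:
        print(f"{operation_name}: consuming outputs from {list(upstream_outputs)}")
    return {"output_key": f"{operation_name}-done"}

This keeps the per-node behavior dynamic at runtime, though the task-to-task wiring in the pipeline function still has to be expressed through individual parameters or artifacts.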
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.
/reopen
@marcocamilo: You can't reopen an issue/PR unless you authored it or you are a collaborator.
In response to this:
/reopen
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
I’d love to see this reopened!
I would like to support passing a list of outputs from multiple components as a single structured input (e.g., a List[Artifact]) to another component in Kubeflow Pipelines, as opposed to the current option of collecting the outputs of a single component fanned out over multiple input parameters (e.g. dsl.ParallelFor / dsl.Collected). This is conceptually different from what dsl.ParallelFor and dsl.Collected provide.
The new feature should enable collecting outputs from different components, each invoked independently, into a single input for downstream components.
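For reference, a minimal sketch of what the existing pattern covers (component bodies are placeholders; assumes an SDK and backend version that support dsl.Collected): a single component fanned out over a list of items, with its outputs gathered for a downstream consumer.

from typing import List

from kfp import dsl

@dsl.component(base_image="python:3.9")
def train(seed: int) -> int:
    # placeholder standing in for a real training step
    return seed * 2

@dsl.component(base_image="python:3.9")
def merge(results: List[int]) -> int:
    # fan-in of the outputs produced by the ParallelFor iterations
    return sum(results)

@dsl.pipeline(name="parallelfor-fan-in")
def parallelfor_fan_in():
    with dsl.ParallelFor(items=[1, 2, 3]) as seed:
        t = train(seed=seed)
    merge(results=dsl.Collected(t.output))

The request here differs in that the collected artifacts would come from distinct components (train_svc_op, train_xgb_op, train_lr_op) rather than from iterations of the same component.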
What is the use case or pain point?
In many ML workflows—especially ensemble methods, boosting pipelines, or model selection tasks—multiple models are trained using different algorithms or configurations, but share the same downstream evaluation process.
For example, I may train several models using different components like train_svc_op, train_xgb_op, and train_lr_op, each producing the same type of Model artifact. These need to be evaluated together by a single component that accepts a List[Model].
What is the ideal behavior?
Dynamically collect outputs from multiple independent components and feed them as a single structured input (e.g., List[Model]) to a downstream component. This would be a true fan-in workflow, replicating one set of input parameters across multiple components.
Example (conceptual pseudocode):
@pipeline()
def ml_pipeline():
    models = []
    for train_func in [train_svc, train_xgb, train_lr]:
        model = train_func(
            train_set=prep_data_op.outputs["train_set"],
            val_set=prep_data_op.outputs["val_set"],
            mlflow_experiment_name=experiment_name
        ).outputs["model"]
        models.append(model)

    evaluate_model(
        models=models,
        test_set=prep_data_op.outputs["test_set"]
    )
This behavior would dramatically improve the composability and clarity of pipelines that follow fan-out/fan-in patterns, a common design in machine learning workflows.
This enhancement would:
- Improve ergonomics and readability for ML pipelines involving multiple parallel tasks.
- Enable scalable and dynamic fan-in patterns.
- Reduce boilerplate and potential for error in component signature definitions (see the sketch after this list).
- Align with how users intuitively think about passing data between components.
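To make the boilerplate point concrete, here is a minimal sketch of what a downstream evaluator looks like today without such a feature (component and parameter names are hypothetical): each producer's artifact must be declared as its own Input[Model] parameter, so the signature changes whenever a trainer is added or removed.

from kfp.dsl import Input, Model, component

@component(base_image="python:3.9")
def evaluate_models(svc_model: Input[Model],
                    xgb_model: Input[Model],
                    lr_model: Input[Model]):
    # every new trainer requires another parameter here and another
    # keyword argument at every call site in the pipeline
    for model in (svc_model, xgb_model, lr_model):
        print(model.name, model.uri, model.metadata)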
/reopen
@zazulam: Reopened this issue.
In response to this:
/reopen
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
@marcocamilo well put. I think your post here is better suited as its own issue; could you convert your comment to one? The request is fairly reasonable, and I can add it to the upcoming milestone.
@HumairAK there is an existing issue @marcocamilo created here. I'll close this and we can continue the discussion on the new one.