[feature] Compile-time processing of PipelineParam
Feature Area
/area sdk
What feature would you like to see?
Currently, the KFP compiler converts pipeline arguments into PipelineParam placeholders. As a result, string processing applied to a pipeline argument behaves differently at compile time than it does at runtime.
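For illustration, here is a minimal sketch (assuming the v1 kfp SDK, where pipeline arguments are represented as dsl.PipelineParam objects) of why string operations act on the compile-time placeholder rather than the runtime value:

from kfp import dsl

# A dsl.PipelineParam stands in for the runtime value at compile time.
param = dsl.PipelineParam(name='bq_table')

print(str(param))
# {{pipelineparam:op=;name=bq_table}}

print(str(param).split('.')[0])
# still the placeholder text, not the dataset name from 'ml.testing_data'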
What is the use case or pain point?
I am building a Vertex Pipeline for batch prediction using the Kubeflow Pipelines SDK. I want to process the pipeline argument bq_table as a string. The pipeline compiles successfully, but it fails at runtime (see screenshot below).
# imports assumed; adjust to your kfp / google-cloud-pipeline-components versions
from kfp import dsl
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp

@dsl.pipeline(name='batch-prediction-pipeline')
def batch_prediction_pipeline(
    project: str,
    region: str,
    gcs_source: str,
    bq_location: str = 'us',
    bq_table: str = 'ml.testing_data',
):
    bq_dataset = str(bq_table).split('.')[0]  # parse BigQuery dataset name
    bq_table_uri = f'{project}.{bq_table}'
    bq_dataset_uri = f'{project}.{bq_dataset}'
    bq_query = f'''CREATE OR REPLACE EXTERNAL TABLE
    {bq_table_uri}
    OPTIONS (
      format = 'CSV',
      uris = ['{gcs_source}']
    );'''
    bq_query_op = BigqueryQueryJobOp(
        project=project,
        location=bq_location,
        query=bq_query,
    )

    # ...

    batch_op = gcc_aip.ModelBatchPredictOp(
        project=project,
        location=region,
        bigquery_source_input_uri=f'bq://{bq_table_uri}',
        bigquery_destination_output_uri=f'bq://{bq_dataset_uri}',
    ).after(bq_query_op)
As seen in the screenshot, the value of bigquery_destination_output_uri does not reflect the split logic implemented above. In this case, I am using the pre-built Google Cloud Pipeline Components, so I can't change the component implementation.
Other related issues: #2725, #2937
Is there a workaround currently?
Yes. I can define more granular pipeline arguments (e.g. bq_dataset) to support my use case.
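A sketch of that workaround applied to the pipeline above: pass the dataset explicitly so no split() is needed on a PipelineParam (the extra parameter name is illustrative):

from kfp import dsl

@dsl.pipeline(name='batch-prediction-pipeline')
def batch_prediction_pipeline(
    project: str,
    region: str,
    gcs_source: str,
    bq_location: str = 'us',
    bq_dataset: str = 'ml',            # supplied directly instead of parsed from bq_table
    bq_table: str = 'ml.testing_data',
):
    # Placeholders inside f-strings are substituted by the backend at runtime,
    # so plain concatenation works; only compile-time logic like split() does not.
    bq_table_uri = f'{project}.{bq_table}'
    bq_dataset_uri = f'{project}.{bq_dataset}'
    # ... same ops as above ...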
Love this idea? Give it a 👍. We prioritize fulfilling features with the most 👍.
I've also been wrestling with this for a while now.
I'd like to highlight another problem that stems from the same behavior -- if you try to put string PipelineParams into a list passed to a component, compilation fails because PipelineParams are not JSON serializable.
For example:
from typing import List

# imports assumed (KFP SDK 1.8.x, "v2" namespace)
from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline

@component
def submit_dataproc_job(job_args: List):
    # submit job using dataproc client -- job args for dataproc jobs have to be strings
    ...

@pipeline
def example_pipeline(
    dataproc_job_arg1: str,
    dataproc_job_arg2: str,
    dataproc_job_arg3: str,
):
    submit_dp_job_task = submit_dataproc_job(
        job_args=[
            dataproc_job_arg1,
            dataproc_job_arg2,
            dataproc_job_arg3,
        ]
    )

if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=example_pipeline,
        package_path="example_pipeline.json",
    )
Running this code results in:
TypeError: Object of type PipelineParam is not JSON serializable
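The failure mode can be reproduced outside the compiler -- a sketch, assuming the same v1 dsl.PipelineParam class discussed above:

import json

from kfp import dsl

# json.dumps has no encoder for dsl.PipelineParam, so serializing a list
# that contains one raises the same TypeError the compiler reports.
json.dumps([dsl.PipelineParam(name='dataproc_job_arg1')])
# TypeError: Object of type PipelineParam is not JSON serializable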
If you try to cast the parameter values to strings (which you'd expect them to be handled as, given the type hint) like so:
job_args=[
str(dataproc_job_arg1),
str(dataproc_job_arg2),
str(dataproc_job_arg3),
]
you get an error like the original one shown in this issue. From the Vertex AI UI, job_args is of type string and is set to:
["{{pipelineparam:op=;name=dataproc_job_arg1}}", "{{pipelineparam:op=;name=dataproc_job_arg2}}", "{{pipelineparam:op=;name=dataproc_job_arg3}}"]
It's worth noting that I have been able to work around this by passing job_args in as a pipeline-level parameter:
@pipeline
def example_pipeline(job_args: List):
    submit_dp_job_task = submit_dataproc_job(
        job_args=job_args
    )
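For completeness, the list can then be supplied per run at submission time -- a sketch, assuming the Vertex AI Python SDK (names and values are illustrative):

from google.cloud import aiplatform

job = aiplatform.PipelineJob(
    display_name="example-pipeline",
    template_path="example_pipeline.json",
    parameter_values={
        # the whole list arrives in the component as real strings at runtime
        "job_args": ["--arg1=a", "--arg2=b", "--arg3=c"],
    },
)
job.submit()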
The problem here is that I'd like to be able to use the original pipeline parameters across several components without having to unpack them from a list. Discrete parameters are also more convenient for comparing runs.
If we were able to access the value of a PipelineParam, working around this behavior wouldn't be too difficult. Perhaps that's already possible? My IDE only provides completion hints for str because of the type hint on these params.
EDIT:
It is indeed possible to access the value of a PipelineParam via its .value attribute, but the values all come out as null.
The docs for PipelineParam say that value is "The actual value of the PipelineParam. If provided, the PipelineParam is 'resolved' immediately. For now, we support string only."
I'm not sure where PipelineParams are created, but I'd assume it's in the pipeline decorator logic. I'd have expected the PipelineParam value to be set from the default value in the pipeline function definition, if one was given, and then updated later when the RuntimeConfigBuilder is created, using its _parameter_values property, which is populated when an actual pipeline run is started.
Has anyone succeeded in retrieving the value attribute from within the pipeline function and constructing a new string with it? I couldn't find any reference to doing so:
https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.PipelineParam
@yonaroze
In short, no, it isn’t possible to manipulate pipeline parameters outside of components.
What we do instead is create components which preprocess pipeline parameters into the form that’s consumable by your components.
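A sketch of that pattern applied to the original example -- do the string processing inside a small component, where the argument arrives as a real str (the component and output names here are illustrative, assuming the KFP v2 component decorator):

from typing import NamedTuple

from kfp.v2.dsl import component

@component
def parse_bq_table(project: str, bq_table: str) -> NamedTuple(
        'Outputs', [('bq_table_uri', str), ('bq_dataset_uri', str)]):
    # Runs at pipeline runtime, so bq_table is the actual string value here.
    from collections import namedtuple
    dataset = bq_table.split('.')[0]
    outputs = namedtuple('Outputs', ['bq_table_uri', 'bq_dataset_uri'])
    return outputs(f'{project}.{bq_table}', f'{project}.{dataset}')

# Inside the pipeline:
#   parse_task = parse_bq_table(project=project, bq_table=bq_table)
#   ... ModelBatchPredictOp(
#           bigquery_destination_output_uri=f"bq://{parse_task.outputs['bq_dataset_uri']}", ...)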
I think this is not a feature request -- it should be designated as a BUG! I also ran into this problem: pipeline params are transformed at compile time, so you cannot use the original parameter's functionality, like split, etc.
e.g. ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=;name=tableRef;type=String;}}" of type "<class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>".
@dsl.pipeline(name="BigQuery_to_Parquet", description="Generic pipeline to extract a table to parquet file")
def bigquery_to_parquet(project: str, location: str, service_account: str,
temp_location: str, staging_location: str, tableRef: str, bucket:str
) -> None:
from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp
from google_cloud_pipeline_components.preview.dataflow import DataflowFlexTemplateJobOp
train_dataset_op = DataflowFlexTemplateJobOp(project=project, location=location,
container_spec_gcs_path="gs://dataflow-templates/2021-12-06-00_RC00/flex/BigQuery_to_Parquet",
service_account_email=service_account,
temp_location=temp_location,
staging_location=staging_location,
parameters={
"tableRef": tableRef,
"bucket": bucket}
)
# wait serverless, no costs
_ = WaitGcpResourcesOp(
gcp_resources=train_dataset_op.outputs["gcp_resources"]
)
return None
and when I change the parameters to a dict, the third-party component fails because it cannot deserialize the JSON it receives:
@dsl.pipeline(name="BigQuery_to_Parquet", description="Generic pipeline to extract a table to parquet file")
def bigquery_to_parquet(project: str, location: str, service_account: str,
temp_location: str, staging_location: str, parameters: dict
) -> None:
But I suppose we can wait until this "feature" goes stale and disappears into electronic oblivion.
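One possible workaround here, following the component-preprocessing pattern mentioned earlier in the thread, is to assemble the parameters dict inside a tiny component so the placeholders are resolved at runtime -- a sketch, assuming the KFP 2.x SDK and that DataflowFlexTemplateJobOp accepts a dict produced by an upstream task:

from kfp import dsl

@dsl.component
def build_dataflow_params(tableRef: str, bucket: str) -> dict:
    # Executed at runtime, so tableRef and bucket are plain strings here.
    return {"tableRef": tableRef, "bucket": bucket}

# Inside the pipeline:
#   params_task = build_dataflow_params(tableRef=tableRef, bucket=bucket)
#   ... DataflowFlexTemplateJobOp(..., parameters=params_task.output)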
Facing a similar issue. We should be able to access and use pipeline parameters outside components...
Any update on this feature? I have encountered several cases where this would be very useful -- for example, when creating tasks in a sequential for-loop that have to run in a certain order. It would be really helpful to have some way to do this!
Any updates? I am currently facing a use case where, depending on the inputs, I would like to increase the parallelization of my pipeline.
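Not a full answer, but for input-dependent fan-out there is dsl.ParallelFor, which iterates over a list parameter at runtime -- a minimal sketch, assuming the KFP 2.x SDK:

from typing import List

from kfp import dsl

@dsl.component
def work(item: str):
    print(item)

@dsl.pipeline(name='fan-out')
def fan_out(items: List[str]):
    # The loop is expanded at runtime from the submitted list,
    # so the degree of parallelism depends on the input.
    with dsl.ParallelFor(items=items) as item:
        work(item=item)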
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.