[feature] Make `set_env_variable` compatible with the channels [KFP V2]
Feature Area
/area sdk
What feature would you like to see?
I want to be able to use values from the parameters and/or outputs to set environment variables in KFP v2
What is the use case or pain point?
I'm trying to pass a value in the parameters of the job execution and then set it as an environment variable. Example:
@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    great_operator_op = great_operator().set_env_variable("env", env)
However, the current code raises an exception:
TypeError: Cannot set ml_pipelines.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar.value to {{channel:task=;name=env;type=String;}}: {{channel:task=;name=env;type=String;}} has type <class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field EnvVar.value
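For context on why this fails: inside a `@dsl.pipeline` function, a parameter like `env` is not a plain string at compile time but a placeholder channel object, while the protobuf `EnvVar.value` field accepts only `bytes` or `str`. A minimal pure-Python sketch of that type mismatch (the class and function here are simplified stand-ins for illustration, not KFP's real implementation):

```python
class FakeParameterChannel:
    # Simplified stand-in for kfp.dsl.pipeline_channel.PipelineParameterChannel:
    # at compile time, pipeline parameters are placeholder objects, not strings.
    def __init__(self, name: str):
        self.name = name

    def __str__(self):
        return f"{{{{channel:task=;name={self.name};type=String;}}}}"


def set_env_var_value(value):
    # Mimics the strict type check a protobuf string field performs
    # when EnvVar.value is assigned.
    if not isinstance(value, (bytes, str)):
        raise TypeError(
            f"Cannot set EnvVar.value to {value}: "
            f"expected bytes or str, got {type(value).__name__}"
        )
    return value


set_env_var_value("dev")  # a literal string is accepted

try:
    set_env_var_value(FakeParameterChannel("env"))  # a channel is rejected
except TypeError as err:
    print(f"raises: {err}")
```

This is also why routing the value through an intermediate component does not help: the downstream `.output` is still a channel object at compile time, not a resolved string.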
I've tried to pass the value through an intermediate task to cast its type (that helped me in a similar situation with casting parameter types), but it doesn't work either.
@dsl.component()
def convert_env_to_string(environment: str) -> str:
    return environment

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    convert_env_to_string_op = convert_env_to_string(environment=env)
    great_operator_op = great_operator().set_env_variable("env", convert_env_to_string_op.output)
TypeError: Cannot set ml_pipelines.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar.value to {{channel:task=convert-env-to-string;name=Output;type=String;}}: {{channel:task=convert-env-to-string;name=Output;type=String;}} has type <class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field EnvVar.value
Is there a workaround currently?
It's possible to avoid using environment variables at all, but that's not really a workaround.
As an alternative, would you consider passing it as a parameter to great_operator instead?
Hi @Davidnet, yes. I should have been clearer. That's exactly what I meant by "avoid using environment variables at all". This solution works and I'll use it, but I still have to move away from environment variables. I can imagine an even deeper solution: passing this value as a param and then injecting it via, for example, os.environ, but it looks really hackish.
@dsl.component()
def convert_env_to_string(env: str) -> str:
    import os
    os.environ["env"] = env
    # rest of the code

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    great_operator_op = great_operator(env=env)
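The pass-as-parameter workaround can be sketched without KFP at all: the component takes the value as a regular input and, if some inner code insists on reading environment variables, mirrors it into os.environ at runtime. The body of great_operator here is hypothetical, assumed purely for illustration:

```python
import os


def great_operator(env: str) -> str:
    # Hypothetical component body: consume the pipeline parameter
    # directly as an input, mirroring it into os.environ for any
    # library code that only reads environment variables.
    os.environ["env"] = env
    return f"running in {os.environ['env']}"


print(great_operator("dev"))  # -> running in dev
```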
Hi, I am also struggling with this.
I am using environment variables to set the image to a dynamic version inside the components, like so:
@dsl.component(
    base_image=os.environ.get("BASE_IMAGE"),
)
def some_op(foo: str):
    ...

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID",
):
    op = (
        some_op(
            foo="bar",
        )
        .set_env_variable(name="BASE_IMAGE", value=base_image)
    )
Is there any alternative way of doing this? Otherwise I would like to help out with implementing this feature.
@ViktorWelbers, are you sure you're looking for the same feature? It looks like it won't help with your problem. If you want to fill base_image for your component only, you should provide this variable during pipeline compilation, not at runtime.
Something like this:
import os

BASE_IMAGE = os.environ.get("BASE_IMAGE")

@dsl.component(
    base_image=BASE_IMAGE,
)
def some_op(foo: str):
    ...

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID",
):
    op = (
        some_op(
            foo="bar",
        )
    )
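The key point in the snippet above is that os.environ.get runs on the machine that compiles the pipeline, so by the time @dsl.component sees base_image it is already a plain string, not a channel. A small self-contained sketch of that compile-time resolution (the image name is made up):

```python
import os

# Set before compilation, e.g. exported in the shell that runs the
# compile script; "gcr.io/example/trainer:v1" is a made-up image name.
os.environ.setdefault("BASE_IMAGE", "gcr.io/example/trainer:v1")

# Resolved at import/compile time to a plain str, which is exactly
# the type that base_image= in @dsl.component expects.
BASE_IMAGE = os.environ.get("BASE_IMAGE")

print(type(BASE_IMAGE).__name__)  # -> str
```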
From my understanding, base_image from dsl.component won't be accessible at runtime even if you had the possibility to set env variables from parameters.
So if you really want to change the image from parameters, I guess you'd want to go through a container component.
Something like this:
from kfp import dsl

@dsl.container_component
def some_op(foo: str, base_image: str):
    return dsl.ContainerSpec(image=base_image, command=['echo'], args=[foo])

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID",
):
    op = (
        some_op(
            foo="bar",
            base_image=base_image,
        )
    )
@alexdashkov
I want to reuse the same component logic with different base images.
You are totally correct. I think the second option you posted is what I was looking for. Thank you.
Same problem with kubernetes.use_secret_as_env(task, secret_name=secret_name_from_pipeline_args, ...)
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
A comment to remove the stale label, as I still want to have this feature.
@beoygnas Where do you run your pipeline? Do you have an on-premise Kubeflow instance? I'm using GCP's Vertex AI Pipelines and it does not work there: I can compile the pipeline, but then the executor fails.
com.google.cloud.ai.platform.common.errors.AiPlatformException: code=INVALID_ARGUMENT, message=List of found errors: 1.Field: job_spec.worker_pool_specs[0].container_spec.env[0].value; Message: Required field is not set. , cause=null; Failed to create custom job for the task.