
[feature] Make `set_env_variable` compatible with the channels [KFP V2]

Open alexdashkov opened this issue 1 year ago • 6 comments

Feature Area

/area sdk

What feature would you like to see?

I want to be able to use values from parameters and/or outputs to set environment variables in KFP v2.

What is the use case or pain point?

I'm trying to pass a value in the parameters of the job execution and then set it as an environment variable. Example:


@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    great_operator_op = great_operator().set_env_variable("env", env)

However current code raises an exception:

TypeError: Cannot set ml_pipelines.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar.value to {{channel:task=;name=env;type=String;}}: {{channel:task=;name=env;type=String;}} has type <class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field EnvVar.value

I've tried to pass through an intermediate task to cast the type (this helped me in a similar situation when casting parameter types), but it doesn't work either.

@dsl.component()
def convert_env_to_string(environment: str) -> str:
    return environment

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    convert_env_to_string_op = convert_env_to_string(environment=env)
    great_operator_op = great_operator().set_env_variable("env", convert_env_to_string_op.output)
TypeError: Cannot set ml_pipelines.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar.value to {{channel:task=convert-env-to-string;name=Output;type=String;}}: {{channel:task=convert-env-to-string;name=Output;type=String;}} has type <class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field EnvVar.value

Is there a workaround currently?

It's possible to avoid environment variables entirely, but that's not really a workaround.


Love this idea? Give it a 👍.

alexdashkov avatar Oct 17 '23 15:10 alexdashkov

As an alternative, would you consider passing it as a parameter to great_operator instead?

Davidnet avatar Oct 17 '23 21:10 Davidnet

Hi @Davidnet, yes. I should have been clearer: that's exactly what I meant by not using environment variables at all. This solution works and I'll use it, but it still forces me to move away from environment variables. I can imagine an even deeper workaround: pass the value as a parameter and then inject it using, for example, os.environ, but that looks really hackish.

@dsl.component()
def convert_env_to_string(env: str) -> str:
    import os
    os.environ["env"] = env  # inject the runtime parameter as an env var
    # rest of the code
    return env

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    great_operator_op = great_operator(env=env)
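For illustration, here is a plain-Python sketch of that "hackish" injection idea. The helper name is made up; in KFP this body would live inside the component function, since `os.environ` only affects the process the component runs in:

```python
import os


def inject_env(env: str) -> str:
    """Hypothetical helper: copy a pipeline parameter into the process
    environment at runtime, mimicking set_env_variable by hand."""
    os.environ["env"] = env
    return os.environ["env"]


# Inside a component, downstream code could then read os.environ["env"].
```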

alexdashkov avatar Oct 17 '23 21:10 alexdashkov

Hi, I am also struggling with this.

I am using env variables to set the base image to a dynamic version inside the components, like so:


@dsl.component(
    base_image=os.environ.get("BASE_IMAGE"),
)
def some_op(foo: str):
    ...


@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID"
):
    op = (
        some_op(
            foo="bar",
        )
        .set_env_variable(name="BASE_IMAGE", value=base_image)
    )

Is there any alternative way of doing this? Otherwise I would like to help out with implementing this feature.

ViktorWelbers avatar Nov 16 '23 09:11 ViktorWelbers

@ViktorWelbers, are you sure you're looking for the same feature? It doesn't look like it would help with your problem. If you want to fill base_image for your component only, you should provide this variable during pipeline compilation, not at runtime.

Something like that:

import os
BASE_IMAGE = os.environ.get("BASE_IMAGE")

@dsl.component(
    base_image=BASE_IMAGE
)
def some_op(foo: str):
    ...


@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID"
):
    op = some_op(
        foo="bar",
    )

From my understanding, the base_image from dsl.component won't be accessible at runtime even if you could set env variables from parameters. So if you really want to change the image from parameters, I guess you might want to go through a container component. Something like this:

from kfp import dsl

@dsl.container_component
def some_op(foo: str, base_image: str):
    return dsl.ContainerSpec(image=base_image, command=['echo'], args=[foo])
@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID"
):
    op = some_op(
        foo="bar",
        base_image=base_image,
    )

alexdashkov avatar Nov 16 '23 09:11 alexdashkov

@alexdashkov

I want to reuse the same component logic with different base images.

You are totally correct. I think the second option you posted is what I was looking for. Thank you.

ViktorWelbers avatar Nov 16 '23 09:11 ViktorWelbers

Same problem with

kubernetes.use_secret_as_env(task, secret_name=secret_name_from_pipeline_args,...)

fw8 avatar Feb 14 '24 15:02 fw8

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Apr 15 '24 07:04 github-actions[bot]

Commenting to remove the stale label, as I still want this feature.

alexdashkov avatar Apr 15 '24 12:04 alexdashkov

@beoygnas Where do you run your pipeline? Do you have an on-premise Kubeflow instance? I'm using GCP's Vertex AI Pipelines and it doesn't pass there: I can compile the pipeline, but the executor then fails.

com.google.cloud.ai.platform.common.errors.AiPlatformException: code=INVALID_ARGUMENT, message=List of found errors: 1.Field: job_spec.worker_pool_specs[0].container_spec.env[0].value; Message: Required field is not set. , cause=null; Failed to create custom job for the task. 

alexdashkov avatar Jun 12 '24 09:06 alexdashkov