
Converting v1 SDK code to v2 (`.add_pvolumes` and `.after`)

strickvl opened this issue 1 year ago

Environment

Nothing is deployed yet, but it will be using Kubeflow Pipelines v2 when it is.

  • KFP SDK version: kfp 2.6.0, kfp-kubernetes 1.1.0, kfp-pipeline-spec 0.3.0, kfp-server-api 2.0.5

Steps to reproduce

I'm trying to use the KFP SDK to dynamically define pipelines with arbitrary numbers of steps. This gives a flavor:

from typing import Dict

from kfp import dsl

# NOTE: `deployment`, `pod_settings`, `step_settings`, `environment`,
# `orchestrator_run_name`, `ENV_ZENML_TEKTON_RUN_ID`, and
# `StepEntrypointConfiguration` all come from the enclosing ZenML
# orchestrator context and are elided here.

def _create_dynamic_pipeline(self):
    """Create a dynamic pipeline containing one component per step."""
    step_name_to_pipeline_task: Dict[str, dsl.PipelineTask] = {}
    components = []

    for step_name, step in deployment.step_configurations.items():
        image = self.get_image(deployment=deployment, step_name=step_name)
        command = StepEntrypointConfiguration.get_entrypoint_command()
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )

        dynamic_component = self._create_dynamic_component(
            image, command, arguments, step_name
        )
        step_name_to_pipeline_task[step_name] = dynamic_component

        # apply pod settings
        for key, value in pod_settings.node_selectors.items():
            dynamic_component.add_node_selector_constraint(
                label_name=key, value=value
            )

        # add resource requirements
        if step_settings.resource_settings.cpu_count is not None:
            dynamic_component.set_cpu_limit(
                str(step_settings.resource_settings.cpu_count)
            )

        if step_settings.resource_settings.gpu_count is not None:
            dynamic_component.set_accelerator_limit(
                step_settings.resource_settings.gpu_count
            )

        if step_settings.resource_settings.memory is not None:
            # strip the trailing unit suffix, e.g. "512M" -> "512"
            memory_limit = step_settings.resource_settings.memory[:-1]
            dynamic_component.set_memory_limit(memory_limit)

        # set environment variables
        for key, value in environment.items():
            dynamic_component.set_env_variable(name=key, value=value)

        components.append(dynamic_component)

    @dsl.pipeline(display_name=orchestrator_run_name)
    def dynamic_pipeline():
        for component in components:
            component().set_caching_options(
                enable_caching=False
            ).set_env_variable(
                name=ENV_ZENML_TEKTON_RUN_ID,
                value="$(context.pipelineRun.name)",
            )

    return dynamic_pipeline

# later, inside the same class:
dynamic_pipeline = self._create_dynamic_pipeline()
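
Part of my uncertainty is that in v2 the per-task methods (`set_cpu_limit`, `set_env_variable`, `set_caching_options`, and so on) seem to live on `dsl.PipelineTask`, i.e. the object returned by calling a component inside a `@dsl.pipeline` function, rather than on the component itself. A minimal self-contained sketch of that pattern as I understand it (the `echo_step` component is made up for illustration):

from kfp import dsl

@dsl.container_component
def echo_step():
    return dsl.ContainerSpec(image="alpine:3.19", command=["echo"], args=["hello"])

@dsl.pipeline(display_name="sketch")
def sketch_pipeline():
    task = echo_step()  # a dsl.PipelineTask, only available inside the pipeline
    task.set_caching_options(enable_caching=False)
    task.set_cpu_limit("1")
    task.set_env_variable(name="FOO", value="bar")

In my dynamic case, though, the tasks only come into existence inside `dynamic_pipeline`, which is where the questions below come from.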

Expected result

I'm having difficulties with the following:

  • Adding Kubernetes volumes to these components dynamically, especially since `.add_pvolumes` is no longer available in v2 (see the first sketch below).
  • Setting the upstream dependencies of each component as part of this dynamic creation (see the second sketch below). For example, previously I was doing something like this:
for upstream_step_name in step.spec.upstream_steps:
    upstream_pipeline_task = ...  # get the `PipelineTask` from somewhere else via its name
    _pipeline_task().after(upstream_pipeline_task)

It's unclear to me how to do this with the new v2 SDK.
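
For the volumes part, my best guess from the kfp-kubernetes package (which I have installed, see versions above) is that `kubernetes.mount_pvc` is the closest v2 replacement for `.add_pvolumes`. A minimal sketch of what I mean, reusing the made-up `echo_step` component from the sketch above and a placeholder PVC name:

from kfp import dsl, kubernetes

@dsl.pipeline
def volume_pipeline():
    task = echo_step()
    # mount an existing PVC into the task's pod, roughly where
    # .add_pvolumes({"/data": pvolume}) would have gone in v1
    kubernetes.mount_pvc(task, pvc_name="my-existing-pvc", mount_path="/data")

For the ordering part, what I would naively try is a two-pass approach inside the pipeline function: first instantiate every task, then wire up dependencies with `.after`, which still exists on `dsl.PipelineTask` in v2. A sketch under that assumption, using the `step_name_to_pipeline_task` mapping from above (which in my code actually holds component factories):

@dsl.pipeline
def ordered_pipeline():
    tasks: Dict[str, dsl.PipelineTask] = {}
    # first pass: create one task per step
    for step_name, component in step_name_to_pipeline_task.items():
        tasks[step_name] = component()
    # second pass: wire ordering now that every task exists
    for step_name, step in deployment.step_configurations.items():
        for upstream_step_name in step.spec.upstream_steps:
            tasks[step_name].after(tasks[upstream_step_name])

I don't know whether this is the intended approach, though, so guidance would be appreciated.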

Labels

/area backend /area sdk /area components


Impacted by this bug? Give it a 👍.

strickvl · Feb 08 '24 14:02