Converting v1 SDK code to v2 (`.add_pvolumes` and `.after`)
Environment
Nothing is deployed yet, but it will be using Kubeflow v2 once it is.
- KFP SDK version: kfp 2.6.0, kfp-kubernetes 1.1.0, kfp-pipeline-spec 0.3.0, kfp-server-api 2.0.5
Steps to reproduce
I'm trying to use the KFP SDK to dynamically define pipelines with an arbitrary number of steps. The following excerpt gives a flavor (`deployment`, `pod_settings`, `step_settings`, `environment`, and `orchestrator_run_name` come from the surrounding code):
```python
def _create_dynamic_pipeline(self):
    """Create a dynamic pipeline with one component per step."""
    step_name_to_pipeline_task: Dict[str, dsl.PipelineTask] = {}
    components = []
    for step_name, step in deployment.step_configurations.items():
        image = self.get_image(deployment=deployment, step_name=step_name)
        command = StepEntrypointConfiguration.get_entrypoint_command()
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )
        dynamic_component = self._create_dynamic_component(
            image, command, arguments, step_name
        )
        step_name_to_pipeline_task[step_name] = dynamic_component

        # Apply pod settings.
        for key, value in pod_settings.node_selectors.items():
            dynamic_component.add_node_selector_constraint(
                label_name=key, value=value
            )

        # Add resource requirements.
        if step_settings.resource_settings.cpu_count is not None:
            dynamic_component = dynamic_component.set_cpu_limit(
                str(step_settings.resource_settings.cpu_count)
            )
        if step_settings.resource_settings.gpu_count is not None:
            dynamic_component = dynamic_component.set_accelerator_limit(
                step_settings.resource_settings.gpu_count
            )
        if step_settings.resource_settings.memory is not None:
            memory_limit = step_settings.resource_settings.memory[:-1]
            dynamic_component = dynamic_component.set_memory_limit(memory_limit)

        # Set environment variables.
        for key, value in environment.items():
            dynamic_component.set_env_variable(name=key, value=value)

        components.append(dynamic_component)

    @dsl.pipeline(display_name=orchestrator_run_name)
    def dynamic_pipeline():
        for component in components:
            component().set_caching_options(
                enable_caching=False
            ).set_env_variable(
                name=ENV_ZENML_TEKTON_RUN_ID,
                value="$(context.pipelineRun.name)",
            )

    return dynamic_pipeline


dynamic_pipeline = _create_dynamic_pipeline()
```
Expected result
I'm having difficulties with the following:

- Adding Kubernetes volumes to these components dynamically, especially since `.add_pvolumes` is no longer an option in v2.
- Setting the upstream components of each component as part of this dynamic creation. For example, previously I was doing something like this:

```python
for upstream_step_name in step.spec.upstream_steps:
    # Get the `PipelineTask` from somewhere else via its name.
    upstream_pipeline_task = ...
    _pipeline_task().after(upstream_pipeline_task)
```

It's unclear to me how to do this with the new v2 SDK.
Labels
/area backend /area sdk /area components
Impacted by this bug? Give it a 👍.