
KubeflowV2DagRunner ignores env part of the PipelineDeploymentConfig.PipelineContainerSpec

Open • edi-bice opened this issue 8 months ago • 2 comments

I am trying to migrate TFX v1 pipelines running on Kubeflow 1.8 to v2, but I am unable to set the environment variables that tensorflow-io needs to work with the embedded MinIO through the S3 protocol.
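The `configs.get_MINIO_S3_ENV()` helper used in the reproduction is not shown in the issue. A minimal sketch of what such a helper might return, assuming the S3 environment variable names used by TensorFlow's S3 filesystem; the function name, default endpoint, and the `MINIO_ACCESS_KEY`/`MINIO_SECRET_KEY` source variables are hypothetical placeholders, not the reporter's actual values:

```python
import os
from typing import List, Tuple


def get_minio_s3_env(
    endpoint: str = "minio-service.kubeflow:9000",  # hypothetical in-cluster MinIO address
    use_https: bool = False,
) -> List[Tuple[str, str]]:
    """Returns (name, value) pairs for S3 access to MinIO.

    Hypothetical stand-in for configs.get_MINIO_S3_ENV(); the variable names
    are assumed to follow TensorFlow's S3 filesystem conventions. Credentials
    are read from the caller's environment instead of being hard-coded.
    """
    flag = "1" if use_https else "0"
    return [
        ("AWS_ACCESS_KEY_ID", os.environ.get("MINIO_ACCESS_KEY", "")),
        ("AWS_SECRET_ACCESS_KEY", os.environ.get("MINIO_SECRET_KEY", "")),
        ("AWS_REGION", "us-east-1"),  # placeholder region
        ("S3_ENDPOINT", endpoint),
        ("S3_USE_HTTPS", flag),
        ("S3_VERIFY_SSL", flag),
    ]
```

Returning `(name, value)` pairs rather than a dict is consistent with the `for name, value in ...` comprehension in the reproduction.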

System information

  • Have I specified the code to reproduce the issue (Yes, No): yes
  • Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc): local Linux
  • TensorFlow version: 2.16.2
  • TFX Version: 1.16
  • Python version: 3.9.2
  • Python dependencies (from pip freeze output):

Describe the current behavior

Setting the env globally at the pipeline level does not work: none of the components in the compiled pipeline YAML contain the desired env variables.

```python
# 'accelerator', 'cpu_limit', 'cpu_request', 'memory_limit', 'memory_request'
# pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec.ResourceSpec.AcceleratorConfig(type='nvidia', count=1)
pcRes = pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec.ResourceSpec(
    cpu_request=2.0, memory_request=8.0)
pcEnv = [
    pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar(name=name, value=value)
    for name, value in configs.get_MINIO_S3_ENV()
]
pcSpec = pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
    resources=pcRes, env=pcEnv)

tfx.dsl.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=components,
    # ...
    platform_config=pcSpec)  # does not seem to work at pipeline level, trying per component
```
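One way to confirm this symptom is to inspect the compiled output directly. A minimal sketch, assuming the output has been loaded into a Python dict (e.g. with `json.load`) and follows the KFP v2 IR layout, where each executor's container sits under `deploymentSpec.executors.<name>.container` with an optional `env` list of `{name, value}` objects, either at the top level or nested under `pipelineSpec` in a PipelineJob wrapper; the function name is hypothetical:

```python
from typing import Any, Dict, Iterable, List


def executors_missing_env(spec: Dict[str, Any], required: Iterable[str]) -> Dict[str, List[str]]:
    """Maps each container executor to the required env var names it lacks.

    Assumes the KFP v2 IR layout (deploymentSpec.executors.<name>.container.env).
    Executors without a container section (e.g. importers) are skipped.
    """
    required = list(required)
    root = spec.get("pipelineSpec", spec)  # PipelineJob wrapper or bare PipelineSpec
    executors = root.get("deploymentSpec", {}).get("executors", {})
    missing: Dict[str, List[str]] = {}
    for name, executor in executors.items():
        container = executor.get("container")
        if container is None:
            continue
        present = {entry.get("name") for entry in container.get("env", [])}
        absent = [var for var in required if var not in present]
        if absent:
            missing[name] = absent
    return missing
```

An empty result means every container executor carries all of the requested variables.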

Doing the same at the component level also does not work: the pipeline YAML does not show the env variables in the respective container. (This is not ideal anyway because of the repetition; I was hoping to set them at the pipeline level and override them per component where necessary.)

```python
CsvExampleGen(input_base=data_path, input_config=input).with_platform_config(pcSpec)
```
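The layering described above (pipeline-wide defaults that individual components can override) amounts to merging env lists keyed by variable name. A minimal, library-free sketch of those semantics, assuming env entries are `(name, value)` pairs; the function name is hypothetical and this illustrates the desired behavior, not anything TFX currently implements:

```python
from typing import Dict, List, Sequence, Tuple

EnvPair = Tuple[str, str]


def layer_env(pipeline_env: Sequence[EnvPair], component_env: Sequence[EnvPair]) -> List[EnvPair]:
    """Returns pipeline-level env vars overridden by component-level ones.

    Pipeline variables come first, in their original order (with overridden
    values); component-only variables follow in the order they were given.
    """
    merged: Dict[str, str] = dict(pipeline_env)  # dicts preserve insertion order
    for name, value in component_env:
        merged[name] = value  # existing keys keep their position; new keys are appended
    return list(merged.items())
```

Keying by name means a component can override a single variable (say, `S3_USE_HTTPS`) without restating the rest.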

Browsing through the codebase, it seems that `platform_config` is restricted to the `ResourceSpec` part, which is a sibling of `EnvVar` in https://github.com/kubeflow/pipelines/blob/1ba6d5f1c402158966d7fdc552b99c0ffca2dfa8/api/v2alpha1/pipeline_spec.proto#L688:

```python
def _build_container_spec(self) -> ContainerSpec:
  """Builds the container spec for a component.

  Returns:
    The PipelineContainerSpec represents the container execution of the
    component.

  Raises:
    NotImplementedError: When the executor class is neither ExecutorClassSpec
    nor TemplatedExecutorContainerSpec.
  """

  assert isinstance(self._node, base_component.BaseComponent)

  if self._node.platform_config:
    logging.info(
        'ResourceSpec with container execution parameters has been passed via platform_config'
    )
    assert isinstance(
        self._node.platform_config, pipeline_pb2.PipelineDeploymentConfig
        .PipelineContainerSpec.ResourceSpec
    ), ('platform_config, if set by the user, must be a ResourceSpec proto '
        'specifying vCPU and vRAM requirements')
    cpu_limit = self._node.platform_config.cpu_limit
    memory_limit = self._node.platform_config.memory_limit
    if cpu_limit:
      assert (cpu_limit >= 0), ('vCPU must be non-negative')
    if memory_limit:
      assert (memory_limit >= 0), ('vRAM must be non-negative')

    if self._node.platform_config.accelerator.type:
      assert (self._node.platform_config.accelerator.count >=
              0), ('GPU type and count must be set')

  if isinstance(self._node.executor_spec,
                executor_specs.TemplatedExecutorContainerSpec):
    container_spec = self._node.executor_spec
    result = ContainerSpec(
        image=container_spec.image,
        command=_resolve_command_line(
            container_spec=container_spec,
            exec_properties=self._node.exec_properties,
        ))
    if self._node.platform_config:
      result.resources.CopyFrom(self._node.platform_config)
    return result
```
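This excerpt only accepts a `ResourceSpec`, so the `env` field of a `PipelineContainerSpec` never reaches the compiled container. A minimal sketch of how the check could be widened to accept either message, using plain dataclasses as hypothetical stand-ins for the `pipeline_pb2` messages of the same names (real code would operate on the protos, e.g. with `CopyFrom` and `extend`); this illustrates one possible fix, not TFX code:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Union


# Hypothetical stand-ins for the pipeline_pb2 messages of the same names.
@dataclass
class ResourceSpec:
    cpu_request: float = 0.0
    memory_request: float = 0.0


@dataclass
class EnvVar:
    name: str
    value: str


@dataclass
class PipelineContainerSpec:
    image: str = ""
    resources: ResourceSpec = field(default_factory=ResourceSpec)
    env: List[EnvVar] = field(default_factory=list)


def apply_platform_config(
    result: PipelineContainerSpec,
    platform_config: Optional[Union[ResourceSpec, PipelineContainerSpec]],
) -> PipelineContainerSpec:
    """Copies resources and, when available, env vars from platform_config into result."""
    if platform_config is None:
        return result
    if isinstance(platform_config, ResourceSpec):
        # Mirrors the current behavior: only resources are supported.
        result.resources = platform_config
    elif isinstance(platform_config, PipelineContainerSpec):
        # Proposed behavior: carry over both resources and env vars.
        result.resources = platform_config.resources
        result.env.extend(platform_config.env)
    else:
        raise TypeError(
            "platform_config must be a ResourceSpec or PipelineContainerSpec, "
            f"got {type(platform_config).__name__}")
    return result
```

Accepting both types keeps existing `ResourceSpec`-only callers working while letting the reporter's `pcSpec` pass its env through.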

edi-bice, Apr 22 '25

Hi @edi-bice and the TFX maintainers,

I’d love to work on fixing this issue: the KubeflowV2DagRunner currently ignores the env configuration defined in PipelineDeploymentConfig.PipelineContainerSpec, which breaks scenarios like setting required environment variables for embedded MinIO.

I’m familiar with TFX orchestration and Kubeflow V2 pipelines, and I'd like to dig into how env values are passed through the runner and into the component container spec. Would you be able to assign this issue to me or provide any pointers on where best to begin?

Thanks for maintaining this project—I look forward to contributing!

Vivek1106-04, Jul 13 '25

@edi-bice I'm not sure if this is the right answer, but the difference between v1 and v2 is that in v2, Kubernetes-related configuration has been externalized to kfp-kubernetes. In v1, I think there is no intermediate representation (IR); the pipeline goes straight to Argo. One way to solve this would be to use kfp-kubernetes in a simple, non-TFX Kubeflow pipeline to see what the v2 IR looks like for what you're trying to accomplish, and then try to get KubeflowV2DagRunner to produce that (or, alternatively, patch it into the YAML that is produced).
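The "patch it into the produced YAML" workaround could be a post-processing step run after compilation. A minimal sketch, assuming the compiled output is a JSON file in the KFP v2 IR layout (`deploymentSpec.executors.<name>.container.env`), either bare or nested under `pipelineSpec`; the function names and file handling are hypothetical, and this bypasses the compiler rather than fixing it:

```python
import json
from typing import Any, Dict, Iterable, Tuple


def inject_env(spec: Dict[str, Any], env: Iterable[Tuple[str, str]]) -> Dict[str, Any]:
    """Adds or overwrites env vars on every container executor, in place."""
    env = list(env)
    root = spec.get("pipelineSpec", spec)  # PipelineJob wrapper or bare PipelineSpec
    for executor in root.get("deploymentSpec", {}).get("executors", {}).values():
        container = executor.get("container")
        if container is None:
            continue  # importer/resolver executors have no container
        current = container.setdefault("env", [])
        for name, value in env:
            for entry in current:
                if entry.get("name") == name:
                    entry["value"] = value  # overwrite an existing variable
                    break
            else:
                current.append({"name": name, "value": value})
    return spec


def patch_spec_file(path: str, env: Iterable[Tuple[str, str]]) -> None:
    """Rewrites a compiled pipeline JSON file with the injected env vars."""
    with open(path) as f:
        spec = json.load(f)
    inject_env(spec, env)
    with open(path, "w") as f:
        json.dump(spec, f, indent=2)
```

Because it edits the compiled artifact, the patch has to be re-applied after every recompilation.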

@Vivek1106-04, are you familiar with the compilation process? Maybe we can design the compiler to produce output in the format the new approach expects.

pritamdodeja, Nov 20 '25