kubeflow_v2_entrypoint_utils fails to retrieve classpath
from tfx import v1 as tfx
from tfx.types import standard_artifacts

example_gen = CsvExampleGen(input_base=data_path, input_config=input)
components.append(example_gen)

examples = example_gen.outputs['examples']
statistics_gen = StatisticsGen(examples=examples, stats_options=stats_options)
System information
- Have I specified the code to reproduce the issue (Yes, No): Yes
- Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc): Kubeflow 1.8 on Linux
- TensorFlow version: 2.16.2
- TFX Version: 1.16.0
- Python version: 3.10
- Python dependencies (from pip freeze output): kfp 2.0.1, kfp-kubernetes 1.0.0, kfp-pipeline-spec 0.2.2, kfp-server-api 2.0.5
I0510 00:47:39.256530 50 launcher_v2.go:90] input ComponentSpec:{ "inputDefinitions": { "artifacts": { "examples": { "artifactType": { "instanceSchema": "title: tfx.Examples\ntype: object\nproperties:\n span:\n type: integer\n description: Span for an artifact.\n version:\n type: integer\n description: Version for an artifact.\n split_names:\n type: string\n description: JSON-encoded list of splits for an artifact. Empty string means artifact has no split.\n" } } }, "parameters": { "exclude_splits": { "parameterType": "STRING" }, "stats_options_json": { "parameterType": "STRING" } } }, "outputDefinitions": { "artifacts": { "statistics": { "artifactType": { "instanceSchema": "title: tfx.ExampleStatistics\ntype: object\nproperties:\n span:\n type: integer\n description: Span for an artifact.\n split_names:\n type: string\n description: JSON-encoded list of splits for an artifact. Empty string means artifact has no split.\n" } } } }, "executorLabel": "StatisticsGen_executor" } I0510 00:47:39.257013 50 cache.go:116] Connecting to cache endpoint ml-pipeline.kubeflow.svc.cluster.local:8887
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/opt/conda/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/opt/conda/lib/python3.10/site-packages/tfx/orchestration/kubeflow/v2/container/kubeflow_v2_run_executor.py", line 233, in
Hey @edi-bice
We recommend assigning example_gen before referencing it. Below is a snippet:
example_gen = CsvExampleGen(input_base=data_path, input_config=input)
components.append(example_gen)
Also, please ensure that examples = example_gen.outputs['examples'] is a valid artifact.
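One quick way to check (a minimal sketch, assuming the example_gen and data_path variables from the snippet above) is to run just the ExampleGen step with the local runner and confirm it emits an Examples artifact:

from tfx import v1 as tfx

# Run only the ExampleGen step locally and verify it produces an
# Examples artifact under the local pipeline root.
metadata_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    '/tmp/examples_check/metadata.db')
check_pipeline = tfx.dsl.Pipeline(
    pipeline_name='examples_check',
    pipeline_root='/tmp/examples_check',
    components=[example_gen],
    metadata_connection_config=metadata_config,
)
tfx.orchestration.LocalDagRunner().run(check_pipeline)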
Please let us know if this resolves the issue. If not, please provide more details so we can reproduce it.
Thank you!
I do indeed assign the component before referencing it; I've updated my simplified code snippet above to show that. The full version is roughly: if a path is specified, import the existing examples; otherwise, generate them. And CsvExampleGen does indeed produce examples: I can see the examples node in the pipeline run UI and can navigate to the object store path and see the files there as well.
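Roughly, the import-or-generate branch looks like the sketch below (variable names such as examples_path are placeholders from my simplified version, not the exact code):

from tfx import v1 as tfx
from tfx.types import standard_artifacts

if examples_path:
  # Reuse previously generated Examples artifacts from the object store.
  example_gen = tfx.dsl.Importer(
      source_uri=examples_path,
      artifact_type=standard_artifacts.Examples,
  ).with_id('examples_importer')
  examples = example_gen.outputs['result']
else:
  # Generate Examples from the raw CSV data.
  example_gen = tfx.components.CsvExampleGen(
      input_base=data_path, input_config=input)
  examples = example_gen.outputs['examples']

components.append(example_gen)
statistics_gen = tfx.components.StatisticsGen(
    examples=examples, stats_options=stats_options)
components.append(statistics_gen)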
One thing of note is that I specify s3 in the pipeline root and configure an S3-compatible object store in kfp-launcher as described here: https://www.kubeflow.org/docs/components/pipelines/operator-guides/configure-object-store/#s3-and-s3-compatible-provider
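For reference, the S3 pipeline root is set at compile time roughly as in the sketch below (bucket and pipeline names are placeholders); the kfp-launcher ConfigMap on the cluster then maps that s3:// prefix to the S3-compatible endpoint, per the linked doc:

from tfx import v1 as tfx

# Compile the pipeline with an s3:// pipeline root (placeholder bucket).
PIPELINE_ROOT = 's3://my-bucket/tfx-pipelines/my-pipeline'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename='pipeline.json',
)
runner.run(
    tfx.dsl.Pipeline(
        pipeline_name='my-pipeline',
        pipeline_root=PIPELINE_ROOT,
        components=components,
    )
)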
@edi-bice I think the way artifacts are handled changed with kfp v2. The reason you're seeing this might be that type_schema.instance_schema is empty in kubeflow_v2_entrypoint_utils.py. The fix is to ensure artifact_instance.type is not empty in kubeflow_v2_run_executor.py.
If you put the below code block after outputs_parameters is initialized in kubeflow_v2_run_executor.py, you should be able to patch it.
if inputs_spec:
  for input_key, artifact_list in inputs_dict.items():
    # Check if this artifact key exists in the static component spec
    if input_key in inputs_spec.artifacts:
      static_spec = inputs_spec.artifacts[input_key]
      # Check if the static spec has a defined schema title (e.g., tfx.ExampleStatistics)
      if static_spec.artifact_type.instance_schema:
        # If the artifact list is not empty, iterate through runtime instances
        for artifact_instance in artifact_list.artifacts:
          # Check if the runtime instance is missing the instanceSchema
          if not artifact_instance.type.instance_schema:
            # Copy the full schema from the static spec into the runtime instance
            artifact_instance.type.CopyFrom(static_spec.artifact_type)
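The idea is to backfill the schema from the static ComponentSpec only when a runtime artifact arrives without an instanceSchema, so artifacts that already carry a type are left untouched.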
Hopefully this solves your issue.
@nikelite please advise whether this is a good way to approach this scenario, or whether there is a more fundamental way to handle kfp v2 artifacts. I'm willing to document/implement the change with your/the team's guidance.