tfx
Dataflow jobs fail when using the same beam_pipeline_args for InteractiveContext and KubeflowDagRunner
System information
- Have I specified the code to reproduce the issue (Yes, No): Yes
- Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc): Kubeflow Notebooks
- TensorFlow version: 2.5.0
- TFX Version: 1.2.1
- Python version: 3.7
- Python dependencies (from pip freeze output): ...
Describe the current behavior
Typically, we define a BEAM_PIPELINE_ARGS constant and, while experimenting in a notebook, pass it both to InteractiveContext and to a Pipeline via beam_pipeline_args. When running a component, InteractiveContext adds labels to the beam_pipeline_args list passed to it, modifying that list in place. If the same list is also passed to a pipeline that is then run on Kubeflow Pipelines, the pipeline's components fail on Dataflow because the list of labels is malformed.
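This is ordinary Python list aliasing: both consumers hold a reference to the same list object, so an in-place append made on behalf of one is visible to the other. A minimal sketch, assuming a hypothetical `run_with_labels` function that stands in for the label handling inside InteractiveContext (it is not the TFX code, and the `--labels=...` string is only illustrative):

```python
from typing import List


# Hypothetical stand-in for a runner that tags Beam args in place (not the TFX API).
def run_with_labels(beam_pipeline_args: List[str], runner: str) -> List[str]:
    beam_pipeline_args.append(f"--labels=tfx_runner={runner}")  # mutates the caller's list
    return beam_pipeline_args


BEAM_PIPELINE_ARGS = ["--runner=DataflowRunner"]
interactive_args = BEAM_PIPELINE_ARGS  # what InteractiveContext receives
pipeline_args = BEAM_PIPELINE_ARGS     # what the Pipeline receives

run_with_labels(interactive_args, "interactivecontext")

# The pipeline's args now carry the interactive run's label as well.
print(pipeline_args)
print(pipeline_args is interactive_args)
```

Both names point at one list, so nothing the pipeline does afterwards can undo the earlier append.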
The Dataflow job fails with the following error:
Encountered an invalid user label. generic::invalid_argument: Invalid field "user_labels";
key "['tfx_executor" does not conform to regular expression "[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}";
first character "[" is not a non-uppercased letter (Unicode character class Ll or Lo)
[google.rpc.error_details_ext] { message: "Invalid field \"user_labels\"; key \"[\'tfx_executor\"
does not conform to regular expression \"[\\p{Ll}\\p{Lo}][\\p{Ll}\\p{Lo}\\p{N}_-]{0,62}\";
first character \"[\" is not a non-uppercased letter (Unicode character class Ll or Lo)" }
The list of labels looks like this, for example:
["['tfx_executor=third_party_executor', 'tfx_py_version=3-7', 'tfx_runner=interactivecontext',
'tfx_version=1-2-1', 'tfx_executor=tfx-components-transform-executor-executor', 'tfx_py_version=3-7',
'tfx_runner=interactivecontext', 'tfx_version=1-2-1']", 'tfx_executor=third_party_executor',
'tfx_py_version=3-7', 'tfx_runner=kfp', 'tfx_version=1-2-1']
The list of labels is expected to look like this instead:
['tfx_executor=third_party_executor', 'tfx_py_version=3-7', 'tfx_runner=kfp', 'tfx_version=1-2-1']
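The malformed key in the error comes from the first element being the string form of an earlier list, so the text before its first `=` is `['tfx_executor`. The sketch below reproduces that key and checks label keys against an ASCII-only approximation of the regular expression quoted in the error. Python's `re` module does not support `\p{...}` classes, so `[a-z]` and `[0-9]` stand in for `\p{Ll}\p{Lo}` and `\p{N}`; that substitution is an assumption and ignores non-ASCII letters.

```python
import re
from typing import List

# ASCII approximation of the Dataflow label-key pattern from the error message.
LABEL_KEY = re.compile(r"[a-z][a-z0-9_-]{0,62}")


def invalid_keys(labels: List[str]) -> List[str]:
    """Return the keys (text before the first '=') that fail the pattern."""
    keys = [label.split("=", 1)[0] for label in labels]
    return [key for key in keys if not LABEL_KEY.fullmatch(key)]


interactive_labels = ["tfx_executor=third_party_executor", "tfx_runner=interactivecontext"]
kfp_labels = ["tfx_executor=third_party_executor", "tfx_py_version=3-7",
              "tfx_runner=kfp", "tfx_version=1-2-1"]

# Prepending a stringified list reproduces the malformed first label.
malformed = [str(interactive_labels)] + kfp_labels

print(invalid_keys(kfp_labels))
print(invalid_keys(malformed))
```

The expected list passes the check, while the malformed one yields exactly the key `['tfx_executor` reported by Dataflow.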
This error seems limited to running a component with InteractiveContext and then running it on Kubeflow Pipelines:
- Running a component with InteractiveContext and then running it using LocalDagRunner works as expected.
- Running a component only with LocalDagRunner (without InteractiveContext) and then running it on Kubeflow Pipelines works as expected.
Describe the expected behavior
Running components on Dataflow via Kubeflow Pipelines after running them with InteractiveContext should work without error. InteractiveContext shouldn't mutate beam_pipeline_args passed to it and should instead make a copy of this list.
Standalone code to reproduce the issue
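from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.orchestration.pipeline import Pipeline
# One list object shared by InteractiveContext and the Pipeline.
BEAM_PIPELINE_ARGS = ['--runner=DataflowRunner', ...]
example_gen = BigQueryExampleGen(...)
context = InteractiveContext(beam_pipeline_args=BEAM_PIPELINE_ARGS)
pipeline = Pipeline(beam_pipeline_args=BEAM_PIPELINE_ARGS, components=[example_gen])
context.run(example_gen)  # per this report, this modifies BEAM_PIPELINE_ARGS in place
# compile the pipeline with KubeflowDagRunner and run it using a kfp.Client method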
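Until the list is copied inside InteractiveContext, one possible caller-side workaround (an assumption; the thread does not say which workaround the reporter used) is to keep the shared constant as a tuple and hand each consumer its own `list(...)` copy. Converting to a list for each consumer avoids relying on whether TFX accepts a tuple directly. The sketch uses a hypothetical `mutating_consumer` in place of the real TFX code path:

```python
from typing import List, Tuple


# Hypothetical consumer that extends its argument in place (stands in for the buggy path).
def mutating_consumer(beam_pipeline_args: List[str]) -> None:
    beam_pipeline_args.append("--labels=tfx_runner=interactivecontext")


# An immutable constant: nothing can append to it by accident.
BEAM_PIPELINE_ARGS: Tuple[str, ...] = ("--runner=DataflowRunner",)

interactive_args = list(BEAM_PIPELINE_ARGS)  # fresh copy for InteractiveContext
pipeline_args = list(BEAM_PIPELINE_ARGS)     # separate copy for the Pipeline

mutating_consumer(interactive_args)

# Only the interactive copy changed; the pipeline copy and the constant are untouched.
print(pipeline_args)
print(interactive_args)
```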
Name of your Organization (Optional): Twitter
Other info / logs: We suspect this happens around this line: https://github.com/tensorflow/tfx/blob/master/tfx/dsl/components/base/base_beam_executor.py#L88
Hi, InteractiveContext is mainly intended for local debugging. You can follow this tutorial instead: https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines
Yes, we're using InteractiveContext for local debugging. The use case is running components one by one as you iterate on them, before running the pipeline end to end with LocalDagRunner and then with KubeflowDagRunner + kfp.Client. The expectation is that you can define Beam pipeline args once in a constant and reliably reuse the same args for your components and pipeline at each stage (interactive, local, KFP). In other words, the runner shouldn't modify the list passed to it; it should update a copy of the list instead.
Thanks! Could you check whether the following change fixes this? Change this line:
https://github.com/tensorflow/tfx/blob/6429c643233f1c1fca41c7c02e7da966c763c7eb/tfx/orchestration/experimental/interactive/interactive_context.py#L139
to
beam_pipeline_args = list(beam_pipeline_args or self.beam_pipeline_args)
Changing it to beam_pipeline_args = list(beam_pipeline_args or self.beam_pipeline_args) fixes this. The original list is no longer modified, and the labels are now well-formed.
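The suggested line does two things: `or` falls back to the context-level default when no per-run args are given, and `list(...)` makes a shallow copy, so later appends never reach the caller's list. A minimal sketch, assuming a hypothetical `Context` class that mirrors only this pattern (it is not the real `InteractiveContext`, and the appended label string is illustrative):

```python
from typing import List, Optional


class Context:
    """Hypothetical stand-in that copies Beam args before adding labels."""

    def __init__(self, beam_pipeline_args: Optional[List[str]] = None):
        self.beam_pipeline_args = beam_pipeline_args or []

    def run(self, beam_pipeline_args: Optional[List[str]] = None) -> List[str]:
        # Copy the per-run args, or the default when none (or an empty list) are given.
        beam_pipeline_args = list(beam_pipeline_args or self.beam_pipeline_args)
        beam_pipeline_args.append("--labels=tfx_runner=interactivecontext")
        return beam_pipeline_args


BEAM_PIPELINE_ARGS = ["--runner=DataflowRunner"]
context = Context(beam_pipeline_args=BEAM_PIPELINE_ARGS)
effective_args = context.run()

print(effective_args)
print(BEAM_PIPELINE_ARGS)  # unchanged, so it can be reused for the Pipeline
```

Because an empty list is falsy, passing `[]` explicitly also falls back to the default args; that matches the proposed expression but is worth knowing if an empty list is ever meant to clear the defaults.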
@codesue,
Kindly let us know if this issue can be closed. Thank you!
Hi @singhniraj08, I have a workaround, but it would be nice to have this fix merged. Should I submit a pull request for this?
@codesue, please go ahead and create a PR for the fix. Thank you!
Hi @singhniraj08, I just opened a PR with the fix.