
TFMA on Flink does not seem to be parallelizing work.

Open jccarles opened this issue 2 years ago • 5 comments

Hello, I am running into an issue with the Evaluator component of a TFX pipeline. I use the FlinkRunner for Beam, and the Evaluator component gets very slow as the data size scales. It seems this is because all the work is done by a single TaskManager.

System information

I am running a TFX pipeline on Python 3.7 with TFX 1.8.1, which comes with tensorflow-model-analysis==0.39.0. I don't have a small example to reproduce; I can work on one if you think it would help.

Describe the problem

I use the Evaluator TFX component as follows:

evaluator = Evaluator(
    examples=example_gen.outputs[standard_component_specs.EXAMPLES_KEY],
    model=trainer.outputs[standard_component_specs.MODEL_KEY],
    eval_config=eval_config,
)

The eval_config is simple and does not define any custom splits, so only the default eval split is used for evaluation.

To run the TFX pipeline we use the FlinkRunner for Beam. The sidecar image is built from tensorflow/tfx:1.8.1.

We run Flink with a parallelism of 10, and 10 TFRecord files are given as input to the Evaluator component.
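For context, this is roughly how we wire the runner into the pipeline; the Flink master address, environment settings, names, and paths below are placeholders rather than our exact values:

from tfx import v1 as tfx

# Sketch of our runner configuration (values are placeholders / assumptions).
beam_pipeline_args = [
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",       # hypothetical JobManager address
    "--parallelism=10",                           # matches the 10 input files
    "--environment_type=EXTERNAL",                # hypothetical SDK harness setup
    "--environment_config=localhost:50000",
]

pipeline = tfx.dsl.Pipeline(
    pipeline_name="my_pipeline",                  # hypothetical name
    pipeline_root="gs://my-bucket/pipeline_root", # hypothetical path
    components=[example_gen, trainer, evaluator],
    beam_pipeline_args=beam_pipeline_args,
)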

From what we could gather, Beam tells Flink to build a single task for the three PTransforms:

"ReadFromTFRecordToArrow" | "FlattenExamples" | "ExtractEvaluateAndWriteResults"

Our issue is that this ends up as a single subtask in Flink, so a single TaskManager is doing all the work, as you can see in the attached screenshot. The problem therefore seems to be that the Beam workflow is not parallelized.
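To illustrate what I mean by fusion, here is a minimal, standalone Beam sketch (not taken from the TFMA code): without a shuffle boundary, consecutive steps are fused and executed as one Flink task.

import apache_beam as beam

# Standalone illustration of Beam fusion, unrelated to the actual TFMA internals.
# Without a shuffle boundary, the steps below fuse into a single task, so a
# single TaskManager ends up doing all the work.
with beam.Pipeline() as p:
    _ = (
        p
        | "CreateFiles" >> beam.Create([f"file-{i}" for i in range(10)])
        # A Reshuffle inserts a shuffle boundary, redistributing elements across
        # workers before the expensive step (hypothetical workaround, not something
        # we have wired into the Evaluator).
        | "BreakFusion" >> beam.Reshuffle()
        | "HeavyStep" >> beam.Map(lambda f: f)  # placeholder for the evaluation work
    )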

I have two main questions:

  • Is this behavior normal?
  • Is it possible to better distribute the workload across the different TaskManagers?

Screenshot from 2023-01-26 15-18-44

jccarles avatar Jan 31 '23 15:01 jccarles

@jccarles,

Can you please share the eval_config passed to the Evaluator component so we can analyse the root cause of the issue? Thank you!

singhniraj08 avatar Feb 06 '23 08:02 singhniraj08

Hello! Thank you for your answer, here is the eval_config we used. We set artificially low bounds for testing.

{
  "model_specs": [
    {
      "signature_name": "serving_default",
      "label_key": "label",
      "preprocessing_function_names": [
        "transform_features"
      ]
    }
  ],
  "metrics_specs": [
    {
      "thresholds": {
        "precision": {
          "value_threshold": {
            "lower_bound": 1e-03
          },
          "change_threshold": {
            "absolute": -1e-10,
            "direction": "HIGHER_IS_BETTER"
          }
        }
      }
    }
  ]
}
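For completeness, this is roughly how we build that config in Python before it is serialized (a sketch using the standard tfma proto helpers; it contains nothing beyond what is shown in the JSON above):

import tensorflow_model_analysis as tfma

# Sketch of the eval_config construction matching the JSON above.
eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(
            signature_name="serving_default",
            label_key="label",
            preprocessing_function_names=["transform_features"],
        )
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            thresholds={
                "precision": tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(lower_bound={"value": 1e-3}),
                    change_threshold=tfma.GenericChangeThreshold(
                        absolute={"value": -1e-10},
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                    ),
                )
            }
        )
    ],
)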

jccarles avatar Feb 06 '23 08:02 jccarles

@mdreves, can we distribute the Evaluator work across different TaskManagers? Thanks!

singhniraj08 avatar Feb 07 '23 09:02 singhniraj08

Hello, thank you for checking this issue. Did you have time to take a look? Have you identified anything so far? Can I help somehow?

jccarles avatar Apr 03 '23 14:04 jccarles

This issue is currently preventing us from using the Evaluator component in production, since it makes the memory requirements on a single Flink TaskManager rather huge.

Enzo90910 avatar Apr 11 '23 11:04 Enzo90910