
[FLINK-36576][runtime] Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider

Open noorall opened this issue 1 year ago • 1 comments

What is the purpose of the change

Currently, the DefaultVertexParallelismAndInputInfosDecider implements a balanced distribution algorithm based on the amount of data and the number of subpartitions. However, it has some limitations:

  1. The decider selects the data distribution algorithm based on whether the input is AllToAll or Pointwise, which prevents an operator from dynamically choosing the data distribution algorithm.
  2. It does not support data-volume-based balanced distribution for Pointwise inputs.
  3. For AllToAll inputs, it cannot split the data belonging to a single key, so it cannot resolve data skew caused by a single-key hotspot.
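The third limitation can be illustrated with a minimal sketch. It is not the decider's actual code: the class `SubpartitionGrouping`, the method `groupByBytes`, and the byte sizes are hypothetical, and the greedy strategy is only assumed to resemble amount-based grouping of whole subpartitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink's implementation.
final class SubpartitionGrouping {

    /**
     * Greedily groups consecutive subpartitions into inclusive [start, end]
     * index ranges. A new range is opened when adding the next subpartition
     * would push the running total above targetBytes. A subpartition is never
     * split, so one oversized subpartition still ends up in a single range.
     */
    static List<int[]> groupByBytes(long[] subpartitionBytes, long targetBytes) {
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        long accumulated = 0;
        for (int i = 0; i < subpartitionBytes.length; i++) {
            if (i > start && accumulated + subpartitionBytes[i] > targetBytes) {
                ranges.add(new int[] {start, i - 1});
                start = i;
                accumulated = 0;
            }
            accumulated += subpartitionBytes[i];
        }
        if (subpartitionBytes.length > 0) {
            ranges.add(new int[] {start, subpartitionBytes.length - 1});
        }
        return ranges;
    }
}
```

With sizes `{10, 10, 100, 10, 10}` and a target of 30, the ranges are `[0,1]`, `[2,2]`, and `[3,4]`. The task reading subpartition 2 still receives 100 units, several times more than its peers, because grouping whole subpartitions cannot shrink a single hot one.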

To address these limitations, we plan to introduce the following improvements:

  1. Introduce InterInputsKeyCorrelation and IntraInputKeyCorrelation into the input characterisation, allowing the operator to flexibly choose the balanced data distribution algorithm.
  2. Introduce a data-volume-based balanced distribution algorithm for Pointwise inputs.
  3. Introduce the ability to split the data of a specific key, improving the data-volume-based balanced distribution algorithm for AllToAll inputs.
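One way to picture splitting the data of a single key is to divide the upstream partitions that feed one hot subpartition among several downstream tasks. The sketch below is an assumption about the general idea, not the algorithm in this pull request: `HotKeySplit` and `splitPartitions` are hypothetical names, and it presumes the operator tolerates records of the same key landing on different tasks (how the PR expresses that condition is not described here).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink's implementation.
final class HotKeySplit {

    /**
     * Splits the upstream partitions of one hot subpartition into roughly
     * equal inclusive [start, end] index ranges, one per downstream task.
     * The number of slices is ceil(totalBytes / targetBytes), kept within
     * 1..numPartitions because a single upstream partition is not divided.
     */
    static List<int[]> splitPartitions(long[] bytesPerUpstreamPartition, long targetBytes) {
        List<int[]> ranges = new ArrayList<>();
        int n = bytesPerUpstreamPartition.length;
        if (n == 0) {
            return ranges;
        }
        long total = 0;
        for (long b : bytesPerUpstreamPartition) {
            total += b;
        }
        long wanted = (total + targetBytes - 1) / targetBytes; // ceiling division
        int slices = (int) Math.max(1, Math.min(n, wanted));
        for (int i = 0; i < slices; i++) {
            int start = i * n / slices;
            int end = (i + 1) * n / slices - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }
}
```

For four upstream partitions of 25 units each and a target of 50, this yields `[0,1]` and `[2,3]`, so two tasks each read about half of the hot key's data. The sketch splits by partition count rather than by exact bytes, which keeps it short but can leave slices uneven when upstream partitions differ in size.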

Brief change log

  • Introduce InterInputsKeyCorrelation and IntraInputKeyCorrelation.
  • Introduce an amount-based balanced data distribution algorithm for Pointwise inputs.
  • Introduce the ability to split the data of a specific key for AllToAll inputs.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

noorall avatar Oct 21 '24 08:10 noorall

CI report:

  • 35324d24fdf79b1a743460f0a8acbbda19e4d397 Azure: SUCCESS

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Oct 21 '24 08:10 flinkbot

@flinkbot run azure

noorall avatar Jan 07 '25 12:01 noorall