[FLINK-36576][runtime] Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider
What is the purpose of the change
Currently, the DefaultVertexParallelismAndInputInfosDecider can balance data distribution based on the amount of data and the number of subpartitions. However, it has some limitations:
- The decider selects the data distribution algorithm solely based on whether an input is AllToAll or Pointwise, which prevents operators from dynamically choosing a different distribution algorithm.
- It does not support data-volume-based balanced distribution for Pointwise inputs.
- For AllToAll inputs, it cannot split the data corresponding to a specific key, so it cannot resolve data skew caused by a single hot key.
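To see why the last limitation matters, assume that without key splitting each subpartition is assigned as a whole to one downstream task. Let $s_1, \dots, s_n$ be the total sizes of the subpartitions (summed over all upstream partitions) and $P$ the downstream parallelism. Then no assignment can make the largest per-task load $L$ smaller than:

$$
L \;\ge\; \max\!\left(\frac{1}{P}\sum_{i=1}^{n} s_i,\; \max_{i} s_i\right)
$$

For example, with subpartition sizes of 10, 10, 10 and 100 MiB and $P = 4$, the average is $130 / 4 = 32.5$ MiB, but $L \ge 100$ MiB however the subpartitions are assigned. Only splitting the 100 MiB subpartition, i.e. the data of the hot key, can move the bound toward 32.5 MiB.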
To address these limitations, we introduce the following improvements:
- Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation into the input characteristics, allowing operators to flexibly choose the balanced data distribution algorithm.
- Introducing a data-volume-based balanced distribution algorithm for Pointwise inputs.
- Introducing the ability to split the data corresponding to a specific key, improving the data-volume-based balanced distribution algorithm for AllToAll inputs.
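The amount-based idea for Pointwise inputs can be sketched as grouping consecutive slices of upstream data into contiguous ranges of roughly equal size. This is a minimal sketch, not the algorithm implemented in this PR: it assumes the input's data sizes are already flattened into an ordered `long[]` (one entry per subpartition slice), and the class `AmountBasedSplitter`, its `Range` record and the greedy cut rule are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class AmountBasedSplitter {
    /** Hypothetical half-open range [start, end) over the flattened slice indices. */
    public record Range(int start, int end) {}

    /**
     * Greedily groups consecutive slices so that each group's total size stays
     * close to totalBytes / parallelism. Returns at most `parallelism` ranges.
     */
    public static List<Range> split(long[] sizes, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        long total = 0;
        for (long s : sizes) {
            total += s;
        }
        // Per-task target, rounded up; at least 1 so that empty inputs are handled.
        long target = Math.max(1, (total + parallelism - 1) / parallelism);

        List<Range> ranges = new ArrayList<>();
        int start = 0;
        long acc = 0;
        for (int i = 0; i < sizes.length; i++) {
            // Close the current range before slice i if adding it would exceed the
            // target, but keep one range in reserve for the remaining slices.
            if (acc > 0 && acc + sizes[i] > target && ranges.size() < parallelism - 1) {
                ranges.add(new Range(start, i));
                start = i;
                acc = 0;
            }
            acc += sizes[i];
        }
        // Everything not yet assigned goes into the last range.
        if (start < sizes.length) {
            ranges.add(new Range(start, sizes.length));
        }
        return ranges;
    }
}
```

For example, `split(new long[] {30, 30, 30, 30}, 2)` yields the ranges [0, 2) and [2, 4). Keeping ranges contiguous preserves the Pointwise property that each downstream task reads a consecutive block of upstream data. The greedy rule can return fewer than `parallelism` ranges, or leave the last range larger than the others, which a real implementation would likely refine.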
Brief change log
- Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation.
- Introducing an amount-based balanced data distribution algorithm for Pointwise inputs.
- Introducing the ability to split the data corresponding to a specific key for AllToAll inputs.
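Splitting the data of a specific key can be illustrated on a single AllToAll input. A hot key always lands in one subpartition, but that subpartition is produced by every upstream partition, so its data can be cut along the upstream-partition axis. The minimal sketch below assumes a rectangular size matrix `bytes[partition][subpartition]` and a per-task byte target; `HotKeySplitter` and `Slice` are hypothetical names, not part of this PR. It also assumes that splitting is only allowed when the operator does not require all records of a key within this input to reach the same task (our reading of "no intra-input key correlation").

```java
import java.util.ArrayList;
import java.util.List;

public class HotKeySplitter {
    /** Hypothetical unit of work: one subpartition, upstream partitions [partStart, partEnd). */
    public record Slice(int subpartition, int partStart, int partEnd, long bytes) {}

    /**
     * bytes[p][s] is the size of subpartition s produced by upstream partition p
     * (assumed rectangular). Subpartitions larger than targetBytes are cut along the
     * upstream-partition axis into contiguous slices of roughly targetBytes each;
     * smaller subpartitions are returned as a single slice.
     */
    public static List<Slice> slice(long[][] bytes, long targetBytes) {
        List<Slice> out = new ArrayList<>();
        int numPartitions = bytes.length;
        int numSubpartitions = numPartitions == 0 ? 0 : bytes[0].length;
        for (int s = 0; s < numSubpartitions; s++) {
            int start = 0;
            long acc = 0;
            for (int p = 0; p < numPartitions; p++) {
                // Cut before partition p if it would push this slice over the target.
                if (acc > 0 && acc + bytes[p][s] > targetBytes) {
                    out.add(new Slice(s, start, p, acc));
                    start = p;
                    acc = 0;
                }
                acc += bytes[p][s];
            }
            // The remaining partitions of subpartition s form the last slice.
            out.add(new Slice(s, start, numPartitions, acc));
        }
        return out;
    }
}
```

With three upstream partitions that each produce 10 bytes for subpartition 0 and 40 bytes for subpartition 1, and a 50-byte target, subpartition 0 stays whole while subpartition 1 is cut into three 40-byte slices, which can then be packed onto downstream tasks. A single (partition, subpartition) cell larger than the target cannot be split further at this granularity. If the operator also needs inter-inputs key correlation, as in a join, the matching data from the other input would presumably have to be sent to every task that receives a slice of the hot key; that step is not shown.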
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- 35324d24fdf79b1a743460f0a8acbbda19e4d397 Azure: SUCCESS