starrocks
[Enhancement] Allow one-phase local aggregate in the single BE
What type of PR is this:
- [ ] bug
- [ ] feature
- [x] enhancement
- [ ] refactor
- [ ] others
Problem Summary:
If there is only one BE and one fragment instance, we can generate a one-phase local aggregation with the local shuffle operator (Scan->OnePhaseAgg->LocalShuffle), regardless of whether the grouping keys differ from the scan distribution keys.
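A minimal sketch of the eligibility condition stated above, assuming a simplified model where the planner only knows the number of BEs and fragment instances. The class and method names (`OnePhaseLocalAggCheck`, `canUseOnePhaseLocalAgg`, `planShape`) are hypothetical illustrations, not StarRocks code.

```java
/** Hypothetical sketch of the condition in the problem summary; not StarRocks code. */
final class OnePhaseLocalAggCheck {

    /**
     * All input rows live on one BE and are read by one fragment instance, so
     * (by assumption) a local shuffle inside that BE can route every row of a
     * group to the same place without a network exchange.
     */
    static boolean canUseOnePhaseLocalAgg(int numBackends, int numFragmentInstances) {
        return numBackends == 1 && numFragmentInstances == 1;
    }

    /** Returns the plan shape as written in the PR text, or a placeholder otherwise. */
    static String planShape(int numBackends, int numFragmentInstances) {
        if (canUseOnePhaseLocalAgg(numBackends, numFragmentInstances)) {
            // Chosen regardless of whether grouping keys match the scan distribution keys.
            return "Scan->OnePhaseAgg->LocalShuffle";
        }
        // Otherwise the optimizer keeps its usual choices (placeholder label).
        return "exchange-based plan";
    }
}
```

For example, `planShape(1, 1)` yields the local-shuffle shape, while a 3-BE cluster falls back to the placeholder.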
Modification
- Add `withLocalShuffle` to `LogicalAggregationOperator` and `PhysicalAggregationOperator`.
- Add `GenerateAggregateWithLocalShuffleRule` to set `LogicalAggregationOperator#withLocalShuffle` to true.
- `HashDistributionSpec#isSatisfy()` returns true when the required property needs a local shuffle and this is a `LOCAL` scan.
- `PlanFragmentBuilder#visitPhysicalHashAggregate()` sets `isNeedLocalShuffle` and `AssignScanRangesPerDriverSeq` according to `PhysicalAggregationOperator#withLocalShuffle`.
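A minimal sketch of the `HashDistributionSpec#isSatisfy()` change, assuming a heavily simplified model. `HashSpec`, `Required`, `needLocalShuffle` and the `SHUFFLE_AGG` value are hypothetical stand-ins; only the `LOCAL` scan case comes from the list above. The ordinary key check is reduced to "the provided hash columns are a non-empty subset of the grouping columns", which is an assumption, not the real optimizer rule.

```java
import java.util.List;

/** Hypothetical, simplified model of the satisfy check; not the StarRocks classes. */
final class HashSatisfySketch {

    enum SourceType { LOCAL, SHUFFLE_AGG }

    /** What the child provides: hash columns plus where the distribution comes from. */
    record HashSpec(List<Integer> columns, SourceType sourceType) {}

    /** What the aggregate requires: grouping columns, optionally via a local shuffle. */
    record Required(List<Integer> columns, boolean needLocalShuffle) {}

    static boolean isSatisfy(HashSpec provided, Required required) {
        // Case added by this PR: a LOCAL scan satisfies a requirement that asks for a
        // local shuffle, because the shuffle will happen inside the single BE.
        if (required.needLocalShuffle() && provided.sourceType() == SourceType.LOCAL) {
            return true;
        }
        // Simplified ordinary check: hashing on a subset of the grouping columns
        // keeps every group inside one partition.
        return !provided.columns().isEmpty()
                && required.columns().containsAll(provided.columns());
    }
}
```

With a `LOCAL` scan hashed on column 1 and grouping columns 2 and 3, the check passes only when `needLocalShuffle` is true.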
run starrocks_fe_unittest
[FE PR Coverage Check]
:heart_eyes: pass : 86 / 87 (98.85%)
file detail
| | path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|---|
| :large_blue_circle: | com/starrocks/sql/optimizer/base/HashDistributionDesc.java | 8 | 9 | 88.89% | [131] |
| :large_blue_circle: | com/starrocks/sql/optimizer/RequiredPropertyDeriver.java | 3 | 3 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/Optimizer.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/rule/transformation/GenerateAggregateWithLocalShuffleRule.java | 15 | 15 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/qe/SessionVariable.java | 5 | 5 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/rule/RuleSet.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/rule/implementation/HashAggImplementationRule.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/planner/AggregationNode.java | 5 | 5 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/plan/PlanFragmentBuilder.java | 8 | 8 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/operator/physical/PhysicalHashAggregateOperator.java | 6 | 6 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/operator/logical/LogicalAggregationOperator.java | 11 | 11 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/qe/Coordinator.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/rule/RuleType.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/PropertyDeriverBase.java | 3 | 3 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/base/HashDistributionSpec.java | 3 | 3 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/rewrite/AddDecodeNodeForDictStringRule.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/planner/PlanFragment.java | 4 | 4 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/optimizer/cost/CostModel.java | 5 | 5 | 100.00% | [] |
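The summary line is just the column totals of the table: the `covered_line` values sum to 86 and the `new_line` values to 87. A small sketch that recomputes those figures from the table rows; `CoverageMath` is a hypothetical helper, not part of the coverage bot.

```java
import java.util.Locale;
import java.util.stream.IntStream;

/** Hypothetical helper that recomputes the coverage figures from the table. */
final class CoverageMath {
    // covered_line and new_line columns, in table order.
    static final int[] COVERED = {8, 3, 2, 15, 5, 2, 2, 5, 8, 6, 11, 1, 1, 3, 3, 2, 4, 5};
    static final int[] NEW_LINES = {9, 3, 2, 15, 5, 2, 2, 5, 8, 6, 11, 1, 1, 3, 3, 2, 4, 5};

    static int sum(int[] xs) {
        return IntStream.of(xs).sum();
    }

    /** Coverage as a percentage with two decimals, e.g. "98.85%". */
    static String percent(int covered, int total) {
        return String.format(Locale.ROOT, "%.2f%%", 100.0 * covered / total);
    }
}
```

`percent(sum(COVERED), sum(NEW_LINES))` gives the 98.85% in the summary, and `percent(8, 9)` the 88.89% for `HashDistributionDesc.java`.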
run starrocks_admit_test
This enhancement targets high-cardinality aggregation on a single BE node and compares the cost of the single-stage and multi-stage aggregate plans, but we have some concerns:
- When the optimizer returns a single-stage aggregate with an exchange, the aggregation must be high-cardinality, not low-cardinality, so comparing the cost of the local-shuffle mode is meaningless.
- The enhancement only works when the scan runs on one BE node, but the optimizer currently does not know which nodes will actually execute the plan, so it cannot tell that the scan runs on only one BE node. I think the optimizer needs to consider this in the future with a complete solution, but the approach in this PR is too intrusive for the optimizer framework.
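The first point can be illustrated with a toy cost model, assuming one cost unit per row aggregated or sent over the network. This is not StarRocks' `CostModel`; `AggCostSketch` and its methods are hypothetical. Under this model a single-stage plan with an exchange only wins when pre-aggregation barely shrinks the input, i.e. at high cardinality.

```java
/** Toy cost model (hypothetical): one unit per row aggregated or shuffled. */
final class AggCostSketch {

    /** Single-stage plan: shuffle every input row, then aggregate it once. */
    static double onePhaseWithExchange(double rows) {
        return rows /* shuffle */ + rows /* aggregate */;
    }

    /** Two-stage plan: local pre-aggregation, shuffle its output, global aggregation. */
    static double twoPhase(double rows, double ndv, int parallelism) {
        // Each instance emits at most ndv rows after pre-aggregation.
        double localOut = Math.min(rows, ndv * parallelism);
        return rows /* local agg */ + localOut /* shuffle */ + localOut /* global agg */;
    }

    static String cheaper(double rows, double ndv, int parallelism) {
        return twoPhase(rows, ndv, parallelism) < onePhaseWithExchange(rows)
                ? "two-phase" : "one-phase";
    }
}
```

With 1,000,000 rows and 8 instances, an NDV of 100 makes the two-phase plan cheaper, while an NDV of 900,000 makes the single-stage plan cheaper.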
So I suggest modifying FragmentBuilder instead:
- Change the way fragments are generated.
- Add a mark on the exchange node to perform a local shuffle.
- Record the BE node on the ScanNode, and check that there is only one BE node when reaching the aggregate node.
I think this can work on the aggregate sub-plan alone, without checking the whole plan. And given the first point above, you don't need to worry about the optimizer returning a multi-stage aggregate plan for low-cardinality aggregation.
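The suggested fragment-builder approach can be sketched as a walk over the aggregate's sub-plan only. This assumes a minimal, hypothetical node model (`Node`, `ScanNode`, `ExchangeNode`, `AggregationNode`, `localShuffle`, `markIfSingleBackend`); none of these are the real StarRocks planner classes or fields.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the review suggestion; not the StarRocks PlanNode hierarchy. */
final class LocalShuffleMarker {
    static class Node {
        final List<Node> children = new ArrayList<>();
        Node add(Node child) { children.add(child); return this; }
    }
    static final class ScanNode extends Node {
        final Set<Long> backendIds; // BEs this scan runs on, recorded at build time
        ScanNode(Set<Long> backendIds) { this.backendIds = backendIds; }
    }
    static final class ExchangeNode extends Node {
        boolean localShuffle; // the "mark" asking for a local shuffle
    }
    static final class AggregationNode extends Node {}

    /** Collects the BE ids of every scan in the sub-plan rooted at node. */
    static void collectBackends(Node node, Set<Long> out) {
        if (node instanceof ScanNode scan) {
            out.addAll(scan.backendIds);
        }
        for (Node child : node.children) {
            collectBackends(child, out);
        }
    }

    /** At an aggregate node: if all scans below use one BE, mark the exchange child. */
    static void markIfSingleBackend(AggregationNode agg) {
        Set<Long> backends = new HashSet<>();
        collectBackends(agg, backends);
        if (backends.size() != 1) {
            return; // multiple (or unknown) BEs: keep the normal network exchange
        }
        for (Node child : agg.children) {
            if (child instanceof ExchangeNode exchange) {
                exchange.localShuffle = true;
            }
        }
    }
}
```

Only the aggregate's own subtree is inspected, which matches the point that the whole plan does not need to be checked.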