
[Enhancement] Allow one-phase local aggregate in the single BE

Open ZiheLiu opened this issue 2 years ago • 9 comments

What type of PR is this:

  • [ ] bug
  • [ ] feature
  • [x] enhancement
  • [ ] refactor
  • [ ] others

Problem Summary:

If there is only one BE and one fragment instance, we could generate one-phase local aggregation with the local shuffle operator (Scan->OnePhaseAgg->LocalShuffle) regardless of the differences between grouping keys and scan distribution keys.
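
To make the condition concrete, here is a minimal sketch. `OnePhaseLocalAggCheck`, `eligible`, and `planShape` are hypothetical names used only for illustration and are not part of the StarRocks code base. The sketch encodes just the rule stated above: the grouping keys and the scan distribution keys do not have to match.

```java
import java.util.List;

// Hypothetical illustration only; none of these names exist in StarRocks.
final class OnePhaseLocalAggCheck {
    // Rule from the summary: exactly one BE and exactly one fragment instance.
    // The key lists are accepted but deliberately ignored, because the plan
    // is allowed regardless of how they differ.
    static boolean eligible(int numBackends, int numFragmentInstances,
                            List<String> groupingKeys,
                            List<String> scanDistributionKeys) {
        return numBackends == 1 && numFragmentInstances == 1;
    }

    // Returns the plan shape named in the summary, or a placeholder string
    // meaning "leave the plan to the regular optimizer path".
    static String planShape(int numBackends, int numFragmentInstances,
                            List<String> groupingKeys,
                            List<String> scanDistributionKeys) {
        return eligible(numBackends, numFragmentInstances,
                        groupingKeys, scanDistributionKeys)
                ? "Scan->OnePhaseAgg->LocalShuffle"
                : "regular planning";
    }
}
```

For example, grouping by `c2` on a table distributed by `c1` is still eligible as long as there is a single BE and a single fragment instance.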

Modification

  • Add withLocalShuffle to LogicalAggregationOperator and PhysicalAggregationOperator.
  • Add GenerateAggregateWithLocalShuffleRule to set LogicalAggregationOperator#withLocalShuffle to true.
  • HashDistributionSpec#isSatisfy() returns true when the required property needs a local shuffle and the source is a LOCAL scan.
  • PlanFragmentBuilder#visitPhysicalHashAggregate() sets isNeedLocalShuffle and AssignScanRangesPerDriverSeq according to PhysicalAggregationOperator#withLocalShuffle.
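
The relaxed `isSatisfy()` check in the third item can be sketched as follows. This is a heavily simplified model, not the actual `HashDistributionSpec` code: `DistributionSketch`, `HashDist`, `SourceType`, and the `needLocalShuffle` field are assumptions made for illustration, and the ordinary column comparison is reduced to plain list equality.

```java
import java.util.List;

// Hypothetical, heavily simplified model of the relaxed check; the real
// HashDistributionSpec#isSatisfy() handles more cases than shown here.
final class DistributionSketch {
    enum SourceType { LOCAL, SHUFFLE }

    static final class HashDist {
        final List<String> columns;
        final SourceType source;
        final boolean needLocalShuffle; // assumed flag on the required property

        HashDist(List<String> columns, SourceType source, boolean needLocalShuffle) {
            this.columns = columns;
            this.source = source;
            this.needLocalShuffle = needLocalShuffle;
        }
    }

    static boolean isSatisfy(HashDist provided, HashDist required) {
        // Relaxed path: a LOCAL scan satisfies a requirement that asks for a
        // local shuffle, whatever the hash columns are.
        if (required.needLocalShuffle && provided.source == SourceType.LOCAL) {
            return true;
        }
        // Simplified stand-in for the ordinary column-based check.
        return provided.columns.equals(required.columns);
    }
}
```

In this model a LOCAL scan hashed on `c1` satisfies a requirement on `c2` only when the requirement carries the local-shuffle flag; without the flag the usual column check still applies.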

ZiheLiu avatar Jul 19 '22 09:07 ZiheLiu

run starrocks_fe_unittest

ZiheLiu avatar Jul 21 '22 04:07 ZiheLiu

run starrocks_fe_unittest

ZiheLiu avatar Jul 21 '22 05:07 ZiheLiu

[FE PR Coverage Check]

:heart_eyes: pass : 86 / 87 (98.85%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
| --- | --- | --- | --- | --- |
| :large_blue_circle: com/starrocks/sql/optimizer/base/HashDistributionDesc.java | 8 | 9 | 88.89% | [131] |
| :large_blue_circle: com/starrocks/sql/optimizer/RequiredPropertyDeriver.java | 3 | 3 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/Optimizer.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/rule/transformation/GenerateAggregateWithLocalShuffleRule.java | 15 | 15 | 100.00% | [] |
| :large_blue_circle: com/starrocks/qe/SessionVariable.java | 5 | 5 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/rule/RuleSet.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/rule/implementation/HashAggImplementationRule.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: com/starrocks/planner/AggregationNode.java | 5 | 5 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/plan/PlanFragmentBuilder.java | 8 | 8 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/operator/physical/PhysicalHashAggregateOperator.java | 6 | 6 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/operator/logical/LogicalAggregationOperator.java | 11 | 11 | 100.00% | [] |
| :large_blue_circle: com/starrocks/qe/Coordinator.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/rule/RuleType.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/PropertyDeriverBase.java | 3 | 3 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/base/HashDistributionSpec.java | 3 | 3 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/rewrite/AddDecodeNodeForDictStringRule.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: com/starrocks/planner/PlanFragment.java | 4 | 4 | 100.00% | [] |
| :large_blue_circle: com/starrocks/sql/optimizer/cost/CostModel.java | 5 | 5 | 100.00% | [] |

wanpengfei-git avatar Jul 21 '22 05:07 wanpengfei-git

run starrocks_admit_test

ZiheLiu avatar Jul 21 '22 10:07 ZiheLiu

run starrocks_admit_test

ZiheLiu avatar Jul 21 '22 10:07 ZiheLiu

run starrocks_admit_test

ZiheLiu avatar Jul 21 '22 11:07 ZiheLiu

run starrocks_admit_test

ZiheLiu avatar Jul 21 '22 11:07 ZiheLiu

This enhancement targets high-cardinality aggregation on a single BE node, and it compares the cost of the single-stage aggregate plan against the multi-stage aggregate plan. However, we have a few concerns:

  • When the optimizer returns a single-stage aggregate with an exchange, the aggregate must be high-cardinality rather than low-cardinality, so comparing its cost against the local-shuffle mode is meaningless.
  • The enhancement only works when the scan runs on a single BE node, but the optimizer currently does not consider which nodes actually execute the plan, so it cannot know that the scan runs on only one BE node. I think the optimizer should take this into account in the future, and that needs a complete solution; the approach in this PR is too intrusive for the optimizer framework.

So I have a suggestion: make the change in FragmentBuilder instead:

  • Change the way the fragment is generated.
  • Add a mark on the exchange node so that it does a local shuffle.
  • Mark the BE nodes on the ScanNode, and check that there is only one BE node when an aggregate node is encountered.

I think this can work on an aggregate sub-plan without checking the whole plan. And given point 1, you don't need to care that the optimizer returns a multi-stage aggregate plan for a low-cardinality aggregate.
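
A rough sketch of this FragmentBuilder-side idea is below. It is only an illustration under stated assumptions: `LocalShuffleMarker`, `Node`, `Kind`, `backendIds`, and `localShuffle` are hypothetical names, not StarRocks classes or fields, and the plan is modeled as a bare tree with no real fragment logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the suggestion; not the real PlanFragmentBuilder.
final class LocalShuffleMarker {
    enum Kind { SCAN, EXCHANGE, AGGREGATE }

    static final class Node {
        final Kind kind;
        final List<Node> children = new ArrayList<>();
        final Set<Integer> backendIds = new HashSet<>(); // filled for SCAN only
        boolean localShuffle;                            // set on EXCHANGE only

        Node(Kind kind, Node... children) {
            this.kind = kind;
            this.children.addAll(Arrays.asList(children));
        }
    }

    // Builds a scan node marked with the BEs it reads from.
    static Node scan(Integer... backendIds) {
        Node n = new Node(Kind.SCAN);
        n.backendIds.addAll(Arrays.asList(backendIds));
        return n;
    }

    // Collects the BE ids of every scan below the given node.
    private static void collectBackends(Node n, Set<Integer> out) {
        if (n.kind == Kind.SCAN) {
            out.addAll(n.backendIds);
        }
        for (Node c : n.children) {
            collectBackends(c, out);
        }
    }

    // Marks every exchange below the given node as a local shuffle.
    private static void markExchanges(Node n) {
        if (n.kind == Kind.EXCHANGE) {
            n.localShuffle = true;
        }
        for (Node c : n.children) {
            markExchanges(c);
        }
    }

    // Called when an aggregate node is met: inspects only its sub-plan and
    // marks the exchanges when all scans run on exactly one BE.
    static boolean markIfSingleBackend(Node aggregate) {
        Set<Integer> backends = new HashSet<>();
        collectBackends(aggregate, backends);
        if (backends.size() != 1) {
            return false;
        }
        markExchanges(aggregate);
        return true;
    }
}
```

Because the check starts at the aggregate node and only walks its own sub-plan, it does not need any knowledge of the rest of the plan.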

Seaven avatar Jul 25 '22 02:07 Seaven