[Enhancement] support adaptive partition hash join
Why I'm doing:
As the hash table grows, outputting the build-side (right table) columns becomes slower. This PR improves performance by partitioning the build-side (right) table into several smaller hash tables, so that each individual hash table fits in the CPU cache.
However, a partitioned hash join requires an additional shuffle on the probe side, which can make the query slower when the probe side has many columns. We therefore added an adaptive cost model that decides when to partition.
OutputBuildColumnTime
2s958ms[max=3s142ms, min=2s764ms]
PartitionProbeOverhead
11s758ms[max=12s628ms, min=10s862ms]
We define A as the cost of shuffling the probe side, B1 as the cost of outputting build-side columns from the partitioned hash tables, and B2 as the cost of outputting them from the unpartitioned hash table. A partitioned hash join is used only if A + B1 < B2.
Partitioned hash join benchmarks
Query patterns (8C 16G):
Each query was run 5 times and the best result is reported; tables were accessed through an external catalog.
Case: output right-table columns while varying the right table's cardinality
-- Generated build side: one row per value of generate_series(1, 200000),
-- plus repeat()-based string columns that are only counted.
-- NOTE(review): the 200000 constants were presumably varied to produce the
-- "right table ndv" rows in the table below; confirm.
WITH supplier AS (
    SELECT
        generate_series AS s_suppkey,
        CONCAT("Supplier#", s_suppkey) AS s_name,
        REPEAT('x', 10) AS s_address,
        REPEAT('a', 10) AS s_city,
        REPEAT('b', 14) AS s_nation,
        REPEAT('c', 11) AS s_region,
        REPEAT('p', 11) AS s_phone
    FROM TABLE(generate_series(1, 200000))
)
SELECT
    COUNT(s_city),
    COUNT(s_nation),
    COUNT(s_region),
    COUNT(s_name),
    COUNT(s_address),
    COUNT(s_phone)
FROM lineorder
-- NOTE(review): lo_suppkey % 200000 yields 0..199999 while s_suppkey spans
-- 1..200000, so s_suppkey = 200000 never matches and remainder 0 finds no row.
-- Kept as-is because the reported timings were measured with this predicate.
INNER JOIN supplier
    ON lo_suppkey % 200000 = s_suppkey;
| right table ndv | baseline, broadcast | partition=32,broadcast,staging=4 |
|---|---|---|
| 20 | 4.45 | 4.24 |
| 200 | 4.09 | 4.59 |
| 2000 | 4.31 | 4.53 |
| 20000 | 5.34 | 4.73 |
| 200000 | 11.51 | 6.64 |
Case: output columns from both the right table and the left table
-- Same generated build side as the previous case.
WITH supplier AS (
    SELECT
        generate_series AS s_suppkey,
        CONCAT("Supplier#", s_suppkey) AS s_name,
        REPEAT('x', 10) AS s_address,
        REPEAT('a', 10) AS s_city,
        REPEAT('b', 14) AS s_nation,
        REPEAT('c', 11) AS s_region,
        REPEAT('p', 11) AS s_phone
    FROM TABLE(generate_series(1, 200000))
)
SELECT
    -- build-side (supplier) columns
    COUNT(s_city),
    COUNT(s_nation),
    COUNT(s_region),
    COUNT(s_name),
    COUNT(s_address),
    COUNT(s_phone),
    -- probe-side (lineorder) columns
    COUNT(lo_linenumber),
    COUNT(lo_custkey),
    COUNT(lo_partkey),
    COUNT(lo_suppkey),
    COUNT(lo_shippriority),
    COUNT(lo_quantity),
    COUNT(lo_extendedprice),
    COUNT(lo_ordtotalprice),
    COUNT(lo_discount),
    COUNT(lo_revenue),
    COUNT(lo_supplycost),
    COUNT(lo_tax),
    COUNT(lo_commitdate),
    COUNT(lo_orderdate),
    -- derived probe-side integer columns
    COUNT(p1),
    COUNT(p2),
    COUNT(p3),
    COUNT(p4),
    COUNT(p5),
    COUNT(p6),
    COUNT(p7),
    COUNT(p8),
    COUNT(p9),
    COUNT(p10),
    COUNT(p11),
    COUNT(p12),
    COUNT(p13),
    COUNT(p14),
    COUNT(p15),
    COUNT(p16),
    COUNT(lo_orderkey)
FROM (
    -- Widens the probe side with 16 derived integer columns p1..p16.
    -- NOTE(review): the "probe int cnts" table presumably varies how many
    -- of these columns are projected; confirm.
    SELECT
        *,
        lo_orderkey + 1 AS p1,
        lo_orderkey + 2 AS p2,
        lo_orderkey + 3 AS p3,
        lo_orderkey + 4 AS p4,
        lo_orderkey + 5 AS p5,
        lo_orderkey + 6 AS p6,
        lo_orderkey + 7 AS p7,
        lo_orderkey + 8 AS p8,
        lo_orderkey + 9 AS p9,
        lo_orderkey + 10 AS p10,
        lo_orderkey + 11 AS p11,
        lo_orderkey + 12 AS p12,
        lo_orderkey + 13 AS p13,
        lo_orderkey + 14 AS p14,
        lo_orderkey + 15 AS p15,
        lo_orderkey + 16 AS p16
    FROM lineorder
) AS lineorder_wide
INNER JOIN supplier
    ON lo_suppkey % 200000 = s_suppkey;
| probe int cnts | baseline, broadcast | partition=16,broadcast,staging=4 |
|---|---|---|
| 1 | 12.42 | 7.52 |
| 2 | 13.06 | 7.8 |
| 4 | 14.79 | 9.35 |
| 8 | 15.66 | 11.37 |
| 15 | 17.43 | 15.56 |
| 31 | 20.18 | 26.17 |
Case: output a right table that has many columns
-- Generated build side with 32 string payload columns, each 16 characters
-- long (REPEAT of a 1-char string 16 times or a 2-char string 8 times).
-- NOTE(review): the "right table string cnts" rows presumably vary how many
-- of these columns are counted; confirm.
WITH supplier AS (
    SELECT
        generate_series AS s_suppkey,
        REPEAT('p', 16) AS s_phone1,
        REPEAT('x', 16) AS s_phone2,
        REPEAT('y', 16) AS s_phone3,
        REPEAT('j', 16) AS s_phone4,
        REPEAT('m', 16) AS s_phone5,
        REPEAT('l', 16) AS s_phone6,
        REPEAT('t', 16) AS s_phone7,
        REPEAT('u', 16) AS s_phone8,
        REPEAT('v', 16) AS s_phone9,
        REPEAT('d', 16) AS s_phone10,
        REPEAT('w', 16) AS s_phone11,
        REPEAT('a', 16) AS s_phone12,
        REPEAT('c', 16) AS s_phone13,
        REPEAT('i', 16) AS s_phone14,
        REPEAT('g', 16) AS s_phone15,
        REPEAT('aa', 8) AS s_phone16,
        REPEAT('bb', 8) AS s_phone17,
        REPEAT('cc', 8) AS s_phone18,
        REPEAT('dd', 8) AS s_phone19,
        REPEAT('ee', 8) AS s_phone20,
        REPEAT('ff', 8) AS s_phone21,
        REPEAT('gg', 8) AS s_phone22,
        REPEAT('hh', 8) AS s_phone23,
        REPEAT('ii', 8) AS s_phone24,
        REPEAT('jj', 8) AS s_phone25,
        REPEAT('kk', 8) AS s_phone26,
        REPEAT('ll', 8) AS s_phone27,
        REPEAT('mm', 8) AS s_phone28,
        REPEAT('nn', 8) AS s_phone29,
        REPEAT('pp', 8) AS s_phone30,
        REPEAT('qq', 8) AS s_phone31,
        REPEAT('x', 16) AS s_phone
    FROM TABLE(generate_series(1, 200000))
)
SELECT
    COUNT(s_phone1),
    COUNT(s_phone2),
    COUNT(s_phone3),
    COUNT(s_phone4),
    COUNT(s_phone5),
    COUNT(s_phone6),
    COUNT(s_phone7),
    COUNT(s_phone8),
    COUNT(s_phone9),
    COUNT(s_phone10),
    COUNT(s_phone11),
    COUNT(s_phone12),
    COUNT(s_phone13),
    COUNT(s_phone14),
    COUNT(s_phone15),
    COUNT(s_phone16),
    COUNT(s_phone17),
    COUNT(s_phone18),
    COUNT(s_phone19),
    COUNT(s_phone20),
    COUNT(s_phone21),
    COUNT(s_phone22),
    COUNT(s_phone23),
    COUNT(s_phone24),
    COUNT(s_phone25),
    COUNT(s_phone26),
    COUNT(s_phone27),
    COUNT(s_phone28),
    COUNT(s_phone29),
    COUNT(s_phone30),
    COUNT(s_phone31),
    COUNT(s_phone)
FROM lineorder
INNER JOIN supplier
    ON lo_suppkey % 200000 = s_suppkey;
| right table string cnts | baseline, broadcast | partition=16,broadcast,staging=4 |
|---|---|---|
| 1 | 3.33 | 3.27 |
| 2 | 4.54 | 3.77 |
| 4 | 6.22 | 4.5 |
| 8 | 15.67 | 7.43 |
| 16 | 36.02 | 15.62 |
| 32 | 75.93 (1 min 15.93 s) | 29.48 |
Case: the right table has a higher cardinality of 160W (i.e. 1.6 million) rows.
-- Join lineorder against the part table and count six build-side (part) columns.
SELECT
    COUNT(p_name),
    COUNT(p_mfgr),
    COUNT(p_category),
    COUNT(p_brand),
    COUNT(p_color),
    COUNT(p_container)
FROM lineorder
INNER JOIN part
    ON lo_partkey = p_partkey;
| join ndv | baseline, broadcast | partition=16,broadcast,staging=4 |
|---|---|---|
| 160000 | 33.14 | 16.53 |
Join + AGG
-- Join followed by a GROUP BY on five part columns; the outer query counts
-- the per-group results.
SELECT
    COUNT(cn),
    COUNT(cm),
    COUNT(cc),
    COUNT(cb),
    COUNT(cp),
    COUNT(ci)
FROM (
    SELECT
        COUNT(p_name) AS cn,
        COUNT(p_mfgr) AS cm,
        COUNT(p_category) AS cc,
        COUNT(p_brand) AS cb,
        COUNT(p_color) AS cp,
        COUNT(p_container) AS ci
    FROM lineorder
    INNER JOIN part
        ON lo_partkey = p_partkey
    GROUP BY
        p_name,
        p_mfgr,
        p_category,
        p_color,
        p_container
) AS per_group_counts;
| baseline | partition=16,staging=4 |
|---|---|
| 49.99 | 36.24 |
Case: cache pollution from heavy projection/aggregation work
-- bitmap_agg over four probe-side (lineorder) columns, alongside counts of
-- six build-side (part) columns.
SELECT
    bitmap_agg(lo_custkey),
    bitmap_agg(lo_linenumber),
    bitmap_agg(lo_partkey),
    bitmap_agg(lo_orderkey),
    COUNT(p_name) AS cn,
    COUNT(p_mfgr) AS cm,
    COUNT(p_category) AS cc,
    COUNT(p_brand) AS cb,
    COUNT(p_color) AS cp,
    COUNT(p_container) AS ci
FROM lineorder
INNER JOIN part
    ON lo_partkey = p_partkey;
| baseline | partition=16,staging=4 | partition=32,staging=4 |
|---|---|---|
| 42.17 | 36.24 | not reported |
Case: right table with ultra-high cardinality (20M rows).
On 8C 16G, at most two right-table columns can be output; outputting more columns causes OOM.
-- Shuffle self-join of lineorder on its (lo_orderkey, lo_linenumber) pair;
-- counts one or two build-side (r) columns per the table below.
-- The original snippet contained this statement pasted twice on one line,
-- which would run the query twice.
SELECT
    COUNT(r.lo_custkey),
    COUNT(r.lo_partkey)
FROM lineorder AS l
INNER JOIN [shuffle] lineorder AS r
    ON l.lo_orderkey = r.lo_orderkey
    AND l.lo_linenumber = r.lo_linenumber;
| output build columns | baseline | partition=16,staging =4 |
|---|---|---|
| 1 | 16.15 | 15.93 |
| 2 | 18.31 | 18.17 |
Case: lazy materialization
-- The WHERE predicate combines a build-side column (p_size) with a
-- probe-side column (lo_linenumber), so it can only be evaluated after the join.
SELECT
    SUM(lo_revenue),
    COUNT(lo_shipmode),
    COUNT(p_name),
    COUNT(p_mfgr),
    COUNT(p_category),
    COUNT(p_brand),
    COUNT(p_color),
    COUNT(p_type),
    COUNT(p_size),
    COUNT(p_container)
FROM lineorder
INNER JOIN part
    ON lo_partkey = p_partkey
WHERE p_size + lo_linenumber > 50;
| baseline | baseline, lazy materialization | partition=16,staging=4 | partition=16,staging=4, lazy materialization |
|---|---|---|---|
| 42.22 | 8.06 | 25.47 | 10.14 |
What type of PR is this:
- [ ] BugFix
- [ ] Feature
- [x] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc
- [ ] Tool
Does this PR entail a change in behavior?
- [ ] Yes, this PR will result in a change in behavior.
- [x] No, this PR will not result in a change in behavior.
If yes, please specify the type of change:
- [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
- [ ] Parameter changes: default values, similar parameters but with different default values
- [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
- [ ] Feature removed
- [ ] Miscellaneous: upgrade & downgrade compatibility, etc.
Checklist:
- [ ] I have added test cases for my bug fix or my new feature
- [ ] This pr needs user documentation (for new or modified features or behaviors)
- [ ] I have added documentation for my new feature or new function
- [ ] This is a backport pr
Bugfix cherry-pick branch check:
- [x] I have checked the version labels which the pr will be auto-backported to the target branch
- [ ] 3.3
- [ ] 3.2
- [ ] 3.1
- [ ] 3.0
- [ ] 2.5
Quality Gate passed
Issues
3 New issues
0 Accepted issues
Measures
0 Security Hotspots
0.0% Coverage on New Code
0.0% Duplication on New Code
[Java-Extensions Incremental Coverage Report]
:white_check_mark: pass : 0 / 0 (0%)
[FE Incremental Coverage Report]
:white_check_mark: pass : 11 / 12 (91.67%)
file detail
| path | covered_line | new_line | coverage | not_covered_line_detail | |
|---|---|---|---|---|---|
| :large_blue_circle: | com/starrocks/planner/HashJoinNode.java | 8 | 9 | 88.89% | [130] |
| :large_blue_circle: | com/starrocks/qe/SessionVariable.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/common/Config.java | 1 | 1 | 100.00% | [] |
[BE Incremental Coverage Report]
:x: fail : 0 / 394 (00.00%)
file detail
| path | covered_line | new_line | coverage | not_covered_line_detail | |
|---|---|---|---|---|---|
| :large_blue_circle: | be/src/exec/hash_joiner.h | 0 | 8 | 00.00% | [93, 94, 280, 281, 284, 286, 287, 288] |
| :large_blue_circle: | be/src/exec/pipeline/hashjoin/hash_join_probe_operator.cpp | 0 | 1 | 00.00% | [90] |
| :large_blue_circle: | be/src/exec/hash_join_components.h | 0 | 5 | 00.00% | [55, 56, 57, 59, 69] |
| :large_blue_circle: | be/src/util/cpu_info.h | 0 | 8 | 00.00% | [85, 86, 87, 89, 90, 91, 92, 94] |
| :large_blue_circle: | be/src/exec/hash_joiner.cpp | 0 | 8 | 00.00% | [47, 60, 92, 93, 95, 255, 256, 373] |
| :large_blue_circle: | be/src/exec/mor_processor.cpp | 0 | 1 | 00.00% | [51] |
| :large_blue_circle: | be/src/exec/hash_join_node.cpp | 0 | 5 | 00.00% | [148, 149, 209, 479, 480] |
| :large_blue_circle: | be/src/exec/hash_join_components.cpp | 0 | 349 | 00.00% | [42, 46, 49, 50, 103, 105, 106, 107, 109, 110, 111, 112, 113, 114, 116, 117, 118, 119, 121, 123, 124, 127, 129, 131, 141, 142, 149, 150, 151, 152, 162, 163, 164, 166, 167, 168, 173, 174, 175, 183, 184, 185, 186, 187, 189, 190, 191, 193, 194, 196, 198, 199, 201, 202, 203, 204, 206, 207, 208, 210, 211, 212, 213, 214, 215, 216, 218, 219, 222, 223, 227, 229, 230, 232, 233, 235, 236, 239, 240, 243, 244, 245, 247, 249, 250, 251, 252, 253, 256, 257, 260, 261, 263, 264, 267, 268, 269, 273, 274, 276, 277, 278, 279, 280, 281, 282, 284, 285, 287, 288, 289, 290, 292, 293, 294, 296, 297, 299, 300, 301, 302, 303, 305, 310, 311, 312, 313, 314, 315, 316, 317, 318, 320, 321, 323, 324, 325, 327, 328, 331, 332, 333, 334, 335, 336, 408, 466, 467, 468, 469, 470, 471, 472, 473, 474, 475, 477, 478, 480, 481, 482, 483, 484, 490, 492, 496, 497, 499, 500, 501, 502, 503, 508, 512, 513, 517, 518, 522, 523, 526, 527, 528, 529, 532, 534, 536, 540, 541, 543, 546, 549, 550, 551, 552, 553, 554, 555, 557, 558, 560, 562, 564, 566, 567, 569, 570, 571, 572, 573, 575, 577, 578, 579, 581, 582, 583, 584, 585, 586, 587, 588, 589, 591, 592, 593, 594, 596, 597, 598, 601, 602, 603, 604, 605, 606, 607, 608, 609, 611, 612, 613, 615, 616, 619, 620, 623, 624, 625, 628, 630, 631, 633, 634, 635, 638, 639, 641, 642, 643, 645, 646, 647, 648, 649, 650, 651, 653, 654, 657, 658, 662, 664, 665, 667, 668, 670, 671, 674, 675, 678, 679, 680, 683, 684, 685, 686, 687, 690, 691, 692, 693, 694, 695, 697, 698, 699, 702, 703, 704, 705, 706, 710, 711, 713, 716, 719, 720, 723, 724, 726, 727, 730, 731, 733, 734, 737, 738, 739, 741, 743, 744, 746, 747, 749, 750, 751, 752, 753, 755, 756, 757, 760, 761, 764, 765, 766, 767, 768, 769, 770, 771, 772, 773, 774, 776, 778, 780, 781, 783] |
| :large_blue_circle: | be/src/exec/join_hash_map.cpp | 0 | 9 | 00.00% | [659, 660, 662, 663, 665, 666, 668, 670, 672] |