starrocks icon indicating copy to clipboard operation
starrocks copied to clipboard

[Enhancement] support adaptive partition hash join

Open stdpain opened this issue 1 year ago • 4 comments

Why I'm doing:

The larger the hash table, the worse the performance of outputting right-table (build-side) columns. This PR improves performance by partitioning the right table's hash table so that each individual hash table fits into the cache.

However, a partitioned hash join requires an additional shuffle on the probe side, which can cause a performance regression when the probe side has a large number of columns. We therefore use an adaptive model.

OutputBuildColumnTime
2s958ms[max=3s142ms, min=2s764ms]
PartitionProbeOverhead
11s758ms[max=12s628ms, min=10s862ms]

We define A as the probe-side shuffle cost, B1 as the cost of outputting from the hash table after partitioning, and B2 as the cost of outputting from the hash table before partitioning. The partitioned hash join is used only if A + B1 < B2.

Partition hash join test

Query SQL pattern (8C 16G):

Each query is run 5 times and the best result is taken, using an external catalog.

Case: output the right table, with varying cardinality

-- Benchmark: join lineorder against a synthetic supplier build side and
-- aggregate only columns that come from the right (build) table.
WITH supplier AS (
    -- One row per generated key (1..200000); the remaining columns are
    -- constant filler strings produced by repeat().
    SELECT
        generate_series AS s_suppkey,
        concat("Supplier#", s_suppkey) AS s_name,
        repeat('x', 10) AS s_address,
        repeat('a', 10) AS s_city,
        repeat('b', 14) AS s_nation,
        repeat('c', 11) AS s_region,
        repeat('p', 11) AS s_phone
    FROM TABLE(generate_series(1, 200000))
)
SELECT
    count(s_city),
    count(s_nation),
    count(s_region),
    count(s_name),
    count(s_address),
    count(s_phone)
FROM lineorder
INNER JOIN supplier
    ON lo_suppkey % 200000 = s_suppkey;
right table ndv baseline, broadcast partition=32,broadcast,staging=4
20 4.45 4.24
200 4.09 4.59
2000 4.31 4.53
20000 5.34 4.73
200000 11.51 6.64

Case outputs the right table and the left table at the same time

-- Benchmark: output columns from both the right (supplier) and the left
-- (lineorder) side of the join, with the left side widened by 16 derived columns.
WITH supplier AS (
    -- One row per generated key (1..200000); the remaining columns are
    -- constant filler strings produced by repeat().
    SELECT
        generate_series AS s_suppkey,
        concat("Supplier#", s_suppkey) AS s_name,
        repeat('x', 10) AS s_address,
        repeat('a', 10) AS s_city,
        repeat('b', 14) AS s_nation,
        repeat('c', 11) AS s_region,
        repeat('p', 11) AS s_phone
    FROM TABLE(generate_series(1, 200000))
)
SELECT
    -- right-table columns
    count(s_city),
    count(s_nation),
    count(s_region),
    count(s_name),
    count(s_address),
    count(s_phone),
    -- left-table columns taken directly from lineorder
    count(lo_linenumber),
    count(lo_custkey),
    count(lo_partkey),
    count(lo_suppkey),
    count(lo_shippriority),
    count(lo_quantity),
    count(lo_extendedprice),
    count(lo_ordtotalprice),
    count(lo_discount),
    count(lo_revenue),
    count(lo_supplycost),
    count(lo_tax),
    count(lo_commitdate),
    count(lo_orderdate),
    -- left-table columns derived from lo_orderkey in subquery x
    count(p1),
    count(p2),
    count(p3),
    count(p4),
    count(p5),
    count(p6),
    count(p7),
    count(p8),
    count(p9),
    count(p10),
    count(p11),
    count(p12),
    count(p13),
    count(p14),
    count(p15),
    count(p16),
    count(lo_orderkey)
FROM (
    -- Every lineorder column plus p1..p16, each an offset of lo_orderkey.
    SELECT
        *,
        lo_orderkey + 1 AS p1,
        lo_orderkey + 2 AS p2,
        lo_orderkey + 3 AS p3,
        lo_orderkey + 4 AS p4,
        lo_orderkey + 5 AS p5,
        lo_orderkey + 6 AS p6,
        lo_orderkey + 7 AS p7,
        lo_orderkey + 8 AS p8,
        lo_orderkey + 9 AS p9,
        lo_orderkey + 10 AS p10,
        lo_orderkey + 11 AS p11,
        lo_orderkey + 12 AS p12,
        lo_orderkey + 13 AS p13,
        lo_orderkey + 14 AS p14,
        lo_orderkey + 15 AS p15,
        lo_orderkey + 16 AS p16
    FROM lineorder
) AS x
INNER JOIN supplier
    ON lo_suppkey % 200000 = s_suppkey;
probe int cnts baseline, broadcast partition=16,broadcast,staging=4
1 12.42 7.52
2 13.06 7.8
4 14.79 9.35
8 15.66 11.37
15 17.43 15.56
31 20.18 26.17

Case outputs the right table, which has many columns.

-- Benchmark: output many string columns from the right (build) table.
WITH supplier AS (
    -- One row per generated key (1..200000) carrying 32 constant 16-character
    -- strings: single letters repeated 16 times, or letter pairs repeated 8 times.
    SELECT
        generate_series AS s_suppkey,
        repeat('p', 16) AS s_phone1,
        repeat('x', 16) AS s_phone2,
        repeat('y', 16) AS s_phone3,
        repeat('j', 16) AS s_phone4,
        repeat('m', 16) AS s_phone5,
        repeat('l', 16) AS s_phone6,
        repeat('t', 16) AS s_phone7,
        repeat('u', 16) AS s_phone8,
        repeat('v', 16) AS s_phone9,
        repeat('d', 16) AS s_phone10,
        repeat('w', 16) AS s_phone11,
        repeat('a', 16) AS s_phone12,
        repeat('c', 16) AS s_phone13,
        repeat('i', 16) AS s_phone14,
        repeat('g', 16) AS s_phone15,
        repeat('aa', 8) AS s_phone16,
        repeat('bb', 8) AS s_phone17,
        repeat('cc', 8) AS s_phone18,
        repeat('dd', 8) AS s_phone19,
        repeat('ee', 8) AS s_phone20,
        repeat('ff', 8) AS s_phone21,
        repeat('gg', 8) AS s_phone22,
        repeat('hh', 8) AS s_phone23,
        repeat('ii', 8) AS s_phone24,
        repeat('jj', 8) AS s_phone25,
        repeat('kk', 8) AS s_phone26,
        repeat('ll', 8) AS s_phone27,
        repeat('mm', 8) AS s_phone28,
        repeat('nn', 8) AS s_phone29,
        repeat('pp', 8) AS s_phone30,
        repeat('qq', 8) AS s_phone31,
        repeat('x', 16) AS s_phone
    FROM TABLE(generate_series(1, 200000))
)
SELECT
    count(s_phone1),
    count(s_phone2),
    count(s_phone3),
    count(s_phone4),
    count(s_phone5),
    count(s_phone6),
    count(s_phone7),
    count(s_phone8),
    count(s_phone9),
    count(s_phone10),
    count(s_phone11),
    count(s_phone12),
    count(s_phone13),
    count(s_phone14),
    count(s_phone15),
    count(s_phone16),
    count(s_phone17),
    count(s_phone18),
    count(s_phone19),
    count(s_phone20),
    count(s_phone21),
    count(s_phone22),
    count(s_phone23),
    count(s_phone24),
    count(s_phone25),
    count(s_phone26),
    count(s_phone27),
    count(s_phone28),
    count(s_phone29),
    count(s_phone30),
    count(s_phone31),
    count(s_phone)
FROM lineorder
INNER JOIN supplier
    ON lo_suppkey % 200000 = s_suppkey;
right table string cnts baseline, broadcast partition=16,broadcast,staging=4
1 3.33 3.27
2 4.54 3.77
4 6.22 4.5
8 15.67 7.43
16 36.02 15.62
32 75.93 29.48

The table on the right has a higher cardinality: 160W.

-- Benchmark: join lineorder to the part table and aggregate six
-- right-table (part) columns.
SELECT
    count(p_name),
    count(p_mfgr),
    count(p_category),
    count(p_brand),
    count(p_color),
    count(p_container)
FROM lineorder
INNER JOIN part
    ON lo_partkey = p_partkey;
join ndv baseline, broadcast partition=16,broadcast,staging=4
160000 33.14 16.53

Join + AGG

-- Benchmark: join followed by a GROUP BY aggregation; the outer query collapses
-- the per-group counts into a single row.
SELECT
    count(cn),
    count(cm),
    count(cc),
    count(cb),
    count(cp),
    count(ci)
FROM (
    -- Per-group counts of part columns over lineorder joined to part.
    -- NOTE(review): p_brand is counted but is not part of the GROUP BY key;
    -- confirm this is intended.
    SELECT
        count(p_name) AS cn,
        count(p_mfgr) AS cm,
        count(p_category) AS cc,
        count(p_brand) AS cb,
        count(p_color) AS cp,
        count(p_container) AS ci
    FROM lineorder
    INNER JOIN part
        ON lo_partkey = p_partkey
    GROUP BY
        p_name,
        p_mfgr,
        p_category,
        p_color,
        p_container
) AS tb;
baseline partition=16,staging =4
49.99 36.24

Projection pollutes the cache

-- Benchmark: combine bitmap_agg over four left-table (lineorder) columns with
-- counts over six right-table (part) columns in the same join.
SELECT
    bitmap_agg(lo_custkey),
    bitmap_agg(lo_linenumber),
    bitmap_agg(lo_partkey),
    bitmap_agg(lo_orderkey),
    count(p_name) AS cn,
    count(p_mfgr) AS cm,
    count(p_category) AS cc,
    count(p_brand) AS cb,
    count(p_color) AS cp,
    count(p_container) AS ci
FROM lineorder
INNER JOIN part
    ON lo_partkey = p_partkey;
baseline partition=16,staging =4 partition=32, staging = 4
42.17 36.24

Right table with ultra-high cardinality: 20M

On 8C 16G, at most two columns of the right table can be output; otherwise the query will OOM.

-- Benchmark: shuffle self-join of lineorder on (lo_orderkey, lo_linenumber),
-- outputting two columns from the right (build) side.
-- The statement was previously pasted twice on one line; it is kept once here.
SELECT
    count(r.lo_custkey),
    count(r.lo_partkey)
FROM lineorder AS l
INNER JOIN [shuffle] lineorder AS r
    ON l.lo_orderkey = r.lo_orderkey
    AND l.lo_linenumber = r.lo_linenumber;
output build columns baseline partition=16,staging =4
1 16.15 15.93
2 18.31 18.17

Lazy materialize

-- Lazy-materialize benchmark: the WHERE predicate combines a column from each
-- side of the join (p_size from part, lo_linenumber from lineorder).
SELECT
    sum(lo_revenue),
    count(lo_shipmode),
    count(p_name),
    count(p_mfgr),
    count(p_category),
    count(p_brand),
    count(p_color),
    count(p_type),
    count(p_size),
    count(p_container)
FROM lineorder
INNER JOIN part
    ON lo_partkey = p_partkey
WHERE (p_size + lo_linenumber) > 50;
baseline baseline-lazy materialize partition=16, staging = 4 partition=16, staging = 4, lazy-materialize
42.22 8.06 25.47 10.14

What type of PR is this:

  • [ ] BugFix
  • [ ] Feature
  • [x] Enhancement
  • [ ] Refactor
  • [ ] UT
  • [ ] Doc
  • [ ] Tool

Does this PR entail a change in behavior?

  • [ ] Yes, this PR will result in a change in behavior.
  • [x] No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • [ ] Parameter changes: default values, similar parameters but with different default values
  • [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
  • [ ] Feature removed
  • [ ] Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • [ ] I have added test cases for my bug fix or my new feature
  • [ ] This pr needs user documentation (for new or modified features or behaviors)
    • [ ] I have added documentation for my new feature or new function
  • [ ] This is a backport pr

Bugfix cherry-pick branch check:

  • [x] I have checked the version labels which the pr will be auto-backported to the target branch
    • [ ] 3.3
    • [ ] 3.2
    • [ ] 3.1
    • [ ] 3.0
    • [ ] 2.5

stdpain avatar Aug 27 '24 07:08 stdpain

[Java-Extensions Incremental Coverage Report]

:white_check_mark: pass : 0 / 0 (0%)

github-actions[bot] avatar Aug 30 '24 07:08 github-actions[bot]

[FE Incremental Coverage Report]

:white_check_mark: pass : 11 / 12 (91.67%)

file detail

path covered_line new_line coverage not_covered_line_detail
:large_blue_circle: com/starrocks/planner/HashJoinNode.java 8 9 88.89% [130]
:large_blue_circle: com/starrocks/qe/SessionVariable.java 2 2 100.00% []
:large_blue_circle: com/starrocks/common/Config.java 1 1 100.00% []

github-actions[bot] avatar Aug 30 '24 07:08 github-actions[bot]

[BE Incremental Coverage Report]

:x: fail : 0 / 394 (00.00%)

file detail

path covered_line new_line coverage not_covered_line_detail
:large_blue_circle: be/src/exec/hash_joiner.h 0 8 00.00% [93, 94, 280, 281, 284, 286, 287, 288]
:large_blue_circle: be/src/exec/pipeline/hashjoin/hash_join_probe_operator.cpp 0 1 00.00% [90]
:large_blue_circle: be/src/exec/hash_join_components.h 0 5 00.00% [55, 56, 57, 59, 69]
:large_blue_circle: be/src/util/cpu_info.h 0 8 00.00% [85, 86, 87, 89, 90, 91, 92, 94]
:large_blue_circle: be/src/exec/hash_joiner.cpp 0 8 00.00% [47, 60, 92, 93, 95, 255, 256, 373]
:large_blue_circle: be/src/exec/mor_processor.cpp 0 1 00.00% [51]
:large_blue_circle: be/src/exec/hash_join_node.cpp 0 5 00.00% [148, 149, 209, 479, 480]
:large_blue_circle: be/src/exec/hash_join_components.cpp 0 349 00.00% [42, 46, 49, 50, 103, 105, 106, 107, 109, 110, 111, 112, 113, 114, 116, 117, 118, 119, 121, 123, 124, 127, 129, 131, 141, 142, 149, 150, 151, 152, 162, 163, 164, 166, 167, 168, 173, 174, 175, 183, 184, 185, 186, 187, 189, 190, 191, 193, 194, 196, 198, 199, 201, 202, 203, 204, 206, 207, 208, 210, 211, 212, 213, 214, 215, 216, 218, 219, 222, 223, 227, 229, 230, 232, 233, 235, 236, 239, 240, 243, 244, 245, 247, 249, 250, 251, 252, 253, 256, 257, 260, 261, 263, 264, 267, 268, 269, 273, 274, 276, 277, 278, 279, 280, 281, 282, 284, 285, 287, 288, 289, 290, 292, 293, 294, 296, 297, 299, 300, 301, 302, 303, 305, 310, 311, 312, 313, 314, 315, 316, 317, 318, 320, 321, 323, 324, 325, 327, 328, 331, 332, 333, 334, 335, 336, 408, 466, 467, 468, 469, 470, 471, 472, 473, 474, 475, 477, 478, 480, 481, 482, 483, 484, 490, 492, 496, 497, 499, 500, 501, 502, 503, 508, 512, 513, 517, 518, 522, 523, 526, 527, 528, 529, 532, 534, 536, 540, 541, 543, 546, 549, 550, 551, 552, 553, 554, 555, 557, 558, 560, 562, 564, 566, 567, 569, 570, 571, 572, 573, 575, 577, 578, 579, 581, 582, 583, 584, 585, 586, 587, 588, 589, 591, 592, 593, 594, 596, 597, 598, 601, 602, 603, 604, 605, 606, 607, 608, 609, 611, 612, 613, 615, 616, 619, 620, 623, 624, 625, 628, 630, 631, 633, 634, 635, 638, 639, 641, 642, 643, 645, 646, 647, 648, 649, 650, 651, 653, 654, 657, 658, 662, 664, 665, 667, 668, 670, 671, 674, 675, 678, 679, 680, 683, 684, 685, 686, 687, 690, 691, 692, 693, 694, 695, 697, 698, 699, 702, 703, 704, 705, 706, 710, 711, 713, 716, 719, 720, 723, 724, 726, 727, 730, 731, 733, 734, 737, 738, 739, 741, 743, 744, 746, 747, 749, 750, 751, 752, 753, 755, 756, 757, 760, 761, 764, 765, 766, 767, 768, 769, 770, 771, 772, 773, 774, 776, 778, 780, 781, 783]
:large_blue_circle: be/src/exec/join_hash_map.cpp 0 9 00.00% [659, 660, 662, 663, 665, 666, 668, 670, 672]

github-actions[bot] avatar Aug 30 '24 07:08 github-actions[bot]