
Row wise group by on fixed width types

Opened by lukasz-stec on Jan 20 '22 • 8 comments

This adds another GroupByHash implementation that works only on multiple fixed-width types and can store hash table state in a row-wise manner.

Hash table implementation

I tested two hash table implementations: one that stores almost everything in one big array (SingleTableHashTableData), and a second one that stores only group ids in the hash table and keeps the group values in a separate array (SeparateTableHashTableData). The single table is better for CPU because it needs only one random read per row during hash table population (putIfAbsent). The separate table is better for memory utilization (it does not waste memory on empty hash table slots), but it does at least two random memory reads: one to get the groupId from the hash table, and a second to fetch the values of that group to compare against the current row. For this reason, SingleTableHashTableData is used. SeparateTableHashTableData could potentially be used to store variable-width data (VARCHARs).

The memory layout per hash table entry looks like this (byte offsets):

    0                  -  4                                 : group id
    4                  -  12                                : hash
    12                 -  12 + channelCount                 : isNull flags (one byte per channel)
    12 + channelCount  -  12 + channelCount + valuesLength  : values
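
For illustration only, here is a minimal sketch of how an entry in such a single big array could be addressed (hypothetical class and member names, not the PR's actual SingleTableHashTableData):

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;
    import java.nio.ByteOrder;

    // Illustrative sketch of the entry layout above; names and details are hypothetical.
    public class FixedWidthEntrySketch
    {
        private static final VarHandle INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.nativeOrder());
        private static final VarHandle LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.nativeOrder());

        private final int channelCount;  // number of group-by channels
        private final int valuesLength;  // total bytes of all fixed-width values
        private final int entrySize;     // bytes per hash table entry
        private final byte[] table;      // one big array holding all entries

        public FixedWidthEntrySketch(int channelCount, int valuesLength, int capacity)
        {
            this.channelCount = channelCount;
            this.valuesLength = valuesLength;
            // 4 (group id) + 8 (hash) + channelCount (isNull flags) + valuesLength (values)
            this.entrySize = 4 + 8 + channelCount + valuesLength;
            this.table = new byte[capacity * entrySize];
        }

        int groupId(int slot)
        {
            return (int) INT.get(table, slot * entrySize);
        }

        long rawHash(int slot)
        {
            return (long) LONG.get(table, slot * entrySize + 4);
        }

        boolean isNull(int slot, int channel)
        {
            return table[slot * entrySize + 12 + channel] != 0;
        }

        int valuesOffset(int slot)
        {
            // values of all channels start right after the isNull flags
            return slot * entrySize + 12 + channelCount;
        }
    }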

Code generation

To avoid virtual method calls (and to keep the code paths independent), the current implementation uses one-time source code generation plus classloader isolation of the generated classes, instead of runtime bytecode generation. This is mainly to improve readability and maintainability: it is far easier to manually improve the generated code and then fold the change back into the generator, and it is easier to understand the code and analyze its performance in tools like a profiler. The main downside is that the class isolation is currently complicated; I think it can be improved.
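
To illustrate why specialization pays off, here is a hypothetical shape of a generated class for two BIGINT grouping channels (not the actual generated code): entry size and offsets are compile-time constants, and value comparison is straight-line code with no virtual calls into Type implementations.

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;
    import java.nio.ByteOrder;

    // Hypothetical generated class specialized for two BIGINT channels.
    public final class BigintBigintGroupByHashSketch
    {
        private static final VarHandle LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.nativeOrder());

        // entry layout: 4 (group id) + 8 (hash) + 2 (isNull) + 16 (two longs) = 30 bytes
        private static final int ENTRY_SIZE = 30;
        private static final int VALUES_OFFSET = 12 + 2;

        private final byte[] table;

        public BigintBigintGroupByHashSketch(int capacity)
        {
            this.table = new byte[capacity * ENTRY_SIZE];
        }

        // straight-line comparison of the incoming row against a stored entry
        boolean valueEquals(int slot, long value0, long value1)
        {
            int offset = slot * ENTRY_SIZE + VALUES_OFFSET;
            return (long) LONG.get(table, offset) == value0
                    && (long) LONG.get(table, offset + 8) == value1;
        }
    }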

TPCH/TPC-DS benchmarks

There is a slight (~2%) but stable improvement in CPU time (i.e. for queries that improve, the improvement is much bigger than the run-to-run variability).

Partitioned orc sf1000

    metric              baseline   row-wise   % change
    TPCH wall time      847        828        -2.28
    TPC-DS wall time    1162       1141       -1.78
    TPCH CPU time       113307     111698     -1.42
    TPC-DS CPU time     134906     131389     -2.61
    TPCH Network GB     2660       2660       0
    TPC-DS Network GB   2004       2006       0.07

fixed-width-varhandle-single-table_with_mem_stats.pdf

Unpartitioned orc sf1000

    metric              baseline (orc unpart sf1000)   row-wise (orc unpart sf1000)   % change
    TPCH wall time      765                            759                            -0.8
    TPC-DS wall time    1817                           1797                           -1.14
    TPCH CPU time       100501                         99439                          -1.06
    TPC-DS CPU time     242056                         237994                         -1.68
    TPCH Network GB     2365                           2365                           0
    TPC-DS Network GB   3899                           3898                           -0.02
    TPCH peak mem       2076369935                     2072221145                     -0.2
    TPC-DS peak mem     1122027991                     1112983221                     -0.81

fixed-width-varhandle-single-table_unpart.pdf

JMH benchmark results

Generally a win across the board, but the significant improvement is for cases with multiple columns and many groups (i.e. the hash table does not fit in L3). Cases with 1 column are included for illustration only, as a single BIGINT column is handled differently anyway (BigIntGroupByHash).

Benchmark channelCount dataType groupCount hashEnabled positionCount rehash baseline row-wise row-wise %
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 8 TRUE 400000 TRUE 12.154 11.809 -2.838571664
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 8 TRUE 1000000 TRUE 17.087 10.374 -39.28717739
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 8 TRUE 10000000 TRUE 16.066 11.468 -28.61944479
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 100000 TRUE 400000 TRUE 54.214 40.622 -25.07101487
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 100000 TRUE 1000000 TRUE 42.644 31.33 -26.53128224
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 100000 TRUE 10000000 TRUE 33.43 23.24 -30.48160335
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 400000 TRUE 400000 TRUE 95.884 92.774 -3.243502566
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 400000 TRUE 1000000 TRUE 66.395 59.265 -10.73876045
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 400000 TRUE 10000000 TRUE 80.537 53.539 -33.52248035
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 1000000 TRUE 1000000 TRUE 122.168 109.249 -10.57478227
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 1000000 TRUE 10000000 TRUE 133.13 82.606 -37.95087508
BenchmarkGroupByHash.groupByHashPreCompute 1 BIGINT 3000000 TRUE 10000000 TRUE 166.151 119.592 -28.02210038
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 8 TRUE 400000 TRUE 19.811 16.535 -16.53626773
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 8 TRUE 1000000 TRUE 23.269 16.499 -29.09450342
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 8 TRUE 10000000 TRUE 23.243 17.051 -26.64027879
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 100000 TRUE 400000 TRUE 72.642 45.51 -37.35029322
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 100000 TRUE 1000000 TRUE 63.362 40.039 -36.8091285
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 100000 TRUE 10000000 TRUE 43.94 29.961 -31.81383705
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 400000 TRUE 400000 TRUE 127.03 111.982 -11.84602063
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 400000 TRUE 1000000 TRUE 97.573 86.728 -11.11475511
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 400000 TRUE 10000000 TRUE 125.269 87.279 -30.32673686
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 1000000 TRUE 1000000 TRUE 192.258 150.697 -21.61730591
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 1000000 TRUE 10000000 TRUE 282.49 122.211 -56.73793763
BenchmarkGroupByHash.groupByHashPreCompute 2 BIGINT 3000000 TRUE 10000000 TRUE 232.14 170.581 -26.51804945
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 8 TRUE 400000 TRUE 35.061 22.304 -36.3851573
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 8 TRUE 1000000 TRUE 36.199 23.023 -36.39879555
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 8 TRUE 10000000 TRUE 35.287 23.046 -34.68982912
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 100000 TRUE 400000 TRUE 104.561 70.343 -32.72539475
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 100000 TRUE 1000000 TRUE 98.415 68.298 -30.60204237
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 100000 TRUE 10000000 TRUE 92.981 63.708 -31.48277605
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 400000 TRUE 400000 TRUE 187.343 175.644 -6.244695558
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 400000 TRUE 1000000 TRUE 193.253 149.939 -22.41310614
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 400000 TRUE 10000000 TRUE 235.105 130.851 -44.34359116
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 1000000 TRUE 1000000 TRUE 237.468 228.146 -3.925581552
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 1000000 TRUE 10000000 TRUE 301.636 169.296 -43.87407339
BenchmarkGroupByHash.groupByHashPreCompute 4 BIGINT 3000000 TRUE 10000000 TRUE 375.312 215.302 -42.63386196
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 8 TRUE 400000 TRUE 33.944 22.393 -34.02957813
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 8 TRUE 1000000 TRUE 33.58 22.721 -32.33770101
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 8 TRUE 10000000 TRUE 33.499 22.828 -31.85468223
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 100000 TRUE 400000 TRUE 112.459 71.396 -36.51375168
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 100000 TRUE 1000000 TRUE 102.979 57.611 -44.05558415
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 100000 TRUE 10000000 TRUE 77.158 48.338 -37.35192721
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 400000 TRUE 400000 TRUE 173.659 142.193 -18.11941794
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 400000 TRUE 1000000 TRUE 150.297 114.326 -23.93327877
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 400000 TRUE 10000000 TRUE 164.059 105.476 -35.70849511
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 1000000 TRUE 1000000 TRUE 216.189 185.921 -14.00071234
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 1000000 TRUE 10000000 TRUE 262.304 150.348 -42.68177382
BenchmarkGroupByHash.groupByHashPreCompute 4 TINYINT_SMALLINT_INTEGER_BIGINT 3000000 TRUE 10000000 TRUE 354.643 198.462 -44.03893493
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 8 TRUE 400000 TRUE 59.339 36.757 -38.05591601
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 8 TRUE 1000000 TRUE 59.936 37.438 -37.53670582
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 8 TRUE 10000000 TRUE 59.285 41.667 -29.71746648
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 100000 TRUE 400000 TRUE 201.448 135.095 -32.93802867
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 100000 TRUE 1000000 TRUE 198.653 120.085 -39.55037175
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 100000 TRUE 10000000 TRUE 190.603 108.591 -43.02765434
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 400000 TRUE 400000 TRUE 339.351 297.942 -12.20240989
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 400000 TRUE 1000000 TRUE 405.758 228.623 -43.6553315
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 400000 TRUE 10000000 TRUE 490.873 173.874 -64.57861809
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 1000000 TRUE 1000000 TRUE 413.285 323.824 -21.64632155
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 1000000 TRUE 10000000 TRUE 586.831 211.627 -63.93731756
BenchmarkGroupByHash.groupByHashPreCompute 8 BIGINT 3000000 TRUE 10000000 TRUE 637.8 271.012 -57.50830981
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 8 TRUE 400000 TRUE 65.396 39.463 -39.6553306
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 8 TRUE 1000000 TRUE 65.142 40.113 -38.42221608
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 8 TRUE 10000000 TRUE 65.218 39.722 -39.09350179
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 100000 TRUE 400000 TRUE 185.124 98.542 -46.76973272
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 100000 TRUE 1000000 TRUE 175.062 91.293 -47.85104706
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 100000 TRUE 10000000 TRUE 149.707 70.62 -52.82785708
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 400000 TRUE 400000 TRUE 271.349 216.294 -20.28936904
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 400000 TRUE 1000000 TRUE 308.117 188.871 -38.70153221
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 400000 TRUE 10000000 TRUE 375.874 148.939 -60.37528533
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 1000000 TRUE 1000000 TRUE 380.86 267.796 -29.68649898
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 1000000 TRUE 10000000 TRUE 526.767 189.927 -63.94478014
BenchmarkGroupByHash.groupByHashPreCompute 8 TINYINT_SMALLINT_INTEGER_BIGINT 3000000 TRUE 10000000 TRUE 593.935 242.625 -59.14957024
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 8 TRUE 400000 TRUE 90.145 45.559 -49.46031394
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 8 TRUE 1000000 TRUE 91.879 44.206 -51.88672058
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 8 TRUE 10000000 TRUE 89.358 44.618 -50.06826473
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 100000 TRUE 400000 TRUE 279.863 151.421 -45.89459843
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 100000 TRUE 1000000 TRUE 328.599 157.006 -52.21957462
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 100000 TRUE 10000000 TRUE 321.395 129.668 -59.65463059
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 400000 TRUE 400000 TRUE 447.179 325.034 -27.31456531
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 400000 TRUE 1000000 TRUE 590.287 269.389 -54.36304713
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 400000 TRUE 10000000 TRUE 719.169 194.288 -72.98437502
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 1000000 TRUE 1000000 TRUE 538.421 349.492 -35.08945602
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 1000000 TRUE 10000000 TRUE 821.152 233.901 -71.51550505
BenchmarkGroupByHash.groupByHashPreCompute 10 BIGINT 3000000 TRUE 10000000 TRUE 834.155 305.253 -63.40572196

Experiments

The best gains from this change are for queries with a GROUP BY over many columns and a group count in the tens of thousands. It would be even better for a larger number of groups, but the current partial aggregation memory limit (16 MB) makes this not as good as it could be. Below is a simple GROUP BY on 8 BIGINT columns with 25K groups. It shows a ~30% CPU drop and a ~25% wall-clock time drop.

trino:tpch_sf1000_orc_part> set session use_enhanced_group_by=false;
SET SESSION
trino:tpch_sf1000_orc_part> EXPLAIN ANALYZE VERBOSE
                         -> select orderkey % 100000, orderkey % 100000 + 1,orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7, count(*)
                         -> from hive.tpch_sf1000_orc.lineitem
                         -> group by orderkey % 100000, orderkey % 100000 + 1,orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7;
 ...
 Query 20220201_161708_00117_du6m2, FINISHED, 7 nodes
http://localhost:8081/ui/query.html?20220201_161708_00117_du6m2
Splits: 3,260 total, 3,260 done (100.00%)
CPU Time: 2871.9s total, 2.09M rows/s, 1.77MB/s, 48% active
Per Node: 24.4 parallelism,   51M rows/s, 43.1MB/s
Parallelism: 170.8
Peak Memory: 79.2MB
16.81 [6B rows, 4.96GB] [357M rows/s, 302MB/s]

trino:tpch_sf1000_orc_part> set session use_enhanced_group_by=true;
SET SESSION
trino:tpch_sf1000_orc_part> EXPLAIN ANALYZE VERBOSE
                         -> select orderkey % 100000, orderkey % 100000 + 1,orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7, count(*)
                         -> from hive.tpch_sf1000_orc.lineitem
                         -> group by orderkey % 100000, orderkey % 100000 + 1,orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7;
...  
Query 20220201_161740_00119_du6m2, FINISHED, 7 nodes
http://localhost:8081/ui/query.html?20220201_161740_00119_du6m2
Splits: 3,260 total, 3,260 done (100.00%)
CPU Time: 2065.2s total, 2.91M rows/s, 2.46MB/s, 47% active
Per Node: 23.7 parallelism, 68.7M rows/s, 58.1MB/s
Parallelism: 165.6
Peak Memory: 264MB
12.47 [6B rows, 4.96GB] [481M rows/s, 407MB/s]

lukasz-stec (Jan 20 '22)

Your benchmark PDF is missing results for partitioned/unpartitioned data (for such a big change) and peak memory metrics

sopel39 (Feb 01 '22)

Your benchmark PDF is missing results for partitioned/unpartitioned data (for such a big change) and peak memory metrics

Partitioned is there. I will run unpartitioned.

lukasz-stec (Feb 01 '22)

I updated the partitioned benchmark with peak memory statistics.

lukasz-stec (Feb 01 '22)

unpartitioned benchmark results added

lukasz-stec (Feb 02 '22)

@lukasz-stec how come peak memory didn't go up (or even dropped)? Are we correctly accounting memory in this PR?

sopel39 (Feb 07 '22)

@lukasz-stec how come peak memory didn't go up (or even dropped)? Are we correctly accounting memory in this PR?

@sopel39 I suspect that peak memory is not "caused" by the HashAggregationOperator. I will double-check memory accounting though.

lukasz-stec (Feb 07 '22)

I ran a benchmark for this (tpch/tpcds orc part sf1K) on top of the latest master, which includes adaptive partial aggregation. There is a 5% gain for TPCH overall, so not bad for a change that affects only a minority of aggregations (most aggregations in the TPC benchmarks are on varchar columns). IMO this shows that moving row-wise is a good direction to improve hash aggregation performance.


group-by-row-wise-fixed-width-orc-part-sf1K.pdf

lukasz-stec (Sep 02 '22)

IMO this shows that moving row-wise is a good direction to improve hash aggregation performance.

I think we can start with having a row-wise signature for fast hash lookups (in MultiChannelGroupByHash). Keeping data row-wise within PagesHashStrategy is not essential for that and increases complexity.

sopel39 (Sep 21 '22)

@lukasz-stec Rather than generating source code, we can use MethodHandle composition, see https://github.com/trinodb/trino/pull/14178

sopel39 (Sep 26 '22)

Rather than generating source code, we can use MethodHandle composition, see https://github.com/trinodb/trino/pull/14178

There are pros and cons to both approaches. It's way easier to read, understand, and profile generated source code. On the other hand, generated bytecode and method handle composition can be more flexible, work for all cases, and adapt to more variables like mayHaveNull etc.
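
For comparison, here is a minimal sketch of the MethodHandle-composition style (hypothetical helper methods; not the code from #14178): per-channel primitives are composed into one handle that the JIT can inline.

    import java.lang.invoke.MethodHandle;
    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.MethodType;

    public class MethodHandleCompositionSketch
    {
        // hypothetical per-channel primitive: hash of a single BIGINT value
        static long hashLong(long value)
        {
            return value * 0x9E3779B97F4A7C15L;
        }

        // hypothetical combiner of two per-channel hashes
        static long combine(long left, long right)
        {
            return left * 31 + right;
        }

        public static void main(String[] args) throws Throwable
        {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            MethodType unary = MethodType.methodType(long.class, long.class);
            MethodType binary = MethodType.methodType(long.class, long.class, long.class);
            MethodHandle hash = lookup.findStatic(MethodHandleCompositionSketch.class, "hashLong", unary);
            MethodHandle combine = lookup.findStatic(MethodHandleCompositionSketch.class, "combine", binary);

            // rowHash(a, b) = combine(hashLong(a), hashLong(b))
            MethodHandle rowHash = MethodHandles.filterArguments(combine, 0, hash, hash);

            System.out.println((long) rowHash.invokeExact(42L, 7L));
        }
    }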

lukasz-stec (Sep 26 '22)

👋 @lukasz-stec @sopel39 .. is this still being worked on or can we close this PR?

mosabua (Nov 03 '22)

Let's close it. Project Hummingbird is going in this direction anyway.

lukasz-stec (Nov 04 '22)