Improve performance of broadcast hash join
What is the problem the feature request solves?
Query:
select ss_sold_date_sk, ss_sold_time_sk, ss_quantity, d_year, d_moy, d_dom
from date_dim join store_sales on d_date_sk = ss_sold_date_sk
where d_year = 2000;
Benchmark results:
AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                          495            509          13        582.3           1.7       1.0X
join_inner: Comet (Scan)                            736            750          14        391.4           2.6       0.7X
join_inner: Comet (Scan, Exec)                     1094           1110          22        263.3           3.8       0.5X
Native metrics (for one task):
ProjectionExec: expr=[col_0@4 as col_0, col_1@5 as col_1, col_2@6 as col_2, col_1@1 as col_3, col_2@2 as col_4, col_3@3 as col_5], metrics=[output_rows=582202, elapsed_compute=181.746µs]
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_0@0)], metrics=[output_rows=582202, build_input_rows=366, output_batches=370, build_input_batches=1, input_rows=2894083, input_batches=370, build_mem_used=15032, build_time=46.427µs, join_time=18.433483ms]
    CopyExec, metrics=[output_rows=366, elapsed_compute=9.938µs]
      ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32, col_3: Int32], metrics=[output_rows=366, elapsed_compute=560ns]
    CopyExec, metrics=[output_rows=2894083, elapsed_compute=4.801962ms]
      FilterExec: col_0@0 IS NOT NULL, metrics=[output_rows=2894083, elapsed_compute=23.927183ms]
        ScanExec: source=[CometScan parquet spark_catalog.default.store_sales (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32], metrics=[output_rows=3030375, elapsed_compute=12.800384ms]
Describe the potential solution
No response
Additional context
No response
The FilterExec in the above example is even more expensive than the HashJoinExec. Evaluating the predicate is cheap, but copying the selected rows into the filtered batch takes 99% of the time. We could potentially avoid this copy by using a selection-vector approach instead (sketched further below).
Time to compute the filter mask on a batch of 32768 rows: 581ns
Time to filter the batch: 252.194µs
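As an illustration of that split, here is a minimal standalone sketch using the arrow crate's filter kernels. The batch contents and null density are invented; only the shape of the measurement mirrors the numbers above:

```rust
use std::sync::Arc;
use std::time::Instant;

use arrow::array::Int32Array;
use arrow::compute::{filter_record_batch, is_not_null};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // 32768-row batch with roughly 1% nulls in the join key column.
    let values: Int32Array = (0..32768)
        .map(|i| if i % 90 == 0 { None } else { Some(i) })
        .collect();
    let schema = Arc::new(Schema::new(vec![Field::new("col_0", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

    // Phase 1: computing the boolean mask touches no row data and is cheap.
    let start = Instant::now();
    let mask = is_not_null(batch.column(0)).unwrap();
    println!("mask: {:?}", start.elapsed());

    // Phase 2: materializing the filtered batch copies every selected row
    // into fresh buffers; this is where nearly all of FilterExec's time goes.
    let start = Instant::now();
    let filtered = filter_record_batch(&batch, &mask).unwrap();
    println!("copy: {:?} ({} rows kept)", start.elapsed(), filtered.num_rows());
}
```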
The filter on the probe input is very simple (col_0@0 IS NOT NULL), so it should be possible to push it down to the Parquet scan?
edit: we do push the filter down to the scan:
+- CometFilter [ss_sold_date_sk#1545, ss_sold_time_sk#1546, ss_quantity#1555], isnotnull(ss_sold_date_sk#1545)
   +- CometScan parquet ... PushedFilters: [IsNotNull(ss_sold_date_sk)], ...
The FilterExec does still receive batches where ss_sold_date_sk is null, though, presumably because Parquet filter pushdown only prunes whole row groups and pages rather than filtering individual rows:
predicate: length=8192, true=7843, false=349
predicate: length=8192, true=7832, false=360
predicate: length=8192, true=7846, false=346
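A selection-vector approach could look roughly like the following sketch, where the filter only attaches the mask and the copy is deferred until a consumer actually needs a dense batch. `SelectedBatch` and its methods are hypothetical, not existing DataFusion or Comet APIs:

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical wrapper: a batch plus an optional selection vector.
struct SelectedBatch {
    batch: RecordBatch,
    /// None means every row is selected.
    selection: Option<BooleanArray>,
}

impl SelectedBatch {
    /// Number of logically selected rows, computed without copying any data.
    fn num_selected(&self) -> usize {
        match &self.selection {
            Some(mask) => mask.true_count(),
            None => self.batch.num_rows(),
        }
    }

    /// Pay the copy cost only when a dense batch is truly required,
    /// e.g. before handing data across an exchange boundary.
    fn materialize(self) -> Result<RecordBatch, ArrowError> {
        match self.selection {
            Some(mask) => filter_record_batch(&self.batch, &mask),
            None => Ok(self.batch),
        }
    }
}
```

Operators that can consume the mask directly (for example the hash join probe) could then skip the materialization step entirely.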
Latest results after merging https://github.com/apache/datafusion-comet/pull/835
sf 10
AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                           98            110           8        293.4           3.4       1.0X
join_inner: Comet (Scan)                            125            137          11        231.2           4.3       0.8X
join_inner: Comet (Scan, Exec)                      151            164          12        190.9           5.2       0.7X
sf 100
OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-45-generic
AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                          516            535          23        558.4           1.8       1.0X
join_inner: Comet (Scan)                            747            767          10        385.8           2.6       0.7X
join_inner: Comet (Scan, Exec)                      990           1018          17        291.1           3.4       0.5X