extract OR clause for join
Which issue does this PR close?
Closes #3577
Rationale for this change
Optimize join performance when the join has OR clauses in its filter or ON clause.
Examine the OR clauses of a join to see whether any useful sub-clauses can be extracted and pushed down to the join's inputs, so more rows are filtered out before the join.
For TPC-H q19, the logical plan before optimization:
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: #lineitem.l_partkey [l_partkey:Int64]",
" Projection: #part.p_size >= Int32(1) AS #part.p_size >= Int32(1)Int32(1)#part.p_size, #lineitem.l_partkey, #lineitem.l_quantity, #part.p_brand, #part.p_size [#part.p_size >= Int32(1)Int32(1)#part.p_size:Boolean;N, l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]",
" Filter: #part.p_brand = Utf8(\"Brand#12\") AND #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND #part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" Inner Join: #lineitem.l_partkey = #part.p_partkey [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" TableScan: lineitem projection=[l_partkey, l_quantity] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
" Filter: #part.p_size >= Int32(1) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[#part.p_size >= Int32(1)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
The logical plan after optimization:
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: #lineitem.l_partkey [l_partkey:Int64]",
" Projection: #part.p_size >= Int32(1) AS #part.p_size >= Int32(1)Int32(1)#part.p_size, #lineitem.l_partkey, #lineitem.l_quantity, #part.p_brand, #part.p_size [#part.p_size >= Int32(1)Int32(1)#part.p_size:Boolean;N, l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]",
" Filter: #part.p_brand = Utf8(\"Brand#12\") AND #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND #part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" Inner Join: #lineitem.l_partkey = #part.p_partkey [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" Filter: #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
" TableScan: lineitem projection=[l_partkey, l_quantity], partial_filters=[#lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2)] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
" Filter: #part.p_size >= Int32(1) AND #part.p_brand = Utf8(\"Brand#12\") AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #part.p_size <= Int32(15) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[#part.p_size >= Int32(1), #part.p_brand = Utf8(\"Brand#12\") AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #part.p_size <= Int32(15)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
We extract a new predicate and push it down to the join's inputs; this predicate filters more rows before the join, making the join more effective.
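The extraction rule can be illustrated with a toy sketch (this is not DataFusion's actual `Expr` type or the real `extract_or_clause` signature, just the same recursion on a hypothetical mini-expression enum): from an AND we may keep whichever side references only the target table, because a conjunction implies each conjunct; from an OR we may only keep something if *both* branches yield a clause for that table, since a disjunction implies neither branch alone.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    // A leaf comparison, tagged with the table it references,
    // e.g. Leaf("part", "p_size <= 5").
    Leaf(&'static str, &'static str),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

use Expr::*;

fn and(l: Expr, r: Expr) -> Expr { And(Box::new(l), Box::new(r)) }
fn or(l: Expr, r: Expr) -> Expr { Or(Box::new(l), Box::new(r)) }

/// Extract the part of `expr` that references only `table` and is
/// implied by `expr`, so it can be safely evaluated below the join.
fn extract_or_clause(expr: &Expr, table: &str) -> Option<Expr> {
    match expr {
        Leaf(t, _) => (*t == table).then(|| expr.clone()),
        // A conjunction implies each conjunct: keep whatever is extractable.
        And(l, r) => match (extract_or_clause(l, table), extract_or_clause(r, table)) {
            (Some(a), Some(b)) => Some(and(a, b)),
            (one, other) => one.or(other),
        },
        // A disjunction is only implied if BOTH branches yield something.
        Or(l, r) => Some(or(
            extract_or_clause(l, table)?,
            extract_or_clause(r, table)?,
        )),
    }
}

fn main() {
    // Mirrors the shape of the q19 filter:
    // (part preds AND lineitem preds) OR (part preds AND lineitem preds)
    let filter = or(
        and(
            Leaf("part", "p_brand = 'Brand#12'"),
            Leaf("lineitem", "l_quantity >= 100"),
        ),
        and(
            Leaf("part", "p_brand = 'Brand#23'"),
            Leaf("lineitem", "l_quantity >= 1000"),
        ),
    );
    // The lineitem-only predicate implied by the filter:
    // l_quantity >= 100 OR l_quantity >= 1000
    let pushed = extract_or_clause(&filter, "lineitem");
    assert_eq!(
        pushed,
        Some(or(
            Leaf("lineitem", "l_quantity >= 100"),
            Leaf("lineitem", "l_quantity >= 1000"),
        ))
    );
    println!("pushed: {:?}", pushed);
}
```

The extracted predicate is added alongside the original filter, not in place of it, which is why the OR filter still appears above the join in the optimized plan.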
What changes are included in this PR?
Add OR-clause extraction in datafusion/optimizer/src/filter_push_down.rs.
Codecov Report
Merging #3578 (f65dd47) into master (0a2b0a7) will increase coverage by 0.00%. The diff coverage is 98.16%.
@@ Coverage Diff @@
## master #3578 +/- ##
=======================================
Coverage 85.92% 85.92%
=======================================
Files 301 300 -1
Lines 56249 56309 +60
=======================================
+ Hits 48330 48383 +53
- Misses 7919 7926 +7
| Impacted Files | Coverage Δ | |
|---|---|---|
| datafusion/core/src/execution/context.rs | 79.31% <ø> (-0.03%) | :arrow_down: |
| datafusion/core/tests/sql/joins.rs | 99.33% <ø> (ø) | |
| datafusion/core/tests/sql/predicates.rs | 100.00% <ø> (ø) | |
| datafusion/core/src/physical_plan/planner.rs | 77.47% <90.90%> (+0.11%) | :arrow_up: |
| datafusion/optimizer/src/filter_push_down.rs | 97.76% <98.75%> (+0.09%) | :arrow_up: |
| datafusion/core/src/physical_plan/sorts/sort.rs | 93.86% <100.00%> (-0.60%) | :arrow_down: |
| ...e/src/physical_plan/sorts/sort_preserving_merge.rs | 93.49% <0.00%> (-0.36%) | :arrow_down: |
| datafusion/expr/src/logical_plan/plan.rs | 77.92% <0.00%> (-0.33%) | :arrow_down: |
| datafusion/expr/src/binary_rule.rs | 84.50% <0.00%> (-0.09%) | :arrow_down: |
| ... and 3 more | | |
I am sorry for the late review here -- it is on my list. Basically I am struggling to find time enough to sit down and convince myself that this is a correct transformation in all cases (esp with outer joins and nullability), and then also that we want to do this kind of pushdown. I just haven't had the time yet.
Thanks!
I would be willing to help write some more tests / comments for this PR if others think it is a good idea.
@AssHero do you have some numbers, e.g. how much does it help q19?
@alamb one thing we might consider is to apply the optimization whenever there is an expensive parent node (joins/aggregate) that benefits from more filtering? Anyway, in most cases, filtering should be cheap compared to other operations as long as the expression is not very expensive (for example, no expensive UDF).
Before the commit 'Convert more cross joins to inner joins' (https://github.com/apache/arrow-datafusion/pull/3482), this optimization helped a lot on q19, and I introduced it to solve q19's performance problem. Now that q19 is transformed to an inner join, this helps little on q19, but I think it will help other queries with OR clauses.
I think this optimization might actually slow some plans down (if the filters that are pushed down don't actually filter many rows, they will just consume CPU). I don't have a great suggestion about this
I ran some tests on q19 with this optimization on the current version; it does not slow the query down even though it helps little (it may consume more CPU). A better approach would be to combine this with statistics (CBO): with statistics we could estimate how many rows the predicate filters out and then make the decision.
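The statistics-based decision suggested here could be sketched like this (purely hypothetical, not DataFusion code): estimate the fraction of input rows the extracted predicate would keep, and only push it down when it is expected to drop enough rows to pay for its evaluation cost.

```rust
/// Hypothetical cost-based check: `selectivity` is the estimated fraction
/// of input rows the extracted predicate would keep (0.0..=1.0).
fn worth_pushing_down(selectivity: f64, threshold: f64) -> bool {
    // Push down only if the predicate is expected to drop a meaningful
    // share of rows; otherwise it just burns CPU before the join.
    selectivity <= threshold
}

fn main() {
    // With a 0.9 threshold, a predicate keeping 30% of rows is pushed
    // down, while one keeping 99% of rows is not.
    assert!(worth_pushing_down(0.3, 0.9));
    assert!(!worth_pushing_down(0.99, 0.9));
    println!("ok");
}
```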
Add some comments about extract_or_clause.
I took the liberty of merging this branch from master and resolving the merge conflict in fe582a7
Thanks again for sticking with this one @AssHero
The newly added TPCH plan benchmarks needed to be updated as well
@alamb running the benchmarks now for q7 and q19 posting them here when done
~~Looks like performance regressed a bit :/ we should probably take a better look before merging it @alamb~~
Looks like roughly a 1.35x speedup on q7 and a smaller 1.16x speedup on q19 :rocket:
Running with (on partitioned parquet data - 16 partitions, 16 vCPU):
cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path [path] --format parquet --query [query] --batch-size 8192 -n 16
Master:
Query 7 iteration 0 took 6488.3 ms and returned 4 rows
Query 7 iteration 1 took 6235.8 ms and returned 4 rows
Query 7 iteration 2 took 6293.0 ms and returned 4 rows
Query 7 avg time: 6339.04 ms
Query 19 iteration 0 took 1180.6 ms and returned 1 rows
Query 19 iteration 1 took 1012.3 ms and returned 1 rows
Query 19 iteration 2 took 1017.0 ms and returned 1 rows
Query 19 avg time: 1069.99 ms
This branch:
Query 7 iteration 0 took 4856.0 ms and returned 4 rows
Query 7 iteration 1 took 4645.7 ms and returned 4 rows
Query 7 iteration 2 took 4661.6 ms and returned 4 rows
Query 7 avg time: 4721.10 ms
Query 19 iteration 0 took 1021.7 ms and returned 1 rows
Query 19 iteration 1 took 866.5 ms and returned 1 rows
Query 19 iteration 2 took 872.9 ms and returned 1 rows
Query 19 avg time: 920.37 ms
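As a sanity check, the averages and speedup ratios can be recomputed from the per-iteration times reported above, and they land close to the figures quoted:

```rust
// Recompute the benchmark averages and speedups from the reported
// iteration times (milliseconds).
fn avg(times: &[f64]) -> f64 {
    times.iter().sum::<f64>() / times.len() as f64
}

fn main() {
    let q7_master = avg(&[6488.3, 6235.8, 6293.0]);  // ~6339.03 ms
    let q7_branch = avg(&[4856.0, 4645.7, 4661.6]);  // ~4721.10 ms
    let q19_master = avg(&[1180.6, 1012.3, 1017.0]); // ~1069.97 ms
    let q19_branch = avg(&[1021.7, 866.5, 872.9]);   // ~920.37 ms
    println!("q7 speedup:  {:.2}x", q7_master / q7_branch);   // ~1.34x
    println!("q19 speedup: {:.2}x", q19_master / q19_branch); // ~1.16x
}
```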
Hm getting different results now - may have been something running in the background on my machine - will update the results accordingly
Ok - updated the benchmark results - looks good to go now!
Thanks @AssHero @alamb
Benchmark runs are scheduled for baseline = fc5081d48ef59e39c1b353dd45fcd13af6186676 and contender = e02376ddc431a818e1f19a5bb16fe45307a512e8. e02376ddc431a818e1f19a5bb16fe45307a512e8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
Nice! Thanks everyone