datafusion

extract OR clause for join

HuSen8891 opened this pull request 3 years ago.

Which issue does this PR close?

Closes #3577

Rationale for this change

Optimize join performance when a join has OR clauses in its filter or ON clause.

Examine the OR clauses of a join to see whether useful clauses can be extracted and pushed down to the join's input relations, so that more rows are filtered out before the join.
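The extraction rule described above can be sketched as a small recursive function. This is a minimal sketch under stated assumptions, not the PR's code: Expr, pred, and, or, and this extract_or_clause are hypothetical stand-ins (the name mirrors the function mentioned later in this thread, but the types and signature are invented here), and each leaf simply records which relations its columns come from. A leaf is kept only if it reads from the target relation alone; an AND keeps whatever either side yields, because dropping conjuncts only weakens it; an OR yields something only if every disjunct does, because a disjunct with nothing to contribute says nothing about that relation.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified expression tree used only for this sketch
/// (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Leaf predicate: display text plus the relations whose columns it reads.
    Pred { text: String, relations: HashSet<String> },
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn pred(text: &str, relations: &[&str]) -> Expr {
    Expr::Pred {
        text: text.to_string(),
        relations: relations.iter().map(|r| r.to_string()).collect(),
    }
}

fn and(a: Expr, b: Expr) -> Expr {
    Expr::And(Box::new(a), Box::new(b))
}

fn or(a: Expr, b: Expr) -> Expr {
    Expr::Or(Box::new(a), Box::new(b))
}

impl Expr {
    /// Fully parenthesized rendering, so precedence is unambiguous.
    fn render(&self) -> String {
        match self {
            Expr::Pred { text, .. } => text.clone(),
            Expr::And(a, b) => format!("({} AND {})", a.render(), b.render()),
            Expr::Or(a, b) => format!("({} OR {})", a.render(), b.render()),
        }
    }
}

/// Returns a predicate over `relation` only that is implied by `expr`,
/// or `None` if nothing useful can be extracted.
fn extract_or_clause(expr: &Expr, relation: &str) -> Option<Expr> {
    match expr {
        // Keep a leaf only if every column it reads belongs to `relation`.
        Expr::Pred { relations, .. } => {
            if !relations.is_empty() && relations.iter().all(|r| r == relation) {
                Some(expr.clone())
            } else {
                None
            }
        }
        // Dropping conjuncts weakens an AND, so keep whatever either side yields.
        Expr::And(a, b) => {
            match (extract_or_clause(a, relation), extract_or_clause(b, relation)) {
                (Some(x), Some(y)) => Some(and(x, y)),
                (Some(x), None) | (None, Some(x)) => Some(x),
                (None, None) => None,
            }
        }
        // An OR is only implied if every disjunct contributes a predicate.
        Expr::Or(a, b) => {
            let x = extract_or_clause(a, relation)?;
            let y = extract_or_clause(b, relation)?;
            Some(or(x, y))
        }
    }
}

fn main() {
    // Two branches of a q19-style filter (constants simplified).
    let filter = or(
        and(
            and(pred("p_brand = 'Brand#12'", &["part"]), pred("l_quantity >= 1", &["lineitem"])),
            pred("l_quantity <= 11", &["lineitem"]),
        ),
        and(
            and(pred("p_brand = 'Brand#23'", &["part"]), pred("l_quantity >= 10", &["lineitem"])),
            pred("l_quantity <= 20", &["lineitem"]),
        ),
    );
    for rel in ["lineitem", "part"] {
        match extract_or_clause(&filter, rel) {
            Some(e) => println!("{rel}: {}", e.render()),
            None => println!("{rel}: nothing to push down"),
        }
    }
}
```

Running main prints ((l_quantity >= 1 AND l_quantity <= 11) OR (l_quantity >= 10 AND l_quantity <= 20)) for lineitem and (p_brand = 'Brand#12' OR p_brand = 'Brand#23') for part, the same shape as the filters pushed onto each join input in the optimized q19 plan.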

For TPC-H q19:

The logical plan before optimization:

    "Explain [plan_type:Utf8, plan:Utf8]",
    "  Projection: #lineitem.l_partkey [l_partkey:Int64]",
    "    Projection: #part.p_size >= Int32(1) AS #part.p_size >= Int32(1)Int32(1)#part.p_size, #lineitem.l_partkey, #lineitem.l_quantity, #part.p_brand, #part.p_size [#part.p_size >= Int32(1)Int32(1)#part.p_size:Boolean;N, l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]",
    "      Filter: #part.p_brand = Utf8(\"Brand#12\") AND #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND #part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
    "        Inner Join: #lineitem.l_partkey = #part.p_partkey [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
    "          TableScan: lineitem projection=[l_partkey, l_quantity] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
    "          Filter: #part.p_size >= Int32(1) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
    "            TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[#part.p_size >= Int32(1)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",

The logical plan after optimization:

    "Explain [plan_type:Utf8, plan:Utf8]",
    "  Projection: #lineitem.l_partkey [l_partkey:Int64]",
    "    Projection: #part.p_size >= Int32(1) AS #part.p_size >= Int32(1)Int32(1)#part.p_size, #lineitem.l_partkey, #lineitem.l_quantity, #part.p_brand, #part.p_size [#part.p_size >= Int32(1)Int32(1)#part.p_size:Boolean;N, l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]",
    "      Filter: #part.p_brand = Utf8(\"Brand#12\") AND #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND #part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
    "        Inner Join: #lineitem.l_partkey = #part.p_partkey [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
    "          Filter: #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
    "            TableScan: lineitem projection=[l_partkey, l_quantity], partial_filters=[#lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2)] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
    "          Filter: #part.p_size >= Int32(1) AND #part.p_brand = Utf8(\"Brand#12\") AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #part.p_size <= Int32(15) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
    "            TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[#part.p_size >= Int32(1), #part.p_brand = Utf8(\"Brand#12\") AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #part.p_size <= Int32(15)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",

We extract new predicates and push them down to the join's inputs. These predicates filter out more rows before the join, which makes the join more efficient.
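Because the original OR filter stays above the join (it is still present in the optimized plan), the rewrite preserves results as long as each pushed-down predicate is implied by the original one. The brute-force check below illustrates that property for the q19 constants over a small grid of values; it is a sanity check, not a proof. Row, original, lineitem_side, part_side, and check_all are hypothetical names, and the decimal quantities are modeled as f64 (Decimal128(Some(100),15,2) is 1.00).

```rust
/// A joined row reduced to the columns the q19 filter reads (hypothetical model).
struct Row {
    brand: &'static str,
    size: i32,
    quantity: f64,
}

/// Everything a joined row must satisfy before the rewrite: the existing
/// `p_size >= 1` filter on part plus the OR filter kept above the join.
fn original(r: &Row) -> bool {
    r.size >= 1
        && ((r.brand == "Brand#12" && r.quantity >= 1.0 && r.quantity <= 11.0 && r.size <= 5)
            || (r.brand == "Brand#23" && r.quantity >= 10.0 && r.quantity <= 20.0 && r.size <= 10)
            || (r.brand == "Brand#34" && r.quantity >= 20.0 && r.quantity <= 30.0 && r.size <= 15))
}

/// Predicate pushed to the lineitem input: the l_quantity conjuncts of each disjunct.
fn lineitem_side(q: f64) -> bool {
    (q >= 1.0 && q <= 11.0) || (q >= 10.0 && q <= 20.0) || (q >= 20.0 && q <= 30.0)
}

/// Predicate pushed to the part input: `p_size >= 1` AND the part-only OR clause.
fn part_side(brand: &str, size: i32) -> bool {
    size >= 1
        && ((brand == "Brand#12" && size <= 5)
            || (brand == "Brand#23" && size <= 10)
            || (brand == "Brand#34" && size <= 15))
}

/// Checks over a small grid that every row accepted by the original filter is
/// also accepted by both pushed-down predicates. Returns the combinations checked.
fn check_all() -> usize {
    let mut checked = 0;
    for brand in ["Brand#12", "Brand#23", "Brand#34", "Brand#55"] {
        for size in 0..=20 {
            for step in 0..=70 {
                let row = Row { brand, size, quantity: step as f64 / 2.0 };
                if original(&row) {
                    assert!(lineitem_side(row.quantity) && part_side(row.brand, row.size));
                }
                checked += 1;
            }
        }
    }
    checked
}

fn main() {
    println!("implication held for {} combinations", check_all());
}
```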

What changes are included in this PR?

Add OR-clause extraction to datafusion/optimizer/src/filter_push_down.rs.

HuSen8891, Sep 21 '22 18:09

Codecov Report

Merging #3578 (f65dd47) into master (0a2b0a7) will increase coverage by 0.00%. The diff coverage is 98.16%.

@@           Coverage Diff           @@
##           master    #3578   +/-   ##
=======================================
  Coverage   85.92%   85.92%           
=======================================
  Files         301      300    -1     
  Lines       56249    56309   +60     
=======================================
+ Hits        48330    48383   +53     
- Misses       7919     7926    +7     
| Impacted Files | Coverage Δ |
|---|---|
| datafusion/core/src/execution/context.rs | 79.31% <ø> (-0.03%) :arrow_down: |
| datafusion/core/tests/sql/joins.rs | 99.33% <ø> (ø) |
| datafusion/core/tests/sql/predicates.rs | 100.00% <ø> (ø) |
| datafusion/core/src/physical_plan/planner.rs | 77.47% <90.90%> (+0.11%) :arrow_up: |
| datafusion/optimizer/src/filter_push_down.rs | 97.76% <98.75%> (+0.09%) :arrow_up: |
| datafusion/core/src/physical_plan/sorts/sort.rs | 93.86% <100.00%> (-0.60%) :arrow_down: |
| ...e/src/physical_plan/sorts/sort_preserving_merge.rs | 93.49% <0.00%> (-0.36%) :arrow_down: |
| datafusion/expr/src/logical_plan/plan.rs | 77.92% <0.00%> (-0.33%) :arrow_down: |
| datafusion/expr/src/binary_rule.rs | 84.50% <0.00%> (-0.09%) :arrow_down: |
| ... and 3 more | |

codecov-commenter, Sep 21 '22 20:09

I am sorry for the late review here -- it is on my list. Basically I am struggling to find enough time to sit down and convince myself that this is a correct transformation in all cases (especially with outer joins and nullability), and also that we want to do this kind of pushdown. I just haven't had the time yet.

alamb, Oct 07 '22 10:10
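The outer-join concern raised above can be made concrete with a toy example. It is an illustration under stated assumptions, not DataFusion code and not a description of how this PR handles outer joins: LeftRow, RightRow, left_join, and on_clause are hypothetical, and the join is a naive nested loop. For a LEFT JOIN whose ON clause is l.k = r.k AND ((l.x = 1 AND r.y = 2) OR (l.x = 3 AND r.y = 4)), the extracted left-only clause l.x = 1 OR l.x = 3 must not be pushed into the preserved left input, because left rows that fail it still have to appear null-extended. The right-only clause r.y = 2 OR r.y = 4 can be pushed into the right input, because a right row that fails it can never satisfy the ON clause.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct LeftRow {
    k: i32,
    x: i32,
}

#[derive(Clone, Copy, Debug, PartialEq)]
struct RightRow {
    k: i32,
    y: i32,
}

/// Naive nested-loop LEFT JOIN: every left row appears at least once,
/// paired with `None` when no right row satisfies `on`.
fn left_join(
    left: &[LeftRow],
    right: &[RightRow],
    on: impl Fn(&LeftRow, &RightRow) -> bool,
) -> Vec<(LeftRow, Option<RightRow>)> {
    let mut out = Vec::new();
    for l in left {
        let mut matched = false;
        for r in right {
            if on(l, r) {
                out.push((*l, Some(*r)));
                matched = true;
            }
        }
        if !matched {
            out.push((*l, None));
        }
    }
    out
}

/// ON clause: l.k = r.k AND ((l.x = 1 AND r.y = 2) OR (l.x = 3 AND r.y = 4)).
fn on_clause(l: &LeftRow, r: &RightRow) -> bool {
    l.k == r.k && ((l.x == 1 && r.y == 2) || (l.x == 3 && r.y == 4))
}

fn main() {
    let left = vec![LeftRow { k: 1, x: 1 }, LeftRow { k: 2, x: 5 }];
    let right = vec![RightRow { k: 1, y: 2 }, RightRow { k: 2, y: 9 }];
    let expected = left_join(&left, &right, on_clause);

    // Extracted left-only clause pushed into the preserved side: unsafe.
    let filtered_left: Vec<LeftRow> =
        left.iter().copied().filter(|l| l.x == 1 || l.x == 3).collect();
    let wrong = left_join(&filtered_left, &right, on_clause);

    // Extracted right-only clause pushed into the null-supplying side: safe.
    let filtered_right: Vec<RightRow> =
        right.iter().copied().filter(|r| r.y == 2 || r.y == 4).collect();
    let ok = left_join(&left, &filtered_right, on_clause);

    println!("expected:       {expected:?}");
    println!("left pushdown:  {wrong:?}");
    println!("right pushdown: {ok:?}");
    assert_ne!(expected, wrong); // the null-extended row for x = 5 is lost
    assert_eq!(expected, ok);
}
```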

Thanks!

HuSen8891, Oct 07 '22 14:10

I would be willing to help write some more tests / comments for this PR if others think it is a good idea.

alamb, Oct 12 '22 18:10

@AssHero do you have some numbers, e.g. how much does it help q19?

@alamb one thing we might consider is to apply the optimization whenever there is an expensive parent node (joins/aggregate) that benefits from more filtering? Anyway, in most cases, filtering should be cheap compared to other operations as long as the expression is not very expensive (for example, no expensive UDF).

Dandandan, Oct 12 '22 19:10

Before the commit 'Convert more cross joins to inner joins' (https://github.com/apache/arrow-datafusion/pull/3482), this optimization helped q19 a lot; I introduced it to solve q19's performance problem. Now that q19 is planned as an inner join, this optimization helps q19 only a little, but I think it will help other queries with OR clauses.

HuSen8891, Oct 13 '22 08:10

> I think this optimization might actually slow some plans down (if the filters that are pushed down don't actually filter many rows, they will just consume CPU). I don't have a great suggestion about this.

I have run some tests on q19 with this optimization on the current version. It does not slow the query down, even though it helps only a little (and may consume more CPU). I think a better way is to combine it with statistics (CBO): with statistics, we can estimate how many rows this predicate filters out and then decide whether to push it down.

HuSen8891, Oct 13 '22 09:10
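The statistics-based decision suggested above could look roughly like the following sketch. Everything here is an assumption for illustration: Pred, selectivity, should_push_down, and the threshold are hypothetical, the leaf selectivities in main are made up, and the estimate assumes independent predicates (AND multiplies, OR uses inclusion-exclusion), which can be inaccurate for correlated predicates such as several ranges on the same l_quantity column in q19.

```rust
/// Hypothetical predicate tree annotated with per-leaf selectivity estimates
/// (the fraction of rows each leaf keeps), e.g. derived from column statistics.
enum Pred {
    Leaf(f64),
    And(Box<Pred>, Box<Pred>),
    Or(Box<Pred>, Box<Pred>),
}

/// Estimated fraction of rows kept, assuming all leaves are independent.
fn selectivity(p: &Pred) -> f64 {
    match p {
        Pred::Leaf(s) => (*s).clamp(0.0, 1.0),
        Pred::And(a, b) => selectivity(a) * selectivity(b),
        Pred::Or(a, b) => {
            let (sa, sb) = (selectivity(a), selectivity(b));
            sa + sb - sa * sb // inclusion-exclusion under independence
        }
    }
}

/// Push the extracted predicate down only if it is expected to remove at least
/// `min_removed` of the input rows (an arbitrary tuning knob in this sketch).
fn should_push_down(p: &Pred, min_removed: f64) -> bool {
    1.0 - selectivity(p) >= min_removed
}

fn main() {
    // Made-up leaf selectivities, for illustration only.
    let extracted = Pred::Or(Box::new(Pred::Leaf(0.1)), Box::new(Pred::Leaf(0.1)));
    let s = selectivity(&extracted);
    println!(
        "estimated selectivity {s:.3}, push down: {}",
        should_push_down(&extracted, 0.5)
    );
}
```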

Added some comments about extract_or_clause.

HuSen8891, Oct 13 '22 15:10

I took the liberty of merging this branch from master and resolving the merge conflict in fe582a7

alamb, Oct 15 '22 11:10

Thanks again for sticking with this one @AssHero

alamb, Oct 15 '22 11:10

The newly added TPCH plan benchmarks needed to be updated as well

alamb, Oct 15 '22 11:10

@alamb running the benchmarks now for q7 and q19, will post them here when done

Dandandan, Oct 15 '22 11:10

~~Looks like performance regressed a bit :/ we probably have a better look before merging it @alamb~~

Looks like roughly a 1.35x speedup on q7 and a smaller 1.16x speedup on q19 :rocket:

Running with (on partitioned parquet data - 16 partitions, 16 vCPU):

cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path [path] --format parquet --query [query] --batch-size 8192 -n 16

Master:

Query 7 iteration 0 took 6488.3 ms and returned 4 rows
Query 7 iteration 1 took 6235.8 ms and returned 4 rows
Query 7 iteration 2 took 6293.0 ms and returned 4 rows
Query 7 avg time: 6339.04 ms

Query 19 iteration 0 took 1180.6 ms and returned 1 rows
Query 19 iteration 1 took 1012.3 ms and returned 1 rows
Query 19 iteration 2 took 1017.0 ms and returned 1 rows
Query 19 avg time: 1069.99 ms

This branch:

Query 7 iteration 0 took 4856.0 ms and returned 4 rows
Query 7 iteration 1 took 4645.7 ms and returned 4 rows
Query 7 iteration 2 took 4661.6 ms and returned 4 rows
Query 7 avg time: 4721.10 ms

Query 19 iteration 0 took 1021.7 ms and returned 1 rows
Query 19 iteration 1 took 866.5 ms and returned 1 rows
Query 19 iteration 2 took 872.9 ms and returned 1 rows
Query 19 avg time: 920.37 ms

Dandandan, Oct 15 '22 12:10
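The quoted speedups follow from the reported averages, with speedup defined as master time divided by branch time. This small sketch, using hypothetical helper names mean and speedup, recomputes them:

```rust
/// Mean of per-iteration timings in milliseconds.
fn mean(ms: &[f64]) -> f64 {
    ms.iter().sum::<f64>() / ms.len() as f64
}

/// Speedup of the branch over master (values above 1.0 mean the branch is faster).
fn speedup(master_ms: f64, branch_ms: f64) -> f64 {
    master_ms / branch_ms
}

fn main() {
    // Reported average times from the runs above.
    let q7 = speedup(6339.04, 4721.10);
    let q19 = speedup(1069.99, 920.37);
    println!("q7: {q7:.2}x, q19: {q19:.2}x");
    // Recompute the q19 branch average from its three displayed iterations.
    println!("q19 branch mean: {:.2} ms", mean(&[1021.7, 866.5, 872.9]));
}
```

It prints q7: 1.34x, q19: 1.16x, consistent with the roughly 1.35x and 1.16x quoted above, and a q19 branch mean of 920.37 ms, matching the reported average.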

Hm getting different results now - may have been something running in the background on my machine - will update the results accordingly

Dandandan, Oct 15 '22 12:10

Ok - updated the benchmark results - looks good to go now!

Dandandan, Oct 15 '22 12:10

Thanks @AssHero @alamb

Dandandan, Oct 15 '22 13:10

Benchmark runs are scheduled for baseline = fc5081d48ef59e39c1b353dd45fcd13af6186676 and contender = e02376ddc431a818e1f19a5bb16fe45307a512e8. e02376ddc431a818e1f19a5bb16fe45307a512e8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Conbench compare runs links:
Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2
Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm
Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x
Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q

Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

ursabot, Oct 15 '22 13:10

Nice! Thanks everyone

alamb, Oct 16 '22 10:10