[SPARK-43025][SQL] Eliminate Union if filters have the same child plan
What changes were proposed in this pull request?
Many user queries union multiple subqueries that apply different filters to the same input. For example, q1:
SELECT ss_item_sk, ss_ticket_number, ss_customer_sk
FROM store_sales
LEFT JOIN store_returns
ON sr_item_sk = ss_item_sk AND sr_ticket_number = ss_ticket_number
WHERE sr_return_amt > 10000
UNION ALL
SELECT ss_item_sk, ss_ticket_number, ss_customer_sk
FROM store_sales
LEFT JOIN store_returns
ON sr_item_sk = ss_item_sk AND sr_ticket_number = ss_ticket_number
WHERE sr_return_amt < 1000
This query can be simplified to:
SELECT ss_item_sk, ss_ticket_number, ss_customer_sk
FROM store_sales
LEFT JOIN store_returns
ON sr_item_sk = ss_item_sk
AND sr_ticket_number = ss_ticket_number
WHERE sr_return_amt > 10000 OR sr_return_amt < 1000
q2
SELECT ss_item_sk, ss_ticket_number, ss_customer_sk
FROM store_sales
WHERE ss_ext_discount_amt > 1000
UNION ALL
SELECT ss_item_sk, ss_ticket_number, ss_customer_sk
FROM store_sales
WHERE ss_ext_discount_amt < 100
This query can be simplified to:
SELECT ss_item_sk, ss_ticket_number, ss_customer_sk
FROM store_sales
WHERE ss_ext_discount_amt > 1000 OR ss_ext_discount_amt < 100
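The q2 rewrite can be checked on a toy table. The following is a minimal sketch using Python's built-in sqlite3 module rather than Spark; the sample rows and the helper names (union_all_sql, single_filter_sql, run) are made up for illustration. It also shows that the rewrite relies on the two predicates being disjoint: with overlapping predicates, UNION ALL returns a matching row once per leg, while the single OR filter returns it only once.

```python
import sqlite3

COLUMNS = "ss_item_sk, ss_ticket_number, ss_customer_sk"


def union_all_sql(pred1, pred2):
    """Build the original form: two filtered scans combined with UNION ALL."""
    return (
        f"SELECT {COLUMNS} FROM store_sales WHERE {pred1} "
        f"UNION ALL "
        f"SELECT {COLUMNS} FROM store_sales WHERE {pred2}"
    )


def single_filter_sql(pred1, pred2):
    """Build the rewritten form: one scan with the predicates joined by OR."""
    return f"SELECT {COLUMNS} FROM store_sales WHERE ({pred1}) OR ({pred2})"


def run(conn, sql):
    """Return the result as a sorted list so that row order does not matter."""
    return sorted(conn.execute(sql).fetchall())


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store_sales ("
    "ss_item_sk INTEGER, ss_ticket_number INTEGER, "
    "ss_customer_sk INTEGER, ss_ext_discount_amt REAL)"
)
# Hypothetical sample data, including a NULL discount amount.
conn.executemany(
    "INSERT INTO store_sales VALUES (?, ?, ?, ?)",
    [
        (1, 10, 100, 1500.0),
        (2, 20, 200, 50.0),
        (3, 30, 300, 500.0),
        (4, 40, 400, None),
        (5, 50, 500, 1500.0),
    ],
)

# Disjoint predicates, as in q2: both forms return the same rows.
hi, lo = "ss_ext_discount_amt > 1000", "ss_ext_discount_amt < 100"
union_rows = run(conn, union_all_sql(hi, lo))
or_rows = run(conn, single_filter_sql(hi, lo))
print(union_rows == or_rows)

# Overlapping predicates: the row with amount 500.0 matches both legs,
# so UNION ALL returns it twice and the two forms differ.
p1, p2 = "ss_ext_discount_amt > 100", "ss_ext_discount_amt < 1000"
overlap_union_rows = run(conn, union_all_sql(p1, p2))
overlap_or_rows = run(conn, single_filter_sql(p1, p2))
print(len(overlap_union_rows), len(overlap_or_rows))
```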
This PR optimizes Union operators that have at least two Filter children by:
- Eliminating the Union operator if all of its children are Filters and all of those Filters share the same child. The predicates are merged into a single predicate by connecting them with Or.
- Combining multiple Filter operators into one if those Filters share the same child. Their predicates are likewise merged into a single predicate by connecting them with Or.
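The two cases above can be sketched on a toy plan tree. This is a simplified model in Python, not Spark's Catalyst API and not the code in this PR: the Relation, Filter and Union classes and the merge_union_filters function are hypothetical names, conditions are plain strings, and "same child" is plain structural equality. The sketch does not check that the predicates are disjoint, which the rewrite needs in order to preserve the duplicates that UNION ALL would return.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Filter:
    condition: str
    child: object


@dataclass(frozen=True)
class Union:
    children: tuple


def merge_union_filters(plan):
    """Merge Filter legs of a Union that sit on top of the same child."""
    if not isinstance(plan, Union):
        return plan

    conditions = {}  # shared child -> conditions of the Filters above it
    order = []       # output legs in first-seen order
    for leg in plan.children:
        if isinstance(leg, Filter):
            if leg.child not in conditions:
                conditions[leg.child] = []
                order.append(("filter", leg.child))
            conditions[leg.child].append(leg.condition)
        else:
            order.append(("other", leg))

    if len(order) == len(plan.children):
        return plan  # no two Filters share a child, nothing to merge

    new_legs = []
    for kind, node in order:
        if kind == "other":
            new_legs.append(node)
            continue
        conds = conditions[node]
        if len(conds) == 1:
            new_legs.append(Filter(conds[0], node))
        else:
            merged = " OR ".join(f"({c})" for c in conds)
            new_legs.append(Filter(merged, node))

    # A single remaining leg means the Union itself can be dropped.
    if len(new_legs) == 1:
        return new_legs[0]
    return Union(tuple(new_legs))


store_sales = Relation("store_sales")
high = Filter("ss_ext_discount_amt > 1000", store_sales)
low = Filter("ss_ext_discount_amt < 100", store_sales)

# Case 1: every leg is a Filter over the same child, so the Union disappears.
eliminated = merge_union_filters(Union((high, low)))

# Case 2: only some legs share a child, so the Union stays with fewer legs.
combined = merge_union_filters(Union((high, Relation("web_sales"), low)))

print(eliminated)
print(combined)
```

Grouping by child in first-seen order keeps the sketch short. It can change the order of the union legs, which is acceptable here because UNION ALL does not guarantee row order.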
Why are the changes needed?
Simplify the SQL plan and improve performance.
Does this PR introduce any user-facing change?
'No'. This is a new optimization that only changes the internal implementation.
How was this patch tested?
New test cases, plus a micro benchmark for q1 and q2. Before this PR:
Java HotSpot(TM) 64-Bit Server VM 1.8.0_311-b11 on Mac OS X 10.16
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q1                                                51569          52030         653          0.6        1627.9       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_311-b11 on Mac OS X 10.16
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q2                                                 4255           4287          45          6.8         147.7       1.0X
After this PR:
Java HotSpot(TM) 64-Bit Server VM 1.8.0_311-b11 on Mac OS X 10.16
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q1                                                46806          47462         929          0.7        1477.5       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_311-b11 on Mac OS X 10.16
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q2                                                 2655           2674          28         10.8          92.2       1.0X
ping @cloud-fan
I think @peter-toth did something similar before, can you share some ideas @peter-toth ?
I guess @peter-toth did a similar thing for scalar subqueries, but this one tries to handle non-scalar subqueries.
Sorry, I haven't had time to fully review the PR (maybe next week), but at first sight it seems to copy some functions (e.g. checkIdenticalPlans(), mergeNamedExpressions()) from MergeScalarSubqueries, so there seems to be some room for improvement and we could share the common functions. Also, some names (e.g. subqueryIndex) might need to change here.
This PR combines UNION ALL legs if they return disjoint sets of rows from the same source node. I think this makes sense when there are overlapping scans in the legs (despite the disjoint filters), and by "overlapping" I mean that the scans use some common set of files. So it seems the only case where this change doesn't bring an improvement is when the filter is a pushed-down partitioning/bucketing column filter and the scans in the union legs don't overlap. But even in that case I'm not sure this PR has any disadvantage; it just doesn't improve anything...
BTW, MergeScalarSubqueries (https://github.com/apache/spark/pull/32298) does very similar merging, but we run that only once because merging can be costly when there are many candidates. Do we need EliminateUnions in operatorOptimizationBatch?
Sidenote: MergeScalarSubqueries doesn't work with different filters currently. This is because merging filters in subqueries is more complicated, as we need to propagate the filters up to an aggregate, and because it can cause performance degradation when we have non-overlapping scans. (See this WIP PR: https://github.com/apache/spark/pull/37630).
@peter-toth Thank you for your first look.
In fact, this PR reuses some of the functions you mentioned above (e.g. checkIdenticalPlans). If we can share these functions, that would be good.
The partitioning/bucketing column filter case doesn't seem to improve anything. I will optimize it further.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!