[SPARK-40193][SQL] Merge subquery plans with different filters
What changes were proposed in this pull request?
After https://github.com/apache/spark/pull/32298 we were able to merge scalar subquery plans. This PR is a follow-up improvement to the merging logic that makes it possible to combine Filter nodes with different conditions if those conditions can be merged in an ancestor Aggregate node.
Consider the following query with 2 subqueries:
SELECT
(SELECT avg(a) FROM t WHERE c = 1),
(SELECT sum(b) FROM t WHERE c = 2)
where the subqueries can be merged to:
SELECT
avg(a) FILTER (WHERE c = 1),
sum(b) FILTER (WHERE c = 2)
FROM t
WHERE c = 1 OR c = 2
After this PR the 2 subqueries are merged to this optimized form:
== Optimized Logical Plan ==
Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
: :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
: : +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
: : +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
: : +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
: : +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
: +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
: +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
: +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
: +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
: +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
+- OneRowRelation
and physical form:
== Physical Plan ==
*(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
: :- Subquery scalar-subquery#260, [id=#148]
: : +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
: : +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
: : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
: : +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
: : +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
: : +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
: +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
+- *(1) Scan OneRowRelation[]
The PR introduces 2 configs:
spark.sql.planMerge.filterPropagation.enabled to disable filter merge and spark.sql.planMerge.filterPropagation.maxCost to control how complex plans are allowed to be merged.
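For illustration, the new flags could be toggled from SQL as sketched below. The config names are taken from this PR; the values are placeholders, since the defaults are not spelled out in this description:

```sql
-- Sketch only: flag names come from this PR, the values are illustrative.
SET spark.sql.planMerge.filterPropagation.enabled=false; -- turn the filter-propagating merge off
SET spark.sql.planMerge.filterPropagation.maxCost=100.0; -- hypothetical cost threshold
```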
Why are the changes needed?
Performance improvement.
[info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - Merge different filters off 9526 9634 97 0.0 244257993.6 1.0X
[info] q9 - Merge different filters on 3798 3881 133 0.0 97381735.1 2.5X
The performance improvement in case of q9 comes from merging 15 subqueries into 1 subquery (https://github.com/apache/spark/pull/32298 was able to merge 15 subqueries into 5).
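For context, q9 follows roughly the pattern sketched below (abridged from memory, not the exact benchmark text; the numeric thresholds are placeholders): 5 quantity buckets, each issuing 3 scalar subqueries over store_sales that differ only in their ss_quantity range, 15 subqueries in total. The earlier rule could only merge the subqueries that share a bucket's filter (3 into 1, hence 15 into 5); with filter propagation all 15 can collapse into a single scan of store_sales.

```sql
-- Abridged q9-like pattern; thresholds are placeholders, buckets 2-4 elided.
SELECT
  CASE WHEN (SELECT count(*) FROM store_sales
             WHERE ss_quantity BETWEEN 1 AND 20) > 74129
       THEN (SELECT avg(ss_ext_discount_amt) FROM store_sales
             WHERE ss_quantity BETWEEN 1 AND 20)
       ELSE (SELECT avg(ss_net_paid) FROM store_sales
             WHERE ss_quantity BETWEEN 1 AND 20) END AS bucket1,
  -- ... buckets 2-4 repeat the same shape for ranges 21-40, 41-60, 61-80 ...
  CASE WHEN (SELECT count(*) FROM store_sales
             WHERE ss_quantity BETWEEN 81 AND 100) > 50744
       THEN (SELECT avg(ss_ext_discount_amt) FROM store_sales
             WHERE ss_quantity BETWEEN 81 AND 100)
       ELSE (SELECT avg(ss_net_paid) FROM store_sales
             WHERE ss_quantity BETWEEN 81 AND 100) END AS bucket5
FROM reason
WHERE r_reason_sk = 1
```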
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing and new UTs.
cc @cloud-fan, @sigmod
@peter-toth Could you fix these conflicts? I want to test this PR. Thank you!
I've updated the PR with the latest master.
@beliefer, I made a mistake previously with merging master (with SPARK-40618 changes) into this PR, so I had to force-push to https://github.com/apache/spark/pull/37630/commits/56c287fc510b07869d18ba40228d61657543e6c6. Please check the latest version.
We tested this PR and the results are:

cc @sigmod too.
@peter-toth Could you fix the conflicts again?
Sure, done.
Tested this PR using 10TB TPC-DS; the latency of q9 was reduced by 83.39% in my production environment.
| Query | Master | SPARK-40193 | Percentage |
|---|---|---|---|
| q9 | 88895.32683 ms | 14766.8049 ms | 83.39% |
also cc @wangyum FYI
I extracted the first commit of this PR, which just moves MergeScalarSubqueries from spark-catalyst to spark-sql, to https://github.com/apache/spark/pull/40932 to make the actual change of this PR clearer once that PR has been merged.
I've updated this PR; the latest version contains the changes discussed in the review threads of https://github.com/apache/spark/pull/42223:
- I removed the physical scan equality check to make this PR simpler,
- merge is allowed only if the cost difference between the merged and the original plans is low (https://github.com/apache/spark/pull/42223#issuecomment-1664916455) or there is no filter on one of the sides (see the sketch below)
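As a hypothetical illustration of the second condition: in the query below the second subquery has no filter, so it scans all of t anyway; merging the filtered subquery into it cannot make the shared scan any more expensive, so the merge is allowed regardless of the cost estimate.

```sql
-- Hypothetical example: the unfiltered sum(b) already reads the whole
-- table, so merging avg(a)'s filtered plan into it adds no scan cost.
SELECT
  (SELECT avg(a) FROM t WHERE c = 1),
  (SELECT sum(b) FROM t)
```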
cc @beliefer, @cloud-fan
Hey, is this part of generalized subquery fusion? https://www.usenix.org/conference/osdi20/presentation/sarthi
No, this PR is not based on the above paper, but our goals seem to be similar. This PR merges scalar subquery plans only, but unfortunately it got stuck due to lack of reviews. If it ever gets accepted, I would like to take the approach further and do full common subplan elimination/merge...
@peter-toth So exciting to see that you're still updating this PR!!
Is this PR based on Spark 3.5? Does it support DataSource V2? Could you help merge this PR to master?
@cloud-fan, @beliefer do you think we can move forward with this PR?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!