[SPARK-40193][SQL] Merge subquery plans with different filters

Open peter-toth opened this issue 3 years ago • 15 comments

What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/32298 we were able to merge scalar subquery plans. This PR is a follow-up that improves the merging logic so it can also combine Filter nodes with different conditions, provided those conditions can be merged in an ancestor Aggregate node.

Consider the following query with 2 subqueries:

SELECT
  (SELECT avg(a) FROM t WHERE c = 1),
  (SELECT sum(b) FROM t WHERE c = 2)

where the subqueries can be merged to:

SELECT
  avg(a) FILTER (WHERE c = 1),
  sum(b) FILTER (WHERE c = 2)
FROM t
WHERE c = 1 OR c = 2
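
A quick way to sanity-check that the two forms are equivalent is to run both against a small table. The sketch below uses SQLite through Python's sqlite3 module rather than Spark (an assumption made only to keep the example self-contained; it needs SQLite 3.30 or newer for the FILTER clause). The sample rows are invented for illustration, and the subqueries use avg(a) and sum(b) to match the merged form.

```python
import sqlite3

# Hypothetical sample data for t(a, b, c); these rows are not from the PR.
rows = [(1, 10, 1), (3, 20, 1), (5, 30, 2), (7, 40, 2), (9, 50, 3), (11, 60, None)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", rows)

# Original shape: two scalar subqueries, each scanning t with its own filter.
original = conn.execute("""
    SELECT
      (SELECT avg(a) FROM t WHERE c = 1),
      (SELECT sum(b) FROM t WHERE c = 2)
""").fetchone()

# Merged shape: one scan, the filters OR-ed in WHERE and re-applied per aggregate.
merged = conn.execute("""
    SELECT
      avg(a) FILTER (WHERE c = 1),
      sum(b) FILTER (WHERE c = 2)
    FROM t
    WHERE c = 1 OR c = 2
""").fetchone()

assert original == merged
print(original, merged)
```

Rows with c = 3 or a NULL c are dropped by the OR-ed WHERE clause, and each aggregate then sees only the rows matching its own FILTER condition, which is why both queries return the same pair of values.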

After this PR the 2 subqueries are merged to this optimized form:

== Optimized Logical Plan ==
Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
:  :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :  +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:  :     +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :        +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :           +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
:  +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:     +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:        +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:           +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:              +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
+- OneRowRelation

and physical form:

== Physical Plan ==
*(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
:  :- Subquery scalar-subquery#260, [id=#148]
:  :  +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :     +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
:  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
:  :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
:  :              +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :                 +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :                    +- *(1) ColumnarToRow
:  :                       +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
:  +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
+- *(1) Scan OneRowRelation[]

The PR introduces 2 configs:

  • spark.sql.planMerge.filterPropagation.enabled to enable or disable merging of different filters, and
  • spark.sql.planMerge.filterPropagation.maxCost to limit how complex plans can be and still be merged.
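
As an illustration of how such a setting might be passed to a job, here is a minimal Python sketch that renders configs as spark-submit `--conf key=value` arguments. The helper name spark_submit_conf_args is hypothetical, the config key exists only in builds that include this PR, and no value for maxCost is shown because the description does not state one.

```python
# Config key as named in this PR; it is only recognized by builds containing the change.
FILTER_PROPAGATION_ENABLED = "spark.sql.planMerge.filterPropagation.enabled"


def spark_submit_conf_args(confs):
    """Render a dict of Spark configs as spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in confs.items():
        # Booleans are conventionally written as lowercase true/false in Spark configs.
        rendered = str(value).lower() if isinstance(value, bool) else str(value)
        args += ["--conf", f"{key}={rendered}"]
    return args


# Example: turn the new merging behaviour off for one run.
args = spark_submit_conf_args({FILTER_PROPAGATION_ENABLED: False})
print(" ".join(args))
```

Running a query with the flag off and then on is also how the "Merge different filters off/on" comparison in the benchmark can be reproduced.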

Why are the changes needed?

Performance improvement.

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - Merge different filters off                   9526           9634          97          0.0   244257993.6       1.0X
[info] q9 - Merge different filters on                    3798           3881         133          0.0    97381735.1       2.5X

For q9, the improvement comes from merging 15 subqueries into 1 subquery (https://github.com/apache/spark/pull/32298 was able to merge the 15 subqueries into 5).
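
The Relative column can be cross-checked from the timings in the table. This is a small arithmetic sketch, not part of the benchmark harness; the variable names are made up for the example.

```python
# Timings copied from the benchmark table above (milliseconds).
best_off_ms, avg_off_ms = 9526, 9634
best_on_ms, avg_on_ms = 3798, 3881

# Speedup of "filters on" relative to "filters off", by best and by average time.
speedup_best = best_off_ms / best_on_ms
speedup_avg = avg_off_ms / avg_on_ms

print(f"best: {speedup_best:.1f}X, avg: {speedup_avg:.1f}X")
```

Both ratios round to 2.5X, which agrees with the reported figure.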

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing and new UTs.

peter-toth avatar Aug 23 '22 14:08 peter-toth

cc @cloud-fan, @sigmod

peter-toth avatar Aug 23 '22 14:08 peter-toth

@peter-toth Could you fix these conflicts? I want to test this PR. Thank you!

beliefer avatar Nov 07 '22 02:11 beliefer

> @peter-toth Could you fix these conflicts? I want to test this PR. Thank you!

I've updated the PR with the latest master.

peter-toth avatar Nov 07 '22 09:11 peter-toth

@beliefer, I made a mistake previously with merging master (with SPARK-40618 changes) into this PR so I had to force-push to https://github.com/apache/spark/pull/37630/commits/56c287fc510b07869d18ba40228d61657543e6c6. Please check the latest version.

peter-toth avatar Nov 07 '22 14:11 peter-toth

We tested this PR and the results are: [image]

cc @sigmod too.

beliefer avatar Nov 16 '22 01:11 beliefer

@peter-toth Could you fix the conflicts again?

beliefer avatar Nov 16 '22 01:11 beliefer

> @peter-toth Could you fix the conflicts again?

Sure, done.

peter-toth avatar Nov 17 '22 14:11 peter-toth

Tested this PR using 10TB TPC-DS; the latency of q9 was reduced by 83.39% in my production environment.

Query   Master            SPARK-40193      Reduction
q9      88895.32683 ms    14766.8049 ms    83.39%
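
The reported percentage follows directly from the two latencies. A short arithmetic sketch, with variable names chosen only for this example:

```python
# q9 latencies reported above (milliseconds).
master_ms = 88895.32683
pr_ms = 14766.8049

# Relative latency reduction and the equivalent speedup factor.
reduction_pct = (master_ms - pr_ms) / master_ms * 100
speedup = master_ms / pr_ms

print(f"reduction: {reduction_pct:.2f}%, speedup: {speedup:.2f}x")
```

An 83.39% reduction in latency corresponds to roughly a 6x speedup on this query.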

also cc @wangyum FYI

LuciferYang avatar Apr 18 '23 13:04 LuciferYang

I extracted the first commit of this PR, which just moves MergeScalarSubqueries from spark-catalyst to spark-sql, into https://github.com/apache/spark/pull/40932. Once that PR has been merged, the actual change in this PR will be clearer.

peter-toth avatar Apr 24 '23 15:04 peter-toth

I've updated this PR; the latest version contains the changes discussed in the threads of https://github.com/apache/spark/pull/42223:

  • I removed the physical scan equality check to make this PR simpler,
  • merging is allowed only if the cost difference between the merged and original plans is low (see https://github.com/apache/spark/pull/42223#issuecomment-1664916455) or there is no filter on one of the sides.

cc @beliefer, @cloud-fan

peter-toth avatar Aug 22 '23 17:08 peter-toth

Hey, is this part of generalized subquery fusion? https://www.usenix.org/conference/osdi20/presentation/sarthi

benjamin-j-c avatar Dec 14 '23 20:12 benjamin-j-c

> Hey, is this part of generalized subquery fusion? https://www.usenix.org/conference/osdi20/presentation/sarthi

No, this PR is not based on the above paper, but our goals seem to be similar. This PR merges scalar subquery plans only, and unfortunately it got stuck due to a lack of reviews. If it ever gets accepted, I would like to take the approach further and do full common subplan elimination/merge...

peter-toth avatar Dec 14 '23 21:12 peter-toth

@peter-toth So exciting to see that you're still updating this PR!!

Is this PR based on Spark 3.5? Does it support DataSource V2? Could you help get this PR merged to master?

unigof avatar Dec 15 '23 07:12 unigof

@cloud-fan, @beliefer do you think we can move forward with this PR?

peter-toth avatar Dec 15 '23 11:12 peter-toth

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Oct 10 '24 00:10 github-actions[bot]