
[SPARK-44571][SQL] Eliminate the Join by combining multiple Aggregates

beliefer opened this pull request 2 years ago • 26 comments

What changes were proposed in this pull request?

Some queries contain multiple scalar subqueries (aggregations without a GROUP BY clause) connected with joins. The general form of joined aggregates that can be merged is as follows:

<aggregation function> ::=
  SUM | AVG | MAX | ...

<aggregation subquery> ::=
  SELECT
    <aggregation function>(...)[ , <aggregation function>(...)[ , ...]]
  FROM [tab | query]

<joined aggregation> ::=
  SELECT *
  FROM (
    <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
    <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
      ...
    <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
    <aggregation subquery>
  )

For example,

SELECT *
FROM (SELECT
  avg(power) avg_power,
  count(power) count_power,
  count(DISTINCT power) count_distinct_power
FROM data
WHERE country = "USA"
  AND (id BETWEEN 1 AND 3
  OR city = "Berkeley"
  OR name = "Xiao")) B1,
  (SELECT
    avg(power) avg_power,
    count(power) count_power,
    count(DISTINCT power) count_distinct_power
  FROM data
  WHERE country = "China"
    AND (id BETWEEN 4 AND 5
    OR city = "Hangzhou"
    OR name = "Wenchen")) B2

We can optimize this SQL to the form shown below:

SELECT
  avg(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")) avg_power,
  count(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")) count_power,
  count(DISTINCT power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")),
  avg(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen")),
  count(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen")),
  count(DISTINCT power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen"))
FROM data
WHERE
(country = "USA"
  AND (id BETWEEN 1 AND 3
  OR city = "Berkeley"
  OR name = "Xiao")) OR
(country = "China"
    AND (id BETWEEN 4 AND 5
    OR city = "Hangzhou"
    OR name = "Wenchen"))

If we can merge the filters and aggregates, we can scan the data source only once and eliminate the join, so as to avoid the shuffle.
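To make the merged shape concrete, below is a minimal sketch that can be run in spark-shell (the toy rows and aliases are made up for illustration; note that Spark's FILTER clause syntax requires the WHERE keyword):

import spark.implicits._

Seq(("USA", 1, 10.0), ("USA", 2, 20.0), ("China", 4, 30.0))
  .toDF("country", "id", "power")
  .createOrReplaceTempView("data")

spark.sql("""
  SELECT
    avg(power)   FILTER (WHERE country = 'USA')   AS usa_avg_power,
    count(power) FILTER (WHERE country = 'USA')   AS usa_count_power,
    avg(power)   FILTER (WHERE country = 'China') AS china_avg_power,
    count(power) FILTER (WHERE country = 'China') AS china_count_power
  FROM data
  WHERE country = 'USA' OR country = 'China'
""").show()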

This PR also supports eliminating nested joins; please refer to: https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q28.sql

This change improves the performance of such queries.

This PR also reuses some functions from MergeScalarSubqueries.

This PR also adds some TreePatterns to make it easy to check the cost of a predicate.

Why are the changes needed?

Improve the performance for the cases shown above.

Does this PR introduce any user-facing change?

'No'. New feature.

How was this patch tested?

  1. New test cases.
  2. A new micro-benchmark:
Benchmark CombineJoinedAggregates:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------
filter is not defined, CombineJoinedAggregates: false            730            819          69         28.7          34.8       1.0X
filter is not defined, CombineJoinedAggregates: true             618            632          14         33.9          29.5       1.2X
step is 1000000, CombineJoinedAggregates: false                  572            590          20         36.7          27.3       1.3X
step is 1000000, CombineJoinedAggregates: true                   769            794          21         27.3          36.6       1.0X
step is 100000, CombineJoinedAggregates: false                   350            370          26         59.9          16.7       2.1X
step is 100000, CombineJoinedAggregates: true                    231            241          10         90.7          11.0       3.2X
step is 10000, CombineJoinedAggregates: false                    314            340          26         66.8          15.0       2.3X
step is 10000, CombineJoinedAggregates: true                     171            182           9        122.5           8.2       4.3X
step is 1000, CombineJoinedAggregates: false                     303            337          32         69.3          14.4       2.4X
step is 1000, CombineJoinedAggregates: true                      162            171           9        129.4           7.7       4.5X
step is 100, CombineJoinedAggregates: false                      300            316          27         70.0          14.3       2.4X
step is 100, CombineJoinedAggregates: true                       160            169           9        131.3           7.6       4.6X
step is 10, CombineJoinedAggregates: false                       297            325          33         70.6          14.2       2.5X
step is 10, CombineJoinedAggregates: true                        170            203          36        123.5           8.1       4.3X
step is 1, CombineJoinedAggregates: false                        328            352          17         64.0          15.6       2.2X
step is 1, CombineJoinedAggregates: true                         140            148           7        149.3           6.7       5.2X

Benchmark CombineJoinedAggregates:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------
Tree node number < 1, CombineJoinedAggregates: true              398            503         109         52.7          19.0       1.0X
Tree node number < 9, CombineJoinedAggregates: true              394            432          31         53.2          18.8       1.0X
Tree node number < 19, CombineJoinedAggregates: true             399            427          47         52.6          19.0       1.0X
Tree node number < 29, CombineJoinedAggregates: true             434            479         100         48.3          20.7       0.9X
Tree node number < 39, CombineJoinedAggregates: true             480            499          24         43.7          22.9       0.8X
  3. Manual test on TPC-DS (data size: 2TB). This improvement is valid for TPC-DS q28, with no regression for the other test cases.

TPC-DS Query    Before(Seconds)    After(Seconds)    Speedup(Percent)
q28             109.665            43.938            249.59%

According to the micro-benchmark, the merged query is slower than before if the filter has almost no selectivity.

beliefer avatar Jul 29 '23 08:07 beliefer

ping @cloud-fan @viirya cc @MaxGekk @gengliangwang

beliefer avatar Jul 31 '23 02:07 beliefer

Hi @beliefer, this is a nice optimization and an awesome performance improvement to Q28, but I'm not sure the implementation is done the right way:

  • You probably know that MergeScalarSubqueries does very similar plan merging in the case of scalar subqueries, so I feel this PR shouldn't reimplement the logic of MergeScalarSubqueries.tryMergePlans() but reuse the already available code and add the necessary improvements to it if needed. The plan merging code can also be extracted to a common place to share between the rules.
  • I guess the main reason why you reimplemented the merge logic is to be able to propagate different filter conditions from Filter nodes up into Aggregate nodes, as that is required for Q28, but currently that feature is missing from MergeScalarSubqueries.tryMergePlans(). IMO that improvement should be added to the existing code because Q9 could also benefit from it. Please note that I've already tried to add it in SPARK-40193 / https://github.com/apache/spark/pull/37630. Unfortunately I think this feature can be tricky, as there are cases where merging queries introduces a performance degradation. E.g. a bad case is when we merge SELECT sum(a) FROM t WHERE p = 1 and SELECT sum(b) FROM t WHERE p = 2 into SELECT sum(a) FILTER (p = 1), sum(b) FILTER (p = 2) FROM t WHERE p = 1 OR p = 2 and p is a partitioning column: we process more data, but we don't scan less in the merged query (see the sketch after this list). To avoid that, I used a trick in https://github.com/apache/spark/pull/37630 to peek into the physical plan and check that only pushed data filters differ while partitioning and bucketing filters match. That trick made the implementation a bit complex, but we don't need to stick to it; we could also disable filter propagation during plan merging by default, as this PR does. Anyway, my point is that the feature should probably be added to a common place (MergeScalarSubqueries.tryMergePlans()).
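A minimal sketch of one possible guard against that bad case (a hypothetical helper, assuming the partitioning columns are known up front; https://github.com/apache/spark/pull/37630 instead inspects the physical plan):

import org.apache.spark.sql.catalyst.expressions.Expression

// Refuse to propagate filters that reference any partitioning column,
// since merging such filters can defeat partition pruning.
def touchesPartitionColumns(cond: Expression, partitionCols: Set[String]): Boolean =
  cond.references.exists(attr => partitionCols.contains(attr.name))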

So I would suggest:

  • narrowing the scope of SPARK-44571 / this PR to merging joined aggregate queries using the existing merge logic, and
  • using SPARK-40193 to handle filter propagation during plan merging. I'm also happy to revive https://github.com/apache/spark/pull/37630 if there is any interest.

peter-toth avatar Jul 31 '23 09:07 peter-toth

@peter-toth Thank you for the review. For your first point, we could certainly reuse some functions where the code is similar to SPARK-40193 / https://github.com/apache/spark/pull/37630. If https://github.com/apache/spark/pull/37630 is merged first, I will reuse them; otherwise, if this PR is merged first, I will create a follow-up PR for the reuse.

For your second point, I'm OK with propagating Filter nodes up into Aggregate nodes if tryMergePlans can be reused. The cause of the performance degradation is a filter without good selectivity; please refer to the benchmark in the description. So I added a config, and its default value is false.
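A hedged sketch of what such a flag could look like in SQLConf style (the key and doc text are assumptions for illustration, not the PR's actual definition):

import org.apache.spark.internal.config.ConfigBuilder

val COMBINE_JOINED_AGGREGATES_ENABLED =
  ConfigBuilder("spark.sql.optimizer.combineJoinedAggregates.enabled")
    .doc("When true, try to merge joined aggregate subqueries over the same " +
      "relation into a single aggregate with FILTER clauses.")
    .booleanConf
    .createWithDefault(false)  // disabled by default, as discussed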

Your suggestion is welcome, but what are the chances to reuse the code? If https://github.com/apache/spark/pull/37630 is merged first, I will reuse it directly; otherwise, I will create a follow-up PR to do so.

beliefer avatar Jul 31 '23 11:07 beliefer

@cloud-fan The CI failure is unrelated.

beliefer avatar Aug 01 '23 11:08 beliefer

Instead of showing an example query, can you define the general form of joined aggregates that can be merged?

cloud-fan avatar Aug 01 '23 14:08 cloud-fan

BTW, in my https://github.com/apache/spark/pull/37630 I used a different heuristic to disable merging of aggregates with different filter conditions. If the conditions contain any partitioning or bucketing columns then the aggregates are not merged.

@peter-toth Could you tell me more? I can't find the treatment of partitioning or bucketing columns.

beliefer avatar Aug 02 '23 08:08 beliefer

BTW, in my #37630 I used a different heuristic to disable merging of aggregates with different filter conditions. If the conditions contain any partitioning or bucketing columns then the aggregates are not merged.

@peter-toth Could you tell me more? I can't find the treatment of partitioning or bucketing columns.

That heuristic made the whole PR complex. You can follow the logic of the ScanCheck object and the case (CHECKING, FileSourceScanPlan(_, newScan), FileSourceScanPlan(_, cachedScan)) => branch (https://github.com/apache/spark/pull/37630/files#diff-3d3aa853c51c01216a7f7307219544a48e8c14fabe1817850e58739208bc406aR277-R290) in tryMergePlans(). That case actually peeks into the physical plan to check that only pushed-down data filters differ (partitioning and bucketing filters do match).

BTW, I'm not saying that it is the right heuristic for deciding whether we should merge aggregates with different filters, it is just the one I was able to come up with... Anyway, expecting highly selective predicates seems a bit counterintuitive to me. And as I mentioned, I'm also fine with disabling the feature by default with a config and letting users enable it for the queries that benefit from it. If we decide to go that way, it would also simplify my https://github.com/apache/spark/pull/37630 PR a lot.

peter-toth avatar Aug 02 '23 09:08 peter-toth

@beliefer, I've updated my PR with comments to elaborate on how it handles Filters: https://github.com/apache/spark/pull/37630/files#diff-3d3aa853c51c01216a7f7307219544a48e8c14fabe1817850e58739208bc406aR351-R524

peter-toth avatar Aug 02 '23 11:08 peter-toth

         case (CHECKING, FileSourceScanPlan(_, newScan), FileSourceScanPlan(_, cachedScan)) =>
           val (newScanToCompare, cachedScanToCompare) =
             if (conf.getConf(SQLConf.PLAN_MERGE_IGNORE_PUSHED_PUSHED_DATA_FILTERS)) {
               (newScan.copy(dataFilters = Seq.empty), cachedScan.copy(dataFilters = Seq.empty))
             } else {
               (newScan, cachedScan)
             }
           if (newScanToCompare.canonicalized == cachedScanToCompare.canonicalized) {
             // Physical plan is mergeable, but we still need to finish the logical merge to
             // propagate the filters
             tryMergePlans(newPlan, cachedPlan, DONE)
           } else {
             None
           }

I think the above code is not needed. Generally, we concatenate the predicates with OR, so the original filters can still be pushed down to the file sources.
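A quick, hedged way to check this in spark-shell (the path and column names here are made up for the illustration):

// Write a small Parquet table and filter it with an OR of the two conditions.
spark.range(10).selectExpr("id", "id % 3 AS p").write.mode("overwrite").parquet("/tmp/t")

val merged = spark.read.parquet("/tmp/t").where("p = 1 OR p = 2")
merged.explain()
// The scan node should report the merged condition as a pushed filter,
// e.g. PushedFilters: [..., Or(EqualTo(p,1), EqualTo(p,2))]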

beliefer avatar Aug 02 '23 12:08 beliefer

         case (CHECKING, FileSourceScanPlan(_, newScan), FileSourceScanPlan(_, cachedScan)) =>
           val (newScanToCompare, cachedScanToCompare) =
             if (conf.getConf(SQLConf.PLAN_MERGE_IGNORE_PUSHED_PUSHED_DATA_FILTERS)) {
               (newScan.copy(dataFilters = Seq.empty), cachedScan.copy(dataFilters = Seq.empty))
             } else {
               (newScan, cachedScan)
             }
           if (newScanToCompare.canonicalized == cachedScanToCompare.canonicalized) {
             // Physical plan is mergeable, but we still need to finish the logical merge to
             // propagate the filters
             tryMergePlans(newPlan, cachedPlan, DONE)
           } else {
             None
           }

I think the above code is not needed. Generally, we concatenate the predicates with OR, so the original filters can still be pushed down to the file sources.

Please comment on the other PR regarding the code of that PR. But my point is that it doesn't matter what the filter looks like (whether it is an OR condition or not). I enabled merging only when nothing but FileSourceScanExec.dataFilters differs between the 2 scans. If FileSourceScanExec.partitionFilters or FileSourceScanExec.optionalBucketSet differ then merging is disabled, because partitioning and bucketing filters can be more selective in terms of what files to scan...

peter-toth avatar Aug 02 '23 12:08 peter-toth

But my point is that it doesn't matter what the filter looks like (whether it is an OR condition or not). I enabled merging only when nothing but FileSourceScanExec.dataFilters differs between the 2 scans. If FileSourceScanExec.partitionFilters or FileSourceScanExec.optionalBucketSet differ then merging is disabled, because partitioning and bucketing filters can be more selective in terms of what files to scan...

In theory, whether it is a data filter or a partition filter, there is a possibility of data overlap when the filters are connected with OR. Before merging the filters (e.g. p = 1, p = 2), assuming each partition has one file, we need to read two partition files. After merging the two filters, we still need to read two partition files. So I think the overhead of scanning partition files is the same; the difference is that the filters need to be evaluated more often, e.g. p = 1 also needs to be evaluated on the data coming from p = 2. So, personally, I think the computation overhead is similar no matter which kind of filter it is.

The main factor in filter merging is the amount of overlapping data. For example, suppose F1 selects 100 rows of data and F2 selects 50 rows. If the 100 rows and the 50 rows completely overlap, this is the best case: Aggregate1 (over F1) still processes 100 rows, while Aggregate2 (over F2) processes an additional 50 rows (100 in total). The worst case is that the two do not overlap at all: then Aggregate1 has to process an additional 50 rows (150 in total), and Aggregate2 an additional 100 rows (also 150 in total).

beliefer avatar Aug 03 '23 09:08 beliefer

But my point is that it doesn't matter what the filter looks like (whether it is an OR condition or not). I enabled merging only when nothing but FileSourceScanExec.dataFilters differs between the 2 scans. If FileSourceScanExec.partitionFilters or FileSourceScanExec.optionalBucketSet differ then merging is disabled, because partitioning and bucketing filters can be more selective in terms of what files to scan...

In theory, whether it is a data filter or a partition filter, there is a possibility of data overlap when the filters are connected with OR. Before merging the filters (e.g. p = 1, p = 2), assuming each partition has one file, we need to read two partition files. After merging the two filters, we still need to read two partition files. So I think the overhead of scanning partition files is the same; the difference is that the filters need to be evaluated more often, e.g. p = 1 also needs to be evaluated on the data coming from p = 2. So, personally, I think the computation overhead is similar no matter which kind of filter it is.

The main factor in filter merging is the amount of overlapping data. For example, suppose F1 selects 100 rows of data and F2 selects 50 rows. If the 100 rows and the 50 rows completely overlap, this is the best case: Aggregate1 (over F1) still processes 100 rows, while Aggregate2 (over F2) processes an additional 50 rows (100 in total). The worst case is that the two do not overlap at all: then Aggregate1 has to process an additional 50 rows (150 in total), and Aggregate2 an additional 100 rows (also 150 in total).

Sorry @beliefer, I didn't explain all the reasoning behind my heuristics in https://github.com/apache/spark/pull/37630. I've updated https://github.com/apache/spark/pull/42223#discussion_r1282023520, please see the details there.

peter-toth avatar Aug 03 '23 10:08 peter-toth

For merging func1(...) ... WHERE cond1 and func2(...) ... WHERE cond2, we get

func1(...) FILTER cond1, func2(...) FILTER cond2 ... WHERE cond1 OR cond2

Assuming there is no overlapped scan (so almost no benefit) and the table has N rows: previously, both cond1 and cond2 got evaluated at most N times (the scan gets pruned); if they are partition filters, they are evaluated 0 times. Now they get evaluated at most 2N times, and likely more than twice as often as before, since we prune less data from the scan. The worst case is partition filters: before, they were evaluated 0 times; now they get evaluated (numRows matching cond1 OR cond2) times, plus some extra evaluations in the aggregate filters. I don't think putting the predicates in a Project helps, as the problem comes from scan pruning.

Given that it's hard to estimate the overlapped scan size, the only idea I can think of is to estimate the cost of the predicate. We only do the merge if the predicate is cheap to evaluate: it only contains simple comparisons and the expression tree is small.
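A rough sketch of such a cost check (a hypothetical helper and threshold, just to illustrate the idea, not an agreed design):

import org.apache.spark.sql.catalyst.expressions._

def isCheapPredicate(cond: Expression, maxNodes: Int = 20): Boolean = {
  // Total expression tree size: every node matches the wildcard case.
  val treeSize = cond.collect { case e => e }.size

  // Allow only attribute/literal leaves, simple comparisons and AND/OR/NOT.
  def simple(e: Expression): Boolean = e match {
    case _: Attribute | _: Literal => true
    case cmp: BinaryComparison => cmp.children.forall(simple)
    case And(l, r) => simple(l) && simple(r)
    case Or(l, r) => simple(l) && simple(r)
    case Not(c) => simple(c)
    case _ => false
  }

  treeSize <= maxNodes && simple(cond)
}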

We can also define some patterns where the overlapped scan size must be large, e.g. one side has no filter, or the filters in the two aggregates are the same. For these cases, we can always apply the merge.

cloud-fan avatar Aug 04 '23 03:08 cloud-fan

I don't think putting the predicates in a Project helps, as the problem comes from scan pruning.

Sorry, my first comment about the extra project (https://github.com/apache/spark/pull/42223#discussion_r1283307075) was confusing. I got that argument, and I agreed with you (https://github.com/apache/spark/pull/42223#discussion_r1283361954).

But if we decide to merge the queries (based on whatever heuristic), then the extra project can help to avoid evaluating the filters twice in the merged query, and it can actually decrease the data to be shuffled for the aggregation: https://github.com/apache/spark/pull/42223#discussion_r1284057348. IMO the extra project between the filter and the scan won't prevent the merged condition from being pushed down to the scan, so I don't see any drawbacks.

Given that it's hard to estimate the overlapped scan size, the only idea I can think of is to estimate the cost of the predicate. We only do the merge if the predicate is cheap to evaluate: it only contains simple comparisons and the expression tree is small.

That makes sense to me and I don't have any better idea. Although I still think we can and should check whether any filters are pushed down to the scans in the original queries. If there are no pushed-down filters in any of the queries, or the pushed-down filters fully match, then we are safe to merge as the scans fully overlap. If there is a non-matching pushed-down filter then we can use the suggested "expression is cheap" heuristic. (By "pushed-down filter" I mean all partition, bucket and data filters in the scan.)
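A hedged sketch of that combined decision (the names are hypothetical; "pushed filters" stands for all partition, bucket and data filters of a scan):

import org.apache.spark.sql.catalyst.expressions.Expression

def canMergeAggregates(
    newPushedFilters: Seq[Expression],
    cachedPushedFilters: Seq[Expression],
    differingConditions: Seq[Expression],
    isCheap: Expression => Boolean): Boolean = {
  // Scans fully overlap: no pushed filters on either side, or identical ones.
  val scansFullyOverlap =
    newPushedFilters.map(_.canonicalized) == cachedPushedFilters.map(_.canonicalized)
  // Otherwise fall back to the "condition is cheap to evaluate" heuristic.
  scansFullyOverlap || differingConditions.forall(isCheap)
}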

peter-toth avatar Aug 04 '23 08:08 peter-toth

@peter-toth I agree that the extra project can help if we decided to merge. However, the plan pattern becomes complicated. Without the extra project, the merged aggregate is still Aggregate -> Filter -> Scan. We can just define a rule for merging two aggregates, and it can incrementally merge all joined aggregates. With the extra project, we need to define how to merge Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Project -> Scan, or Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Scan.

Also note that WHERE cond1 OR cond2 can be better than FROM (SELECT cond1 AS b_col1, cond2 AS b_col2) WHERE b_col1 OR b_col2, because OR short-circuits, so cond2 is not always evaluated for each input row. Common subexpression elimination might be a better approach here, if we can make it cross the operator boundary (or can only whole-stage codegen do that?).

cloud-fan avatar Aug 04 '23 09:08 cloud-fan

@peter-toth I agree that the extra project can help if we decided to merge. However, the plan pattern becomes complicated. Without the extra project, the merged aggregate is still Aggregate -> Filter -> Scan. We can just define a rule for merging two aggregates, and it can incrementally merge all joined aggregates. With the extra project, we need to define how to merge Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Project -> Scan, or Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Scan.

Those patterns are already covered in the current code: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L235-L255 and not touched in my aggregate merge PR: https://github.com/apache/spark/pull/37630/files#diff-3d3aa853c51c01216a7f7307219544a48e8c14fabe1817850e58739208bc406aR292-R318

Also note that WHERE cond1 OR cond2 can be better than FROM (SELECT cond1 AS b_col1, cond2 AS b_col2) WHERE b_col1 OR b_col2, because OR short-circuits, so cond2 is not always evaluated for each input row. Common subexpression elimination might be a better approach here, if we can make it cross the operator boundary (or can only whole-stage codegen do that?).

~I'm not sure WHERE cond1 OR cond2 is better, because up in the aggregate cond2 is always evaluated for all rows.~ ~Sorry, got it now.~

Actually, due to the short-circuit, cond2 might not be evaluated in the filter node when cond1 is true, but up in the aggregate it will be evaluated anyway, won't it?

peter-toth avatar Aug 04 '23 09:08 peter-toth

@peter-toth Setting aside the previous discussion: after all, the current implementation is the simplest pattern, which is also an advantage that cannot be ignored. If adding a Project can bring a performance improvement, we can follow up and improve.

beliefer avatar Aug 04 '23 13:08 beliefer

@peter-toth Setting aside the previous discussion: after all, the current implementation is the simplest pattern, which is also an advantage that cannot be ignored. If adding a Project can bring a performance improvement, we can follow up and improve.

If we want to push this PR forward for some reason, I'm not against it as long as the above-mentioned correctness issue gets fixed (https://github.com/apache/spark/pull/42223#discussion_r1279051263). But honestly, I don't like the idea of duplicating the code of MergeScalarSubqueries.tryMergePlans() with some modifications into this new rule just because we don't want to fix aggregate merging there. IMO if we are all on the same page about the solution discussed in https://github.com/apache/spark/pull/42223#issuecomment-1665208447, then we should update MergeScalarSubqueries.tryMergePlans() (in https://github.com/apache/spark/pull/37630) and use the updated common merging logic in this PR.

peter-toth avatar Aug 04 '23 13:08 peter-toth

If we want to push this PR forward for some reason, I'm not against it as long as the above-mentioned correctness issue gets fixed (https://github.com/apache/spark/pull/42223#discussion_r1279051263).

Fixed.

But honestly, I don't like the idea of duplicating the code of MergeScalarSubqueries.tryMergePlans() with some modifications into this new rule just because we don't want to fix aggregate merging there. IMO if we are all on the same page about the solution discussed in https://github.com/apache/spark/pull/42223#issuecomment-1665208447, then we should update MergeScalarSubqueries.tryMergePlans() (in https://github.com/apache/spark/pull/37630) and use the updated common merging logic in this PR.

I also don't like that idea, as you said. But it seems the code is different between this PR and https://github.com/apache/spark/pull/37630. Maybe we should advance these two PRs separately until a sharing opportunity is discovered, and then refactor both.

beliefer avatar Aug 05 '23 06:08 beliefer

But honestly, I don't like the idea of duplicating the code of MergeScalarSubqueries.tryMergePlans() with some modifications into this new rule just because we don't want to fix aggregate merging there. IMO if we are all on the same page about the solution discussed in #42223 (comment), then we should update MergeScalarSubqueries.tryMergePlans() (in #37630) and use the updated common merging logic in this PR.

I also don't like that idea, as you said. But it seems the code is different between this PR and #37630. Maybe we should advance these two PRs separately until a sharing opportunity is discovered, and then refactor both.

IMO the code is very similar and the basic algorithm is identical. Both methods just traverse the plans and compare nodes to see if/how they can be merged. I'm off for a week, but once I return I can update https://github.com/apache/spark/pull/37630 with what we discussed, and maybe remove the physical scan equality check part. Although that part is useful, as in some cases we don't need to use any heuristics to allow merging (https://github.com/apache/spark/pull/42223#issuecomment-1665208447), it makes my PR complex. Maybe we can add it back later...

BTW, this PR still doesn't use the "condition expression cost is low" heuristic that we discussed. The current "isLikelySelective" doesn't make sense (https://github.com/apache/spark/pull/42223#discussion_r1281846237) and has nothing to do with expression cost.

peter-toth avatar Aug 05 '23 12:08 peter-toth

I'm not sure if we can reuse https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L235-L255 directly. If the Project is not generated by this optimization, it might contain other expensive expressions and we should stop merging.

Maybe a better idea is to do a post-hoc update to extract filters to a Project.
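A hedged sketch of what that post-hoc step could look like on a merged Aggregate -> Filter -> child plan (hypothetical helper; rewriting the aggregates' FILTER clauses to reference the new flag columns is elided):

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project}

def extractConditionsToProject(
    agg: Aggregate, filter: Filter, conds: Seq[Expression]): LogicalPlan = {
  // Evaluate each merged condition exactly once, as a boolean column.
  val flags = conds.zipWithIndex.map { case (c, i) => Alias(c, s"_cond_$i")() }
  val project = Project(filter.child.output ++ flags, filter.child)
  // Keep rows matching any condition; each aggregate's FILTER would then
  // reference the corresponding flag attribute instead of the raw condition.
  val newFilter = Filter(flags.map(_.toAttribute).reduce[Expression](Or(_, _)), project)
  agg.copy(child = newFilter)
}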

cloud-fan avatar Aug 06 '23 05:08 cloud-fan

Although that part is useful, as in some cases we don't need to use any heuristics to allow merging (https://github.com/apache/spark/pull/42223#issuecomment-1665208447), it makes my PR complex. Maybe we can add it back later...

I agree.

BTW, this PR still doesn't use the "condition expression cost is low" heuristic that we discussed. The current "isLikelySelective" doesn't make sense (https://github.com/apache/spark/pull/42223#discussion_r1281846237) and has nothing to do with expression cost.

Yes, I have implemented it now.

beliefer avatar Aug 07 '23 02:08 beliefer

ping @cloud-fan @peter-toth Could we continue to carry this work a step forward?

beliefer avatar Aug 22 '23 10:08 beliefer

Sorry, last week was a bit hectic for me. I've already put together a change to https://github.com/apache/spark/pull/37630 that makes it simpler and contains what we discussed, but I haven't had time to update the PR. I will do it soon.

peter-toth avatar Aug 22 '23 10:08 peter-toth

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Jan 16 '24 00:01 github-actions[bot]

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Apr 26 '24 00:04 github-actions[bot]

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Aug 07 '24 00:08 github-actions[bot]