
[SPARK-49261][SQL] Don't replace literals in aggregate expressions with group-by expressions

Open • bersprockets opened this pull request 5 months ago • 1 comment

What changes were proposed in this pull request?

Before this PR, RewriteDistinctAggregates could replace literals in the aggregate expressions with output attributes of the Expand operator. This happens when a group-by expression is a literal that coincidentally matches a literal used in an aggregate expression. For example:

create or replace temp view v1(a, b, c) as values
(1, 1.001d, 2), (2, 3.001d, 4), (2, 3.001, 4);

cache table v1;

select
  round(sum(b), 6) as sum1,
  count(distinct a) as count1,
  count(distinct c) as count2
from (
  select
    6 as gb,
    *
  from v1
)
group by a, gb;

In the optimized plan, you can see that the literal 6 passed to round as its scale argument has been replaced with an output attribute of the Expand operator (6#163):

== Optimized Logical Plan ==
'Aggregate [a#123, 6#163], [round(first(sum(__auto_generated_subquery_name.b)#167, true) FILTER (WHERE (gid#162 = 0)), 6#163) AS sum1#114, count(__auto_generated_subquery_name.a#164) FILTER (WHERE (gid#162 = 1)) AS count1#115L, count(__auto_generated_subquery_name.c#165) FILTER (WHERE (gid#162 = 2)) AS count2#116L]
+- Aggregate [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162], [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162, sum(__auto_generated_subquery_name.b#166) AS sum(__auto_generated_subquery_name.b)#167]
   +- Expand [[a#123, 6, null, null, 0, b#124], [a#123, 6, a#123, null, 1, null], [a#123, 6, null, c#125, 2, null]], [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162, __auto_generated_subquery_name.b#166]
      +- InMemoryRelation [a#123, b#124, c#125], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [a#6, b#7, c#8]

This is because the literal 6 was used in the group-by expressions (referred to as gb in the query, and renamed 6#163 in the Expand operator's output attributes).

After this PR, foldable expressions in the aggregate expressions are kept as-is.
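To make the mechanism concrete, here is a toy model in Python of the substitution step. It is not Catalyst code: the classes Literal, Attr, and Call and the function patch_group_by are hypothetical stand-ins. The naive version replaces every subexpression equal to a group-by expression with the matching Expand output attribute, so the scale literal 6 in round(sum(b), 6) becomes the attribute 6#163. The fixed version leaves foldable subexpressions (here, literals) untouched, as described above.

```python
from dataclasses import dataclass
from typing import Dict, Tuple, Union

# Hypothetical toy expression nodes; NOT Spark's Catalyst classes.
@dataclass(frozen=True)
class Literal:
    value: object

    @property
    def foldable(self) -> bool:
        return True  # a constant can be evaluated at planning time

@dataclass(frozen=True)
class Attr:
    name: str

    @property
    def foldable(self) -> bool:
        return False  # an attribute's value is only known per row

@dataclass(frozen=True)
class Call:
    fn: str
    args: Tuple["Expr", ...]

    @property
    def foldable(self) -> bool:
        return all(a.foldable for a in self.args)

Expr = Union[Literal, Attr, Call]

def patch_group_by(expr: Expr, mapping: Dict[Expr, Attr], skip_foldable: bool) -> Expr:
    """Replace subexpressions matching a group-by expression with its Expand attribute."""
    if skip_foldable and expr.foldable:
        return expr  # fixed behavior: keep constants such as round's scale as-is
    if expr in mapping:
        return mapping[expr]
    if isinstance(expr, Call):
        return Call(expr.fn, tuple(patch_group_by(a, mapping, skip_foldable) for a in expr.args))
    return expr

# round(sum(b), 6), grouped by a#123 and the literal 6 (exposed by Expand as 6#163)
agg = Call("round", (Call("sum", (Attr("b"),)), Literal(6)))
mapping = {Attr("a#123"): Attr("a#123"), Literal(6): Attr("6#163")}

before = patch_group_by(agg, mapping, skip_foldable=False)
after = patch_group_by(agg, mapping, skip_foldable=True)
print(before.args[1])  # Attr(name='6#163'): the scale is no longer foldable
print(after.args[1])   # Literal(value=6): the scale stays foldable
```

In the toy, the only difference between the two runs is the early return for foldable expressions; the sum(b) subtree is traversed identically in both.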

Why are the changes needed?

Some expressions require a foldable argument. In the above example, the round function requires a foldable expression as the scale argument. Because the scale argument is patched with an attribute, RoundBase#checkInputDataTypes returns an error, which leaves the Aggregate operator unresolved:
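The failure chain can be sketched with a second toy model: a type check that rejects a non-foldable scale, an operator that counts as resolved only if that check passes, and a schema lookup that fails on an unresolved operator. Everything here (Arg, check_round_inputs, ToyAggregate, the "sum1: double" schema string) is hypothetical and only loosely mirrors RoundBase#checkInputDataTypes and the stack trace that follows; it is not Spark's implementation.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical toy model; NOT Spark's RoundBase or Aggregate classes.
@dataclass(frozen=True)
class Arg:
    name: str
    foldable: bool  # True for a literal such as 6, False for an attribute such as 6#163

def check_round_inputs(scale: Arg) -> Optional[str]:
    """Toy analogue of a check that rejects a non-foldable scale argument."""
    if not scale.foldable:
        return f"scale argument {scale.name} must be foldable"
    return None  # check passed

@dataclass
class ToyAggregate:
    scale: Arg

    @property
    def resolved(self) -> bool:
        # The toy operator is resolved only if its expressions pass their checks.
        return check_round_inputs(self.scale) is None

    def schema(self) -> str:
        # Asking an unresolved toy operator for its schema raises, like the trace below.
        if not self.resolved:
            raise RuntimeError("Invalid call to dataType on unresolved object")
        return "sum1: double"

ok = ToyAggregate(Arg("6", foldable=True))
broken = ToyAggregate(Arg("6#163", foldable=False))
print(ok.schema())  # sum1: double
try:
    broken.schema()
except RuntimeError as e:
    print(e)  # Invalid call to dataType on unresolved object
```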

[INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:255)
	at org.apache.spark.sql.catalyst.types.DataTypeUtils$.$anonfun$fromAttributes$1(DataTypeUtils.scala:241)
	at scala.collection.immutable.List.map(List.scala:247)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.catalyst.types.DataTypeUtils$.fromAttributes(DataTypeUtils.scala:241)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:428)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:428)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:474)
        ...

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New tests.

Was this patch authored or co-authored using generative AI tooling?

No.

bersprockets, Aug 26 '24 19:08