[SPARK-49261][SQL] Don't replace literals in aggregate expressions with group-by expressions
What changes were proposed in this pull request?
Before this PR, RewriteDistinctAggregates could replace literals in the aggregate expressions with output attributes from the Expand operator. This can occur when a group-by expression is a literal that happens by chance to match a literal used in an aggregate expression. E.g.:
create or replace temp view v1(a, b, c) as values
(1, 1.001d, 2), (2, 3.001d, 4), (2, 3.001d, 4);
cache table v1;
select
  round(sum(b), 6) as sum1,
  count(distinct a) as count1,
  count(distinct c) as count2
from (
  select
    6 as gb,
    *
  from v1
)
group by a, gb;
In the optimized plan, you can see that the literal 6 in the round function invocation has been patched with an output attribute (6#163) from the Expand operator:
== Optimized Logical Plan ==
'Aggregate [a#123, 6#163], [round(first(sum(__auto_generated_subquery_name.b)#167, true) FILTER (WHERE (gid#162 = 0)), 6#163) AS sum1#114, count(__auto_generated_subquery_name.a#164) FILTER (WHERE (gid#162 = 1)) AS count1#115L, count(__auto_generated_subquery_name.c#165) FILTER (WHERE (gid#162 = 2)) AS count2#116L]
+- Aggregate [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162], [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162, sum(__auto_generated_subquery_name.b#166) AS sum(__auto_generated_subquery_name.b)#167]
   +- Expand [[a#123, 6, null, null, 0, b#124], [a#123, 6, a#123, null, 1, null], [a#123, 6, null, c#125, 2, null]], [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162, __auto_generated_subquery_name.b#166]
      +- InMemoryRelation [a#123, b#124, c#125], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [a#6, b#7, c#8]
This is because the literal 6 was used in the group-by expressions (referred to as gb in the query, and renamed 6#163 in the Expand operator's output attributes).
After this PR, foldable expressions in the aggregate expressions are kept as-is.
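The difference can be illustrated with a toy model. The sketch below is not Spark's implementation: the classes Expr, Literal, Attribute, and Call, the patch function, and the keep_foldable flag are all hypothetical stand-ins, and only literals are treated as foldable, which is a simplification. It shows how a lookup keyed by group-by expressions can swallow an unrelated literal of the same value, and how skipping foldable expressions avoids that.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Expr:
    """Base of a toy expression tree (a stand-in, not a Catalyst class)."""

    @property
    def foldable(self) -> bool:
        return False


@dataclass(frozen=True)
class Literal(Expr):
    value: object

    @property
    def foldable(self) -> bool:
        return True  # a constant can always be evaluated up front


@dataclass(frozen=True)
class Attribute(Expr):
    name: str


@dataclass(frozen=True)
class Call(Expr):
    func: str
    args: Tuple[Expr, ...]


def patch(expr: Expr, group_by_map: Dict[Expr, Attribute], keep_foldable: bool) -> Expr:
    """Replace sub-expressions that match a group-by expression.

    keep_foldable=False mimics the old behavior described above;
    keep_foldable=True leaves foldable expressions untouched.
    """
    if keep_foldable and expr.foldable:
        return expr
    if expr in group_by_map:
        return group_by_map[expr]
    if isinstance(expr, Call):
        new_args = tuple(patch(a, group_by_map, keep_foldable) for a in expr.args)
        return Call(expr.func, new_args)
    return expr


# Group-by expressions `a` and the literal 6 (aliased gb in the query),
# mapped to made-up Expand output attributes.
group_by_map = {
    Attribute("a"): Attribute("a#123"),
    Literal(6): Attribute("6#163"),
}

# round(sum(b), 6): the scale literal merely happens to equal the group-by literal.
agg = Call("round", (Call("sum", (Attribute("b"),)), Literal(6)))

before = patch(agg, group_by_map, keep_foldable=False)
after = patch(agg, group_by_map, keep_foldable=True)

print(before.args[1])  # Attribute(name='6#163')
print(after.args[1])   # Literal(value=6)
```

In this model the scale argument of round survives as a literal only when foldable expressions are skipped.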
Why are the changes needed?
Some expressions require a foldable argument. In the above example, the round function requires a foldable expression as the scale argument. Because the scale argument is patched with an attribute, RoundBase#checkInputDataTypes returns an error, which leaves the Aggregate operator unresolved:
[INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:255)
at org.apache.spark.sql.catalyst.types.DataTypeUtils$.$anonfun$fromAttributes$1(DataTypeUtils.scala:241)
at scala.collection.immutable.List.map(List.scala:247)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.catalyst.types.DataTypeUtils$.fromAttributes(DataTypeUtils.scala:241)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:428)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:428)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:474)
...
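As a rough illustration of why the patched plan fails, the following sketch models a foldability check on the scale argument. It is a hypothetical stand-in, not the code of RoundBase#checkInputDataTypes: the check_round_scale function, its error text, and the Literal and Attribute classes are invented for this example.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


def check_round_scale(scale: Union[Literal, Attribute]) -> Optional[str]:
    """Return None if the scale is acceptable, else an error message.

    Assumption: only literals count as foldable in this toy model.
    """
    if isinstance(scale, Literal):
        return None
    return f"scale must be foldable, got attribute {scale.name}"


ok = check_round_scale(Literal(6))            # scale kept as a literal
failed = check_round_scale(Attribute("6#163"))  # scale patched with an attribute
```

Here ok is None, while failed carries an error message, mirroring how a failed type check keeps the enclosing operator from resolving.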
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?
No.