The `limit` info in `AggregateExec` is lost when serializing/deserializing the physical plan
Describe the bug
In the definition of AggregateExec, we have the limit argument to optimize the agg operation.
pub struct AggregateExec {
    // .....
    /// Set if the output of this aggregation is truncated by a upstream sort/limit clause
    limit: Option<usize>,
    // .....
}
But this field is not defined in the .proto file, so when we use the proto to ser/deser the agg physical node, the limit is lost.
To Reproduce
No response
Expected behavior
No response
Additional context
Add a limit option to the proto file, and handle the limit arg in the to_proto and from_proto implementations.
cc @alamb I will fix this
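The proposed fix can be sketched roughly as below. This is only an illustration under stated assumptions, not the actual DataFusion change: `AggLimitProto`, `AggregateExecNodeProto`, `AggregateNode`, and the free functions `to_proto` / `from_proto` are hypothetical, simplified stand-ins for the generated proto message and the real plan node. The sketch wraps the limit in its own message, which is one way to express an optional value in protobuf.

```rust
/// Hypothetical stand-in for a protobuf wrapper message carrying the limit.
#[derive(Debug, Clone, PartialEq)]
pub struct AggLimitProto {
    pub limit: u64,
}

/// Hypothetical stand-in for the generated aggregate node message.
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateExecNodeProto {
    pub mode: String,
    pub group_expr_names: Vec<String>,
    /// New field: `None` means no limit was pushed into the aggregation.
    pub limit: Option<AggLimitProto>,
}

/// Simplified stand-in for the in-memory physical plan node.
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateNode {
    pub mode: String,
    pub group_expr_names: Vec<String>,
    pub limit: Option<usize>,
}

/// Serialize: copy the limit into the proto message instead of dropping it.
pub fn to_proto(node: &AggregateNode) -> AggregateExecNodeProto {
    AggregateExecNodeProto {
        mode: node.mode.clone(),
        group_expr_names: node.group_expr_names.clone(),
        limit: node.limit.map(|l| AggLimitProto { limit: l as u64 }),
    }
}

/// Deserialize: restore the limit so the remote side sees the same plan.
pub fn from_proto(proto: &AggregateExecNodeProto) -> AggregateNode {
    AggregateNode {
        mode: proto.mode.clone(),
        group_expr_names: proto.group_expr_names.clone(),
        limit: proto.limit.as_ref().map(|l| l.limit as usize),
    }
}
```

With both directions handling the field, a node with `limit: Some(30)` should survive a round trip unchanged, and a node without a limit should stay `None`.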
My SQL looks like this:
select
LO_SUPPKEY
from
SSB_1G.LINEORDER
GROUP BY
LO_SUPPKEY
limit 20 offset 10
The agg exec gets the limit value in the plan after the LimitedDistinctAggregation rule runs, but the limit does not take effect when we send the physical plan to the remote side.
The stand-alone physical plan is:
GlobalLimitExec: skip=10, fetch=20
  CoalescePartitionsExec
    LocalLimitExec: fetch=30
      AggregateExec: mode=FinalPartitioned, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([LO_SUPPKEY@0], 16), input_partitions=16
            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2
              AggregateExec: mode=Partial, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
                ProjectionExec: expr=[2@0 as LO_SUPPKEY]
                  ParquetExec: file_groups={2 groups:
Another issue I found is in AggregateExec: when the limit is pushed down to the agg exec, the following code selects the GroupedTopKAggregateStream:
    if let Some(limit) = self.limit {
        warn!("agg exec: {}", self.is_unordered_unfiltered_group_by_distinct());
        if !self.is_unordered_unfiltered_group_by_distinct() {
            warn!("agg exec: create GroupedPriorityQueue");
            return Ok(StreamType::GroupedPriorityQueue(
                GroupedTopKAggregateStream::new(self, context, partition, limit)?,
            ));
        }
    }
The implementation of GroupedTopKAggregateStream gets the right result for the SQL, but it is not efficient, because we don't care about the order and don't need to consume all of the downstream data.
In our sql:
select
LO_SUPPKEY
from
SSB_1G.LINEORDER
GROUP BY
LO_SUPPKEY
limit 20 offset 10
There is no sort/order clause and no agg expression, so we don't need the GroupedTopKAggregateStream struct to get the result; it is not efficient for this SQL.
The GroupedTopKAggregateStream will consume all of the data and use the PriorityQueue to store and sort all of it.
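To make the efficiency argument concrete, here is a minimal sketch contrasting the two strategies. It is not DataFusion code: `distinct_with_limit` and `smallest_k_distinct` are hypothetical helpers over plain `i64` keys, and the heap here orders by the group key itself as a simplification. The point is only that an unordered distinct-with-limit can stop reading input early, while a priority-queue approach has to read every row.

```rust
use std::collections::{BinaryHeap, HashSet};

/// Unordered distinct with a limit: stop pulling input as soon as `limit`
/// distinct keys have been seen. Returns the keys and the rows consumed.
pub fn distinct_with_limit<I>(input: I, limit: usize) -> (Vec<i64>, usize)
where
    I: IntoIterator<Item = i64>,
{
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    let mut consumed = 0;
    if limit == 0 {
        return (out, consumed);
    }
    for key in input {
        consumed += 1;
        if seen.insert(key) {
            out.push(key);
            if out.len() == limit {
                break; // enough groups, no need to read the rest
            }
        }
    }
    (out, consumed)
}

/// Priority-queue style: every row must be read, because a later row could
/// still belong to the k smallest keys. A bounded max-heap keeps k entries.
pub fn smallest_k_distinct<I>(input: I, k: usize) -> (Vec<i64>, usize)
where
    I: IntoIterator<Item = i64>,
{
    let mut seen = HashSet::new();
    let mut heap = BinaryHeap::new(); // max-heap: the largest key is on top
    let mut consumed = 0;
    for key in input {
        consumed += 1;
        if seen.insert(key) {
            heap.push(key);
            if heap.len() > k {
                heap.pop(); // evict the largest so only k keys remain
            }
        }
    }
    (heap.into_sorted_vec(), consumed)
}
```

For an input whose first 30 rows already hold 30 distinct keys, the first function reads 30 rows while the second reads the whole input, even though either result would satisfy a query with no ORDER BY.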
GroupedTopKAggregateStream
I believe @avantgardnerio / @thinkharderdev worked on this code, as part of https://github.com/apache/datafusion/pull/7192
I agree it is strange to be using that operator given a SQL without an aggregate. I thought it only kicked in if there was a MIN or MAX aggregate 🤔
I'm surprised this is being invoked, because:
    // ensure the sort direction matches aggregate function
    let (field, desc) = aggr.get_minmax_desc()?;
    if desc != order.options.descending {
        return None;
    }
and
    /// Finds the DataType and SortDirection for this Aggregate, if there is one
    pub fn get_minmax_desc(&self) -> Option<(Field, bool)> {
        let agg_expr = self.aggr_expr.as_slice().first()?;
        if let Some(max) = agg_expr.as_any().downcast_ref::<Max>() {
            Some((max.field().ok()?, true))
        } else if let Some(min) = agg_expr.as_any().downcast_ref::<Min>() {
            Some((min.field().ok()?, false))
        } else {
            None
        }
    }
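The reasoning behind this surprise can be shown with a small stand-alone sketch. It is an assumption-laden simplification, not the real method: `AggExpr` and `minmax_desc` are hypothetical stand-ins that replace the downcasts with an enum. It shows that a guard of this shape returns `None` when the aggregate list is empty (as in `aggr=[]` in the plan above), so a rule relying on it would not apply.

```rust
/// Hypothetical, simplified stand-in for an aggregate expression.
#[derive(Debug, Clone, PartialEq)]
pub enum AggExpr {
    Min(String),
    Max(String),
    Count(String),
}

/// Same shape as the quoted `get_minmax_desc`: yields the column and whether
/// the matching sort is descending, but only for a leading MIN or MAX.
pub fn minmax_desc(aggr_expr: &[AggExpr]) -> Option<(&str, bool)> {
    // `first()?` returns early with `None` when there is no aggregate at all.
    match aggr_expr.first()? {
        AggExpr::Max(col) => Some((col.as_str(), true)),
        AggExpr::Min(col) => Some((col.as_str(), false)),
        _ => None,
    }
}
```

Under these assumptions a plain `GROUP BY` with no aggregate never passes the guard, which matches the expectation that the top-k path should only apply to MIN or MAX.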
It's normal: if you just want the distinct values of one or several columns, you can use SQL like the below:
select distinct column from table limit n
select column from table group by column limit n
This usage comes from our own cases, where customers want to get the distinct values.
I think PR https://github.com/apache/datafusion/pull/7192 introduced the top_k agg with the priority queue in AggregateExec, and it is used to optimize patterns like the below:
select column, min(xx) from table group by column order by min(xx)
But PR https://github.com/apache/datafusion/pull/8038 introduced a new rule that pushes the limit down for distinct columns; it uses is_unordered_unfiltered_group_by_distinct to check that there is no sort condition in the plan. This rule is used to optimize cases like:
select distinct column from table
select column from table group by column
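How the two optimizations share the single `limit` field can be sketched as follows. This is a simplified model under assumptions, not the real implementation: `AggPlan`, `StreamChoice`, and `choose_stream` are hypothetical, the predicate only approximates what `is_unordered_unfiltered_group_by_distinct` might check, and the fall-through to a hash-based stream is an assumption, since the snippet quoted earlier shows only the priority-queue branch.

```rust
/// Hypothetical names for the two stream kinds discussed in this thread.
#[derive(Debug, PartialEq)]
pub enum StreamChoice {
    GroupedPriorityQueue,
    GroupedHash,
}

/// Simplified stand-in for the parts of an aggregate plan that matter here.
pub struct AggPlan {
    pub group_by: Vec<String>,
    pub aggr_expr: Vec<String>,
    pub has_filter: bool,
    pub has_ordering: bool,
    pub limit: Option<usize>,
}

impl AggPlan {
    /// Approximation: a GROUP BY with no aggregates, filters, or ordering.
    pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool {
        !self.group_by.is_empty()
            && self.aggr_expr.is_empty()
            && !self.has_filter
            && !self.has_ordering
    }

    /// Mirrors the quoted check: a limit plus a non-distinct shape selects
    /// the priority-queue stream; anything else falls through (assumed).
    pub fn choose_stream(&self) -> StreamChoice {
        if self.limit.is_some() && !self.is_unordered_unfiltered_group_by_distinct() {
            return StreamChoice::GroupedPriorityQueue;
        }
        StreamChoice::GroupedHash
    }
}
```

In this model, a `min(xx) ... order by min(xx)` plan carrying a limit selects the priority queue, while a plain distinct `GROUP BY` carrying a limit does not.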
I will file a PR to resolve the ser/deser issue.