The `limit` info in AggregateExec is lost when serializing/deserializing the physical plan

Open liukun4515 opened this issue 1 year ago • 6 comments

Describe the bug

AggregateExec has a `limit` field that is used to optimize the aggregation:

pub struct AggregateExec {
    // .....
    /// Set if the output of this aggregation is truncated by an upstream sort/limit clause
    limit: Option<usize>,
    // .....
}

However, `limit` is not part of the `.proto` definition, so when the aggregate physical node is serialized/deserialized through protobuf, the limit is lost.
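To make the failure mode concrete, here is a minimal, self-contained sketch. `AggregatePlan` and `AggregateNodeProto` are hypothetical stand-ins (not DataFusion's real types and not prost-generated code): the proto-side struct simply has no field for `limit`, so `from_proto(to_proto(x))` cannot recover it.

```rust
/// Hypothetical, simplified stand-in for AggregateExec (only the fields we need).
#[derive(Debug, Clone, PartialEq)]
struct AggregatePlan {
    group_by: Vec<String>,
    limit: Option<usize>,
}

/// Hypothetical stand-in for the protobuf message: it has no field for
/// `limit`, mirroring the bug described above.
#[derive(Debug, Clone, PartialEq)]
struct AggregateNodeProto {
    group_by: Vec<String>,
}

fn to_proto(plan: &AggregatePlan) -> AggregateNodeProto {
    // `plan.limit` has nowhere to go, so it is silently dropped here.
    AggregateNodeProto { group_by: plan.group_by.clone() }
}

fn from_proto(node: &AggregateNodeProto) -> AggregatePlan {
    AggregatePlan {
        group_by: node.group_by.clone(),
        // Nothing to read back, so the deserialized plan has no limit.
        limit: None,
    }
}

fn main() {
    let local = AggregatePlan { group_by: vec!["LO_SUPPKEY".to_string()], limit: Some(30) };
    let remote = from_proto(&to_proto(&local));
    // Prints: local limit = Some(30), remote limit = None
    println!("local limit = {:?}, remote limit = {:?}", local.limit, remote.limit);
}
```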

To Reproduce

No response

Expected behavior

No response

Additional context

Add a `limit` option to the proto file, and handle the `limit` argument in the `to_proto` and `from_proto` implementations.
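A sketch of what that fix amounts to, again with hypothetical stand-in types rather than DataFusion's generated code. Protobuf has no `usize`, so this assumes the limit travels as an optional `u64` (which is how prost represents a proto3 `optional uint64` field); the `None` case must survive the round trip just like `Some(n)`.

```rust
/// Hypothetical stand-in for AggregateExec.
#[derive(Debug, Clone, PartialEq)]
struct AggregatePlan {
    group_by: Vec<String>,
    limit: Option<usize>,
}

/// Hypothetical proto message after the fix: the limit now has a field.
#[derive(Debug, Clone, PartialEq)]
struct AggregateNodeProto {
    group_by: Vec<String>,
    limit: Option<u64>,
}

fn to_proto(plan: &AggregatePlan) -> AggregateNodeProto {
    AggregateNodeProto {
        group_by: plan.group_by.clone(),
        // usize -> u64 is lossless on 32- and 64-bit targets.
        limit: plan.limit.map(|l| l as u64),
    }
}

fn from_proto(node: &AggregateNodeProto) -> Result<AggregatePlan, String> {
    // Convert back to usize, rejecting values that do not fit on this platform.
    let limit = node
        .limit
        .map(|l| usize::try_from(l).map_err(|e| format!("invalid limit {l}: {e}")))
        .transpose()?;
    Ok(AggregatePlan { group_by: node.group_by.clone(), limit })
}

fn main() {
    let local = AggregatePlan { group_by: vec!["LO_SUPPKEY".to_string()], limit: Some(30) };
    let remote = from_proto(&to_proto(&local)).expect("round trip");
    // Prints: remote limit = Some(30)
    println!("remote limit = {:?}", remote.limit);
}
```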

liukun4515, May 23 '24 03:05

cc @alamb: I will fix this.

liukun4515, May 23 '24 03:05

My SQL looks like this:

select
  LO_SUPPKEY
from
  SSB_1G.LINEORDER
GROUP BY
  LO_SUPPKEY
limit 20 offset 10

In the stand-alone physical plan below, the LimitedDistinctAggregation rule adds the limit to the AggregateExec (`lim=[30]`), but the limit has no effect once the physical plan is sent to the remote side.

GlobalLimitExec: skip=10, fetch=20
  CoalescePartitionsExec
    LocalLimitExec: fetch=30
      AggregateExec: mode=FinalPartitioned, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([LO_SUPPKEY@0], 16), input_partitions=16
            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2
              AggregateExec: mode=Partial, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
                ProjectionExec: expr=[2@0 as LO_SUPPKEY]
                  ParquetExec: file_groups={2 groups:
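The `lim=[30]` on both aggregates is the outer `skip=10` plus `fetch=20`: to return rows 11 through 30, the aggregation never needs more than 30 distinct groups, which matches `LocalLimitExec: fetch=30`. A tiny sketch of that arithmetic (the helper name is hypothetical, not a DataFusion function):

```rust
/// Hypothetical helper: how many rows an operator below
/// `GlobalLimitExec: skip, fetch` must produce. Without a fetch there is
/// no limit to push down.
fn limit_to_push_down(skip: usize, fetch: Option<usize>) -> Option<usize> {
    fetch.map(|f| f.saturating_add(skip))
}

fn main() {
    // `LIMIT 20 OFFSET 10` from the query above.
    println!("{:?}", limit_to_push_down(10, Some(20))); // Some(30)
}
```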

liukun4515, May 23 '24 05:05

Another issue I found in AggregateExec: when the limit is pushed into the AggregateExec, execution selects the GroupedTopKAggregateStream:

        // With a pushed-down limit, any aggregate that is NOT an unordered,
        // unfiltered GROUP BY-only distinct gets the top-K (priority queue) stream.
        if let Some(limit) = self.limit {
            warn!("agg exec: {}", self.is_unordered_unfiltered_group_by_distinct());
            if !self.is_unordered_unfiltered_group_by_distinct() {
                warn!("agg exec: create GroupedPriorityQueue");
                return Ok(StreamType::GroupedPriorityQueue(
                    GroupedTopKAggregateStream::new(self, context, partition, limit)?,
                ));
            }
        }

The GroupedTopKAggregateStream implementation returns the correct result for this SQL, but it is not efficient: we don't care about the order, and we don't need to consume all of the input data.
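The cheaper strategy implied here (stop reading as soon as enough distinct keys have been found) can be sketched as below. This is a hypothetical, single-threaded illustration over plain `u32` keys, not DataFusion's stream code; it only shows why an unordered distinct with a limit does not need to see the whole input.

```rust
use std::collections::HashSet;

/// Hypothetical helper: collect up to `limit` distinct keys in first-seen
/// order, and stop pulling from `input` as soon as the limit is reached.
/// Returns the keys and the number of input rows actually consumed.
fn distinct_with_limit<I>(input: I, limit: usize) -> (Vec<u32>, usize)
where
    I: IntoIterator<Item = u32>,
{
    let mut seen = HashSet::new();
    let mut keys = Vec::new();
    let mut consumed = 0;
    if limit == 0 {
        return (keys, consumed);
    }
    for key in input {
        consumed += 1;
        // `insert` returns true only the first time a key is seen.
        if seen.insert(key) {
            keys.push(key);
            if keys.len() == limit {
                break; // enough groups: the rest of the input is never read
            }
        }
    }
    (keys, consumed)
}

fn main() {
    let rows: Vec<u32> = vec![3, 1, 3, 2, 1, 5, 4, 4, 6];
    let (keys, consumed) = distinct_with_limit(rows.iter().copied(), 3);
    // Prints: [3, 1, 2] after reading 4 of 9 rows
    println!("{:?} after reading {} of {} rows", keys, consumed, rows.len());
}
```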

liukun4515, May 23 '24 06:05

In our SQL:

select
  LO_SUPPKEY
from
  SSB_1G.LINEORDER
GROUP BY
  LO_SUPPKEY
limit 20 offset 10

Because this query has no sort/ORDER BY and no aggregate expression, we don't need GroupedTopKAggregateStream to compute the result, and it is inefficient for this SQL.

GroupedTopKAggregateStream consumes all of the input data and uses a priority queue to store and sort it.
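For contrast, here is a simplified bounded-heap top-K. It is not GroupedTopKAggregateStream's actual code (which ranks groups by a MIN/MAX aggregate value); it just keeps the `k` smallest distinct keys, and it illustrates why any priority-queue top-K must read every row: a later row can always displace the current worst entry.

```rust
use std::collections::BinaryHeap;

/// Hypothetical helper: keep the `k` smallest distinct keys with a bounded
/// max-heap (the largest kept key sits on top and is evicted first).
/// Returns the keys in ascending order and the number of rows consumed.
fn top_k_smallest_distinct(input: &[u32], k: usize) -> (Vec<u32>, usize) {
    let mut heap: BinaryHeap<u32> = BinaryHeap::with_capacity(k);
    let mut consumed = 0;
    for &key in input {
        consumed += 1;
        // Linear duplicate check keeps the sketch short; fine for small k.
        if heap.iter().any(|&kept| kept == key) {
            continue;
        }
        if heap.len() < k {
            heap.push(key);
        } else if let Some(&largest) = heap.peek() {
            // A later row can still beat the current worst entry, which is
            // why this loop can never stop early.
            if key < largest {
                heap.pop();
                heap.push(key);
            }
        }
    }
    (heap.into_sorted_vec(), consumed)
}

fn main() {
    let rows: [u32; 9] = [3, 1, 3, 2, 1, 5, 4, 4, 6];
    let (keys, consumed) = top_k_smallest_distinct(&rows, 3);
    // Prints: [1, 2, 3] after reading 9 of 9 rows
    println!("{:?} after reading {} of {} rows", keys, consumed, rows.len());
}
```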

liukun4515, May 23 '24 09:05

GroupedTopKAggregateStream

I believe @avantgardnerio / @thinkharderdev worked on this code, as part of https://github.com/apache/datafusion/pull/7192

I agree it is strange to be using that operator for a query without an aggregate. I thought it only kicked in when there was a MIN or MAX aggregate 🤔

alamb, May 23 '24 09:05

I'm surprised this is being invoked, because:

        // ensure the sort direction matches aggregate function
        let (field, desc) = aggr.get_minmax_desc()?;
        if desc != order.options.descending {
            return None;
        }

and

    /// Finds the DataType and SortDirection for this Aggregate, if there is one
    pub fn get_minmax_desc(&self) -> Option<(Field, bool)> {
        let agg_expr = self.aggr_expr.as_slice().first()?;
        if let Some(max) = agg_expr.as_any().downcast_ref::<Max>() {
            Some((max.field().ok()?, true))
        } else if let Some(min) = agg_expr.as_any().downcast_ref::<Min>() {
            Some((min.field().ok()?, false))
        } else {
            None
        }
    }
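The gate above can be modeled in a few lines. This is a hypothetical simplification: `AggKind` replaces DataFusion's trait-object `downcast_ref::<Max>()` / `downcast_ref::<Min>()` checks, and only the direction check quoted above is modeled. It also shows why a query with `aggr=[]` should not qualify on this path: `first()?` returns `None` when there are no aggregates.

```rust
/// Hypothetical, simplified aggregate kinds.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggKind {
    Max,
    Min,
    Count,
}

/// Mirrors the shape of `get_minmax_desc`: look only at the first
/// aggregate; MAX means descending, MIN ascending, anything else is ineligible.
fn minmax_desc(aggrs: &[AggKind]) -> Option<bool> {
    match aggrs.first()? {
        AggKind::Max => Some(true),
        AggKind::Min => Some(false),
        _ => None,
    }
}

/// Top-K aggregation applies only when the sort direction matches the
/// MIN/MAX aggregate, as in the `desc != order.options.descending` check.
fn topk_applies(aggrs: &[AggKind], order_descending: bool) -> bool {
    minmax_desc(aggrs) == Some(order_descending)
}

fn main() {
    // `GROUP BY LO_SUPPKEY` with no aggregate: aggr=[] in the plan.
    println!("{}", topk_applies(&[], false)); // false
    // ORDER BY MAX(x) DESC
    println!("{}", topk_applies(&[AggKind::Max], true)); // true
    // COUNT is never eligible.
    println!("{}", topk_applies(&[AggKind::Count], true)); // false
}
```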

avantgardnerio, May 23 '24 12:05

It's a normal use case: users who just want the distinct values of one or more columns write SQL like the following:

select distinct column from table limit n

select column from table group by column limit n

These usages come from our own cases, where customers want to get the distinct values.

liukun4515, May 27 '24 09:05

I think PR https://github.com/apache/datafusion/pull/7192 introduced the top-k aggregation with a priority queue in AggregateExec, which is used to optimize queries with the following pattern:

select column, min(xx) from table group by column order by min(xx)

But PR https://github.com/apache/datafusion/pull/8038 introduced a new rule that pushes a limit down for distinct columns. It uses is_unordered_unfiltered_group_by_distinct to check the condition, which requires that there is no sort in the plan. This rule is used to optimize cases like:

select distinct column from table
select column from table group by column
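A rough model of the shape that rule looks for. The struct and function below are hypothetical (field names are illustrative, and `has_ordering` lumps together every kind of ordering requirement); the checks follow what the name `is_unordered_unfiltered_group_by_distinct` describes: a GROUP BY, no aggregate expressions, no FILTER, and no ordering. For the query in this thread (`GROUP BY LO_SUPPKEY`, `aggr=[]`, no ORDER BY) it returns true; for the MIN pattern from #7192 it returns false.

```rust
/// Hypothetical, simplified summary of an aggregate node.
struct AggSummary {
    group_by: Vec<&'static str>,
    aggr_exprs: Vec<&'static str>,
    has_filter: bool,
    has_ordering: bool,
}

/// Sketch of an "unordered, unfiltered GROUP BY distinct" check.
fn qualifies_for_limited_distinct(a: &AggSummary) -> bool {
    !a.group_by.is_empty() && a.aggr_exprs.is_empty() && !a.has_filter && !a.has_ordering
}

fn main() {
    // select column from table group by column
    let distinct = AggSummary {
        group_by: vec!["column"],
        aggr_exprs: vec![],
        has_filter: false,
        has_ordering: false,
    };
    // select column, min(xx) from table group by column order by min(xx)
    let top_k = AggSummary {
        group_by: vec!["column"],
        aggr_exprs: vec!["min(xx)"],
        has_filter: false,
        has_ordering: true,
    };
    println!("{}", qualifies_for_limited_distinct(&distinct)); // true
    println!("{}", qualifies_for_limited_distinct(&top_k)); // false
}
```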

liukun4515, May 27 '24 09:05

I will file a PR to resolve the ser/deser issue.

liukun4515, May 27 '24 10:05