
feat: Implement quantile_cont()/quantile_disc() aggregate functions

Status: Open · 2010YOUY01 opened this issue 2 years ago · 6 comments

Which issue does this PR close?

NA

Rationale for this change

This PR adds support for the quantile_cont() and quantile_disc() statistical aggregate functions.

❯ select quantile_cont(column1, 0.3) from (values (0), (50), (100));
+-------------------------------------+
| QUANTILE_CONT(column1,Float64(0.3)) |
+-------------------------------------+
| 30                                  |
+-------------------------------------+
1 row in set. Query took 0.010 seconds.

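It sorts column1 in ascending order and computes its 30th percentile, interpolating linearly between neighboring values: the target position is 0.3 × (3 - 1) = 0.6, i.e. 60% of the way from 0 to 50, which gives 30. These functions are already implemented in DuckDB: https://duckdb.org/docs/sql/aggregates.html#statistical-aggregates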

The more common equivalent syntax is:

SELECT percentile_cont(0.3) WITHIN GROUP (ORDER BY column1)
FROM test_scores;

This WITHIN GROUP syntax is available in PostgreSQL, Spark, and other systems: https://www.postgresql.org/docs/9.5/functions-aggregate.html

What changes are included in this PR?

This PR reuses the existing Accumulator from the median() aggregate function (moved from median.rs to percentile.rs) to implement the quantile_*() functions. PercentileAccumulator is now shared by the median(), quantile_disc(), and quantile_cont() aggregate functions.
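To illustrate what sharing one accumulator across the three functions can look like, here is a simplified sketch. `PercentileKind`, the method names, and the f64-only state are hypothetical simplifications, not DataFusion's actual PercentileAccumulator or its Accumulator trait. Median is modeled as a continuous quantile at 0.5, which yields the same result (the mean of the two middle values for an even count).

```rust
/// Which aggregate function the shared accumulator is serving (hypothetical).
#[derive(Clone, Copy, Debug)]
enum PercentileKind {
    Median,
    Cont(f64),
    Disc(f64),
}

/// Buffers every input value; only the final evaluation differs by kind.
struct PercentileAccumulator {
    kind: PercentileKind,
    values: Vec<f64>,
}

impl PercentileAccumulator {
    fn new(kind: PercentileKind) -> Self {
        Self { kind, values: Vec::new() }
    }

    /// Called once per input batch: append the whole batch to the buffer.
    fn update_batch(&mut self, batch: &[f64]) {
        self.values.extend_from_slice(batch);
    }

    /// Sort once at the end, then pick (or interpolate) the target rank.
    fn evaluate(&mut self) -> Option<f64> {
        if self.values.is_empty() {
            return None;
        }
        self.values.sort_by(|a, b| a.total_cmp(b));
        let n = self.values.len();
        match self.kind {
            PercentileKind::Median => Some(cont(&self.values, 0.5)),
            PercentileKind::Cont(q) => Some(cont(&self.values, q)),
            PercentileKind::Disc(q) => {
                // 1-based rank ceil(q * n), clamped into the valid range.
                let rank = (q * n as f64).ceil() as usize;
                Some(self.values[rank.clamp(1, n) - 1])
            }
        }
    }
}

/// Linear interpolation between the two closest ranks of a sorted, non-empty slice.
fn cont(sorted: &[f64], q: f64) -> f64 {
    let q = q.clamp(0.0, 1.0);
    let pos = q * (sorted.len() - 1) as f64;
    let (lo, hi) = (pos.floor() as usize, pos.ceil() as usize);
    sorted[lo] + (pos - lo as f64) * (sorted[hi] - sorted[lo])
}

fn main() {
    let mut acc = PercentileAccumulator::new(PercentileKind::Median);
    acc.update_batch(&[4.0, 1.0]);
    acc.update_batch(&[3.0, 2.0]);
    println!("{:?}", acc.evaluate()); // prints Some(2.5): mean of the middle values 2 and 3
}
```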

I think this is one of the trickier aggregate functions. Regular aggregate functions keep only O(1) internal state inside their Accumulator (e.g. AvgAccumulator only has to keep a sum and a count), while PercentileAccumulator has to keep O(n) state: it caches all data during aggregation, then sorts the data and fetches the target percentile at final evaluation. This PR only extends the functionality to support the new aggregate functions, but the underlying Accumulator is worth revisiting for optimizations in the future.
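The difference in state size is easiest to see side by side. In this simplified sketch (the type names and `merge` signatures are hypothetical, not DataFusion's API), combining partial averages from two partitions touches two numbers, while combining partial percentile states must copy every buffered value, so memory grows with the input.

```rust
/// O(1) state: an average only needs a running sum and count.
#[derive(Default)]
struct AvgState {
    sum: f64,
    count: u64,
}

impl AvgState {
    fn update(&mut self, v: f64) {
        self.sum += v;
        self.count += 1;
    }
    /// Combining partial results from two partitions is constant time.
    fn merge(&mut self, other: &AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }
    fn evaluate(&self) -> Option<f64> {
        (self.count > 0).then(|| self.sum / self.count as f64)
    }
}

/// O(n) state: a percentile needs every value before it can answer.
/// Evaluation (not shown) sorts `values` and reads the target rank.
#[derive(Default)]
struct PercentileState {
    values: Vec<f64>,
}

impl PercentileState {
    fn update(&mut self, v: f64) {
        self.values.push(v);
    }
    /// Combining partitions copies all of the other side's buffered values.
    fn merge(&mut self, other: &PercentileState) {
        self.values.extend_from_slice(&other.values);
    }
}

fn main() {
    let (mut avg_a, mut avg_b) = (AvgState::default(), AvgState::default());
    let (mut pct_a, mut pct_b) = (PercentileState::default(), PercentileState::default());
    // Two "partitions" of 500 rows each.
    for i in 0..500 {
        avg_a.update(i as f64);
        pct_a.update(i as f64);
        avg_b.update((i + 500) as f64);
        pct_b.update((i + 500) as f64);
    }
    avg_a.merge(&avg_b);
    pct_a.merge(&pct_b);
    // The average state is still two numbers; the percentile state holds all 1000 rows.
    println!("avg = {:?}, percentile state holds {} values", avg_a.evaluate(), pct_a.values.len());
}
```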

Are these changes tested?

sqllogictests

Are there any user-facing changes?

No

2010YOUY01, Aug 18 '23 21:08

I plan to review this PR today

alamb, Aug 22 '23 09:08

Thank you @2010YOUY01. Like all your others, this PR is well written, documented, and tested, and it is easy to read and understand. Thank you so much.

Sorting

I started looking at this implementation, and I agree with you that storing all the values for each group is unlikely to work well once the data grows beyond a certain size.

Thank you for this very detailed review feedback!

This order by approach is what I had in mind, and I will try it. I'm trying to figure out how to do this query rewrite, do you remember any reference PR that did a query rewrite similar to this order by rewrite?

2010YOUY01, Aug 22 '23 23:08

Does this belong in DataFusion core, or does it belong in an add-on?

With this level of specialization required, I wonder where we should stop adding built-in aggregate functions and start adding add-on packages. I worry there is no agreed-upon boundary yet.

Do you have time to think about breaking these more specialized aggregates into a separate crate (it could be in the datafusion repo), something like #7110?

Though how to separate existing functions requires further discussion, I think it is necessary to build an interface to put a set of functions into a separate crate. Besides reducing binary size, it can also make the extension management for UDFs more organized and user friendly. Will think about this issue later and see if there is any good way to do it

2010YOUY01, Aug 23 '23 00:08

I'm trying to figure out how to do this query rewrite, do you remember any reference PR that did a query rewrite similar to this order by rewrite?

Perhaps https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs or https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/unwrap_cast_in_comparison.rs would serve as good inspiration.

Though how to separate existing functions requires further discussion, I think it is necessary to build an interface to put a set of functions into a separate crate. Besides reducing binary size, it can also make the extension management for UDFs more organized and user friendly. Will think about this issue later and see if there is any good way to do it

Thank you

alamb, Aug 23 '23 17:08

https://github.com/apache/arrow-datafusion/pull/7376 made several smart optimizations to median(), for example an O(n) quickselect in the final evaluate step of the aggregation.
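The quickselect idea can be sketched with Rust's standard `slice::select_nth_unstable_by`, which partitions a slice around its k-th smallest element in expected O(n) time instead of fully sorting it in O(n log n). This is an illustration of the technique assuming f64 inputs without NaNs, not the code from #7376; the function name is hypothetical. For a continuous quantile, the second neighbor needed for interpolation is simply the minimum of the right-hand partition, so one selection is enough.

```rust
/// Continuous quantile (linear interpolation) without a full sort.
/// Reorders `values` in place; returns None for empty input.
fn quantile_cont_select(values: &mut [f64], q: f64) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    let q = q.clamp(0.0, 1.0);
    let pos = q * (values.len() - 1) as f64;
    let k = pos.floor() as usize;
    let frac = pos - k as f64;
    // Expected O(n): afterwards `lo` is the k-th smallest value and
    // every element of `right` is >= it.
    let (_, &mut lo, right) = values.select_nth_unstable_by(k, |a, b| a.total_cmp(b));
    // The (k+1)-th smallest value is the minimum of the right partition.
    match right.iter().copied().min_by(|a, b| a.total_cmp(b)) {
        Some(hi) if frac > 0.0 => Some(lo + frac * (hi - lo)),
        _ => Some(lo),
    }
}

fn main() {
    // Even-length median: mean of the two middle values 3 and 5.
    let mut data = vec![9.0, 1.0, 7.0, 3.0, 5.0, 2.0];
    println!("{:?}", quantile_cont_select(&mut data, 0.5)); // prints Some(4.0)

    let mut data = vec![100.0, 0.0, 50.0];
    println!("{:?}", quantile_cont_select(&mut data, 0.3));
}
```

A median is then just `quantile_cont_select(values, 0.5)`, and the same single selection serves any quantile.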

For select median(l_partkey) from lineitem on TPC-H SF10 data in Parquet:
- Before: ~20 s
- After: ~4 s
- With multi-core sorting: ~2 s (estimated)

With this, the multi-core sorting approach now seems unnecessary. However, the query above spends only ~1% of its time in quickselect; most of the time goes to data type conversion and copying. I'll experiment with ways to make median() faster before finishing this PR.

2010YOUY01, Sep 18 '23 18:09

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

github-actions[bot], May 04 '24 01:05