
Support ORDER BY in AggregateUDF

Open alamb opened this issue 1 year ago • 6 comments

Is your feature request related to a problem or challenge?

Some built in aggregates (such as FIRST_VALUE, LAST_VALUE and ARRAY_AGG) support an optional ORDER BY argument that defines the order they see their input. For example:

❯ create table foo(x int, y int) as values (1, 100),(2, 100),(0, 200);
0 rows in set. Query took 0.003 seconds.

-- note the `ORDER BY x` in the argument to `FIRST_VALUE`
❯ select FIRST_VALUE(x ORDER BY x) from foo GROUP BY y;
+--------------------+
| FIRST_VALUE(foo.x) |
+--------------------+
| 1                  |
| 0                  |
+--------------------+
2 rows in set. Query took 0.008 seconds.
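As a rough illustration of the semantics of the query above (this is not DataFusion code), the following self-contained sketch computes, for each group `y`, the value of `x` that comes first when the group's rows are ordered by `x` ascending. The function name `first_value_ordered_by_x` is made up for this example.

```rust
use std::collections::BTreeMap;

/// For each group key `y`, returns the `x` that sorts first when the group's
/// rows are ordered by `x` ascending, i.e. `FIRST_VALUE(x ORDER BY x) ... GROUP BY y`.
/// Rows are `(x, y)` pairs. Hypothetical helper, for illustration only.
fn first_value_ordered_by_x(rows: &[(i32, i32)]) -> BTreeMap<i32, i32> {
    let mut first: BTreeMap<i32, i32> = BTreeMap::new();
    for &(x, y) in rows {
        first
            .entry(y)
            // Keep the smallest x seen so far for this group.
            .and_modify(|cur| if x < *cur { *cur = x })
            .or_insert(x);
    }
    first
}
```

With the rows from the `foo` table, group `100` yields `1` and group `200` yields `0`, matching the query result.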

This is not supported today in user defined aggregates.

Describe the solution you'd like

I would like to be able to create a user defined aggregate that can specify its input order.

This would roughly require:

  1. Extending the AggregateUDFImpl trait to communicate the ordering somehow.
  2. Updating the implementation of https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.AggregateExpr.html#method.order_bys
  3. Writing an end to end test in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/user_defined/user_defined_aggregates.rs showing it all working

Here are some other places that likely need to change: https://github.com/apache/arrow-datafusion/blob/b5db7187763bc4511aaffdd6d89b2f0908f17938/datafusion/core/src/physical_planner.rs#L242-L252

https://github.com/apache/arrow-datafusion/blob/b5db7187763bc4511aaffdd6d89b2f0908f17938/datafusion/core/src/physical_planner.rs#L1663-L1690

Maybe looking at how OrderSensitiveArrayAgg is implemented can help https://github.com/apache/arrow-datafusion/blob/5d70c32a9a4accf21e9f27ff5ed62666cbbcbe54/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs#L45

Describe alternatives you've considered

No response

Additional context

No response

alamb avatar Jan 24 '24 18:01 alamb

This is a neat project to work on. I think it would largely be an API design question.

alamb avatar Jan 24 '24 18:01 alamb

Interested in this

jayzhan211 avatar Jan 25 '24 02:01 jayzhan211

I think we also need to update order_by in sql_function_to_expr https://github.com/apache/arrow-datafusion/blob/8a4bad46540598c6acdf432bde08c2a4c76c5039/datafusion/sql/src/expr/function.rs#L166-L172

jayzhan211 avatar Jan 26 '24 09:01 jayzhan211

@alamb I'm trying to add ordering_req to AggregateUDFImpl. But LexOrdering is defined in physical_expr, so importing it causes a cyclic dependency.

    /// Returns the lexical ordering requirements of the aggregate expression.
    fn ordering_req(&self) -> &LexOrdering {
        &self.ordering_req
    }

I'm testing with FirstValueAccumulator, which has the argument ordering_req: LexOrdering. Both OrderSensitiveArrayAgg and FirstValue include ordering_req: LexOrdering, so it seems reasonable to me that we can also have LexOrdering for UDF. However, I'm not sure how I can have LexOrdering for AggregateUDFImpl without a cyclic dependency.

If ordering_req returns Vec<datafusion_expr::Expr> instead of LexOrdering, then the UDF caller needs to convert Expr::Sort to LexOrdering themselves inside their accumulator, like FirstValueUDFAccumulator::new(data_type: &DataType, ordering_req: Option<Vec<Expr>>). I'm not sure whether that makes sense.

jayzhan211 avatar Jan 28 '24 03:01 jayzhan211

It seems that #8793 has a similar issue: https://github.com/apache/arrow-datafusion/issues/8793#issuecomment-1888724220. But I'm not sure whether I can also move trait PhysicalExpr and struct PhysicalSortExpr to datafusion_expr.

jayzhan211 avatar Jan 29 '24 06:01 jayzhan211

so it seems reasonable to me that we can also have LexOrdering for UDF. However, I'm not sure how I can have LexOrdering for AggregateUDFImpl without a cyclic dependency.

Maybe we can have the AggregateUDF declare its needed ordering in terms of Exprs and then translate to LexOrdering as part of physical planning

Perhaps we can follow the model of

    pub file_sort_order: Vec<Vec<Expr>>,

In ListingOptions: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html
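A minimal sketch of that idea follows, using stand-in types rather than the real DataFusion ones. `LogicalSort`, `PhysicalSort`, `LexOrderingSketch`, `OrderedAggregateUdf`, `MyFirstValue` and `plan_ordering` are all hypothetical names invented for this example. The point is only that the UDAF trait mentions logical expressions, and the planner resolves them against a schema, so the trait needs nothing from the physical-expr crate.

```rust
/// Stand-in for a logical sort expression (hypothetical, not `Expr::Sort`).
#[derive(Debug, Clone, PartialEq)]
struct LogicalSort {
    column: String,
    asc: bool,
    nulls_first: bool,
}

/// Stand-in for a physical sort expression: the column is resolved to an index.
#[derive(Debug, Clone, PartialEq)]
struct PhysicalSort {
    column_index: usize,
    descending: bool,
    nulls_first: bool,
}

/// Stand-in for `LexOrdering`.
type LexOrderingSketch = Vec<PhysicalSort>;

/// Hypothetical UDAF trait: the ordering is declared with logical types only.
trait OrderedAggregateUdf {
    fn name(&self) -> &str;
    fn order_by(&self) -> &[LogicalSort];
}

/// Example UDAF that simply stores its requested ordering.
struct MyFirstValue {
    ordering: Vec<LogicalSort>,
}

impl OrderedAggregateUdf for MyFirstValue {
    fn name(&self) -> &str {
        "my_first_value"
    }
    fn order_by(&self) -> &[LogicalSort] {
        &self.ordering
    }
}

/// Planner-side step: translate the logical ordering into a physical one by
/// looking up each column name in the input schema (a list of column names).
fn plan_ordering(
    udf: &dyn OrderedAggregateUdf,
    schema: &[&str],
) -> Result<LexOrderingSketch, String> {
    let mut physical = Vec::new();
    for sort in udf.order_by() {
        let column_index = schema
            .iter()
            .position(|name| *name == sort.column)
            .ok_or_else(|| format!("{}: unknown column '{}'", udf.name(), sort.column))?;
        physical.push(PhysicalSort {
            column_index,
            descending: !sort.asc,
            nulls_first: sort.nulls_first,
        });
    }
    Ok(physical)
}
```

In this sketch the accumulator never sees the logical expressions. It would receive the already-translated ordering from the planner, which avoids asking each UDF author to do the conversion by hand.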

alamb avatar Jan 29 '24 12:01 alamb

@alamb Do you know the rationale for AccumulatorFactoryFunction? Why do we need a closure-like argument for the accumulator, rather than directly passing an Accumulator around (with Arc<dyn Accumulator> or Box<dyn Accumulator>)? Can we create an Accumulator directly without any given arguments?

For example

        let accumulator: AccumulatorFactoryFunction =
            Arc::new(move |_| Ok(Box::new(Self::new(Arc::clone(&captured_state)))));

        // Directly construct the accumulator and pass it around.
        let accumulator = Box::new(Self::new(Arc::clone(&captured_state)));
        // or
        let accumulator = Arc::new(Self::new(Arc::clone(&captured_state)));

Rewrite to

    /// Old

    /// Return a new [`Accumulator`] that aggregates values for a specific
    /// group during query execution.
    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;

    /// New: no arguments, and replace Box with Arc to allow shared ownership
    fn accumulator(&self) -> Result<Arc<dyn Accumulator>>;

The current AccumulatorFactoryFunction requires fixed arguments, which is not flexible enough for UDAFs that need arbitrary arguments. I'm looking for a way to get the accumulator directly, without requiring those arguments.

jayzhan211 avatar Mar 16 '24 03:03 jayzhan211

Do you know the rationale for AccumulatorFactoryFunction? Why do we need a closure-like argument for the accumulator, rather than directly passing an Accumulator around (with Arc<dyn Accumulator> or Box<dyn Accumulator>)? Can we create an Accumulator directly without any given arguments?

One thing that may be related is that the hash aggregator creates an Accumulator per group (not per aggregate function in the query).

And each Accumulator stores state for a particular group (so a single Accumulator instance can't be shared across groups, for example)
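To make the per-group point concrete, here is a simplified sketch (not the actual hash aggregate code) in which a factory is called once for every new group key. The `Accumulator` trait here is a cut-down stand-in, and `SumAccumulator`, `new_sum` and `aggregate_by_group` are hypothetical names. The real AccumulatorFactoryFunction also takes an argument, which is omitted for brevity.

```rust
use std::collections::HashMap;

/// Cut-down stand-in for an accumulator: it holds the state of ONE group.
trait Accumulator {
    fn update(&mut self, value: i64);
    fn evaluate(&self) -> i64;
}

/// Example accumulator that sums the values of its group.
struct SumAccumulator {
    sum: i64,
}

impl Accumulator for SumAccumulator {
    fn update(&mut self, value: i64) {
        self.sum += value;
    }
    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// Factory: every call returns a fresh accumulator with empty state.
fn new_sum() -> Box<dyn Accumulator> {
    Box::new(SumAccumulator { sum: 0 })
}

/// Groups `(key, value)` rows by key, creating one accumulator per group.
/// A single shared accumulator would mix the state of different groups,
/// which is why a factory is passed in rather than an accumulator instance.
fn aggregate_by_group(
    rows: &[(i64, i64)],
    factory: &dyn Fn() -> Box<dyn Accumulator>,
) -> HashMap<i64, i64> {
    let mut groups: HashMap<i64, Box<dyn Accumulator>> = HashMap::new();
    for &(key, value) in rows {
        groups.entry(key).or_insert_with(|| factory()).update(value);
    }
    groups
        .into_iter()
        .map(|(key, acc)| (key, acc.evaluate()))
        .collect()
}
```

Calling `aggregate_by_group(&rows, &new_sum)` invokes `new_sum` once per distinct key, so each group's sum is tracked independently.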

alamb avatar Mar 16 '24 11:03 alamb

Completed in #9874

jayzhan211 avatar Apr 05 '24 02:04 jayzhan211