
Introduce expr builder for aggregate function

Open jayzhan211 opened this issue 1 year ago • 5 comments

Which issue does this PR close?

Closes #10545.

Rationale for this change

After this PR, there are two kinds of expression API. One is the normal form, like count(expr), which is convenient for expressions that take few arguments. The other is builder mode, which is useful for expressions that accept several kinds of arguments, like first_value_builder(expr).order_by().build().
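
To make the contrast concrete, here is a minimal, self-contained sketch of the two styles. It uses toy stand-in types, not DataFusion's real `Expr`; the `Expr` enum, its fields, and `FirstValueBuilder` are assumptions made purely for illustration.

```rust
// Toy stand-ins for illustration only; these are NOT DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Aggregate {
        name: String,
        args: Vec<Expr>,
        order_by: Vec<Expr>,
    },
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

/// Direct style: a single call yields the finished expression.
fn count(arg: Expr) -> Expr {
    Expr::Aggregate {
        name: "count".to_string(),
        args: vec![arg],
        order_by: vec![],
    }
}

/// Builder style: optional clauses are attached before `build()`.
/// `FirstValueBuilder` is a hypothetical name for this sketch.
struct FirstValueBuilder {
    arg: Expr,
    order_by: Vec<Expr>,
}

fn first_value_builder(arg: Expr) -> FirstValueBuilder {
    FirstValueBuilder { arg, order_by: vec![] }
}

impl FirstValueBuilder {
    fn order_by(mut self, exprs: Vec<Expr>) -> Self {
        self.order_by = exprs;
        self
    }

    fn build(self) -> Expr {
        Expr::Aggregate {
            name: "first_value".to_string(),
            args: vec![self.arg],
            order_by: self.order_by,
        }
    }
}
```

With these definitions, `count(col("a"))` and `first_value_builder(col("a")).order_by(vec![col("b")]).build()` both produce an `Expr`; only the second has a place to hang extra clauses.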

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

jayzhan211 avatar May 17 '24 13:05 jayzhan211

I plan to add a distinct macro for count in a follow-up PR.

jayzhan211 avatar May 17 '24 13:05 jayzhan211

This looks super cool @jayzhan211

alamb avatar May 17 '24 20:05 alamb

Thanks @jayzhan211 -- I will plan to review this tomorrow.

alamb avatar May 22 '24 00:05 alamb

which means both adding many new functions as well as that they won't work for user defined aggregate functions

I think they can build the function with macro in function-aggregate, so they can have their function with ExprBuilder extension. But then I need to move the macro to datafusion-expr 🤔

maybe make an extension trait

It could be a good idea

jayzhan211 avatar May 23 '24 01:05 jayzhan211

Maybe we can just extend function for AggregateFunction 🤔

pub fn call(&self, args: Vec<Expr>) -> AggregateFunction

impl AggregateFunction {
  fn build(self) -> Expr
}
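
A rough sketch of how that could look, under the assumption that `call` hands back the `AggregateFunction` struct and the clause methods live on that struct. All types here are toy stand-ins written for this sketch, not the real DataFusion definitions, and the method set is hypothetical.

```rust
// Toy stand-ins for illustration only; NOT the real DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    AggregateFunction(AggregateFunction),
}

#[derive(Debug, Clone, PartialEq)]
struct AggregateFunction {
    name: String,
    args: Vec<Expr>,
    distinct: bool,
    order_by: Option<Vec<Expr>>,
}

struct AggregateUDF {
    name: String,
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

impl AggregateUDF {
    /// Returns the struct itself rather than a finished `Expr`
    /// (an assumption of this sketch).
    pub fn call(&self, args: Vec<Expr>) -> AggregateFunction {
        AggregateFunction {
            name: self.name.clone(),
            args,
            distinct: false,
            order_by: None,
        }
    }
}

impl AggregateFunction {
    fn order_by(mut self, exprs: Vec<Expr>) -> Self {
        self.order_by = Some(exprs);
        self
    }

    fn distinct(mut self) -> Self {
        self.distinct = true;
        self
    }

    /// Wraps the configured function into an expression; cannot fail.
    fn build(self) -> Expr {
        Expr::AggregateFunction(self)
    }
}
```

Because `order_by` only exists on `AggregateFunction` in this shape, something like `col("a").order_by(...)` would be rejected by the compiler, so `build()` has nothing left to check at runtime.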

jayzhan211 avatar May 23 '24 11:05 jayzhan211

This is really cool @jayzhan211 -- other than some documentation and examples I think this looks great

I left some other comments inline

One thing that I think might be confusing with this API is that this would not error:

use AggregateUDFExprBuilder;
let expr = col("a")
  .order_by(col("b"))

I wonder if we could make it look more like this:

let expr = col("a")
  .order_by(col("b"))
  .build()?

To have a chance to throw an error

So a more full featured example would look like

// FIRST_VALUE(a ORDER BY b) FILTER (WHERE c > 1)
let expr = first_value(col("a"))
  .order_by(col("b"))
  .filter(col("c").gt(lit(1))) // note no ? here
  .build()?; // only ? here

I'm not sure if we really need the error check since we don't have any now

    pub fn new_udf(
        udf: Arc<crate::AggregateUDF>,
        args: Vec<Expr>,
        distinct: bool,
        filter: Option<Box<Expr>>,
        order_by: Option<Vec<Expr>>,
        null_treatment: Option<NullTreatment>,
    ) -> Self {
        Self {
            func_def: AggregateFunctionDefinition::UDF(udf),
            args,
            distinct,
            filter,
            order_by,
            null_treatment,
        }
    }

What should we do in build() -> Result<Expr>?

jayzhan211 avatar Jun 06 '24 22:06 jayzhan211

I'm not sure if we really need the error check since we don't have any now

I guess what I was thinking is that the reason there is no check now is because the Rust compiler will ensure the types are correct (specifically, that they are AggregateUDF).

With the builder API as it is written now, you can call order_by on any arbitrary Expr (not just the appropriate Expr::AggregateFunction variant)

So I was imagining if any of the methods had been called on an invalid Expr, build() would return an error

Maybe something like

/// Traits to help build aggregate functions
trait AggregateExt {
    // return a builder
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
    fn filter(self, filter: Box<Expr>) -> AggregateBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder;
    fn distinct(self) -> AggregateBuilder;
}

pub struct AggregateBuilder {
    // if set to None, build() errors
    udf: Option<Arc<AggregateUDF>>,
    order_by: Vec<Expr>,
    ....
}

impl AggregateBuilder {
    fn order_by(mut self, order_by: Vec<Expr>) -> AggregateBuilder {
        self.order_by = order_by;
        self
    }
    fn filter(self, filter: Box<Expr>) -> AggregateBuilder {..}
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder {..}
    fn distinct(self) -> AggregateBuilder {..}
    // builds up any in progress aggregate
    fn build(self) -> Result<Expr> {
        let Some(mut udf) = self.udf else {
            return plan_err!("Expr of type XXX is not an aggregate");
        };
        udf.order_by = self.order_by;
        ...
        Ok(Expr::AggregateFunction(udf))
    }
}

impl AggregateExt for Expr {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder {
        match self {
            Expr::AggregateFunction(udaf) => {
                AggregateBuilder { udf: Some(udaf) }
            }
            // wrong type passed -- error when build is called
            _ => {
                AggregateBuilder { udf: None }
            }
        }
    }
}
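
For reference, the deferred-error idea can be written as a compilable toy. This is only a sketch: the `Expr` and `AggregateFunction` types are simplified stand-ins, the builder holds the whole `AggregateFunction` (an assumption made so the pieces type-check), and the error is a plain `String` instead of a DataFusion error.

```rust
// Toy stand-ins for illustration only; NOT the real DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    AggregateFunction(AggregateFunction),
}

#[derive(Debug, Clone, PartialEq)]
struct AggregateFunction {
    name: String,
    args: Vec<Expr>,
    order_by: Option<Vec<Expr>>,
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

fn first_value(arg: Expr) -> Expr {
    Expr::AggregateFunction(AggregateFunction {
        name: "first_value".to_string(),
        args: vec![arg],
        order_by: None,
    })
}

/// Extension trait so `order_by` can be called on any `Expr`.
trait AggregateExt {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
}

struct AggregateBuilder {
    // `None` records that the builder was started from a non-aggregate.
    udaf: Option<AggregateFunction>,
    order_by: Option<Vec<Expr>>,
}

impl AggregateBuilder {
    /// The only fallible step: reports a misuse recorded earlier.
    fn build(self) -> Result<Expr, String> {
        let Some(mut udaf) = self.udaf else {
            return Err("expression is not an aggregate function".to_string());
        };
        udaf.order_by = self.order_by;
        Ok(Expr::AggregateFunction(udaf))
    }
}

impl AggregateExt for Expr {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder {
        // Wrong variant: remember it and let `build()` return the error.
        let udaf = match self {
            Expr::AggregateFunction(udaf) => Some(udaf),
            _ => None,
        };
        AggregateBuilder { udaf, order_by: Some(order_by) }
    }
}
```

In this toy, `first_value(col("a")).order_by(vec![col("b")]).build()` returns `Ok`, while `col("a").order_by(vec![col("b")]).build()` returns `Err`, which is the behavior the intermediate methods are meant to defer to `build()`.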

alamb avatar Jun 07 '24 10:06 alamb

I'm not sure it is worth introducing build() just to handle the non-Expr::AggregateFunction cases 🤔

jayzhan211 avatar Jun 07 '24 12:06 jayzhan211

I have some ideas about additional comments / documentation that I would be happy to help add

Sure!

jayzhan211 avatar Jun 07 '24 13:06 jayzhan211

Thank you for your patience @jayzhan211 -- I just pushed a bunch of docs and tests (and a small API refinement):

  1. Consolidated the example into the expr_api.rs examples
  2. Simplified the API for filter from filter(Box<Expr>) to just filter(Expr)
  3. Added documentation and examples to the trait
  4. Added tests
  5. Checked for SortExprs in order_by
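
As a hedged illustration of items 2 and 5, the toy below shows a `filter` that accepts a plain `Expr` and boxes it internally, and a `build` that rejects ORDER BY entries that are not sort expressions. The types, the `sort_asc` helper, and the error text are assumptions of this sketch, not the code that was pushed.

```rust
// Toy stand-ins for illustration only; NOT the real DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    // Toy analogue of a sort expression: inner expression plus direction.
    Sort { expr: Box<Expr>, asc: bool },
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

/// Hypothetical helper that wraps an expression in an ascending sort.
fn sort_asc(expr: Expr) -> Expr {
    Expr::Sort { expr: Box::new(expr), asc: true }
}

#[derive(Debug, Default)]
struct AggregateBuilder {
    filter: Option<Box<Expr>>,
    order_by: Option<Vec<Expr>>,
}

impl AggregateBuilder {
    /// Takes a plain `Expr`; boxing is an internal detail of the builder.
    fn filter(mut self, filter: Expr) -> Self {
        self.filter = Some(Box::new(filter));
        self
    }

    fn order_by(mut self, order_by: Vec<Expr>) -> Self {
        self.order_by = Some(order_by);
        self
    }

    /// Returns the validated clauses, or an error if any ORDER BY entry
    /// is not a sort expression.
    fn build(self) -> Result<(Option<Box<Expr>>, Vec<Expr>), String> {
        let order_by = self.order_by.unwrap_or_default();
        if let Some(bad) = order_by.iter().find(|e| !matches!(e, Expr::Sort { .. })) {
            return Err(format!(
                "ORDER BY expressions must be sort expressions, found {bad:?}"
            ));
        }
        Ok((self.filter, order_by))
    }
}
```

Here `order_by(vec![sort_asc(col("b"))])` builds successfully, while `order_by(vec![col("b")])` is reported as an error at `build()`.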

alamb avatar Jun 08 '24 18:06 alamb

It looks pretty nice now! Thanks @alamb

jayzhan211 avatar Jun 09 '24 05:06 jayzhan211