
Make it easier to create WindowFunctions with the Expr API

Open alamb opened this issue 2 years ago • 16 comments

Is your feature request related to a problem or challenge?

Follow on to #5781

There are at least three things named WindowFunction in DataFusion -- Expr::WindowFunction, window_function::WindowFunction and expr::WindowFunction

https://docs.rs/datafusion-expr/26.0.0/datafusion_expr/index.html?search=WindowFunction

Constructing an Expr::WindowFunction to pass to LogicalPlanBuilder::window is quite challenging

Describe the solution you'd like

I would like to make this process easier with a builder style:

for lead(foo) OVER(PARTITION BY bar) for example:

let expr = lead(col("foo"))
  .with_partition_by(col("bar"))
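
To make the proposal concrete, here is a minimal std-only sketch of such a builder. All types and names below are hypothetical stand-ins for illustration, not the actual DataFusion API:

```rust
// Hypothetical sketch: `lead` returns a builder that accumulates
// OVER-clause pieces; `build` produces the final expression.
// `Expr` here is a stand-in that just holds a column name.
#[derive(Debug, Clone, PartialEq)]
pub struct Expr(pub String);

pub fn col(name: &str) -> Expr {
    Expr(name.to_string())
}

#[derive(Debug, Clone, PartialEq)]
pub struct WindowExpr {
    pub func: String,
    pub args: Vec<Expr>,
    pub partition_by: Vec<Expr>,
}

pub struct WindowExprBuilder {
    func: String,
    args: Vec<Expr>,
    partition_by: Vec<Expr>,
}

impl WindowExprBuilder {
    pub fn with_partition_by(mut self, expr: Expr) -> Self {
        self.partition_by.push(expr);
        self
    }

    pub fn build(self) -> WindowExpr {
        WindowExpr {
            func: self.func,
            args: self.args,
            partition_by: self.partition_by,
        }
    }
}

// `lead(foo)` starts the builder with sensible defaults.
pub fn lead(arg: Expr) -> WindowExprBuilder {
    WindowExprBuilder {
        func: "lead".to_string(),
        args: vec![arg],
        partition_by: Vec::new(),
    }
}

fn main() {
    // lead(foo) OVER (PARTITION BY bar)
    let expr = lead(col("foo")).with_partition_by(col("bar")).build();
    assert_eq!(expr.partition_by, vec![col("bar")]);
}
```

The point is that `lead` returns a builder rather than a finished expression, so the OVER-clause pieces can be added incrementally before `build()`.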

Describe alternatives you've considered

No response

Additional context

No response

alamb avatar Jun 22 '23 17:06 alamb

Here is another example, from https://github.com/apache/datafusion/pull/10345 by @timsaucer, showing how hard it is to create a window function via the expr API:

use datafusion::{logical_expr::{expr::WindowFunction, BuiltInWindowFunction, WindowFrame, WindowFunctionDefinition}, prelude::*};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {

    let ctx = SessionContext::new();
    let mut df = ctx.read_csv("/Users/tsaucer/working/testing_ballista/lead_lag/example.csv", CsvReadOptions::default()).await?;

    df = df.with_column("array_col", make_array(vec![col("a"), col("b"), col("c")]))?;

    df.clone().show().await?;

    let lag_expr = Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(
            BuiltInWindowFunction::Lead,
        ),
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    ));

    df = df.select(vec![col("a"), col("b"), col("c"), col("array_col"), lag_expr.alias("lagged")])?;

    df.show().await?;

    Ok(())
}

It would be great if instead of

    let lag_expr = Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(
            BuiltInWindowFunction::Lead,
        ),
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    ));

It looked more like

    let lag_expr = lead(
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    );

Maybe even better like a builder style

    let lag_expr = lead(col("array_col")).build()

Which would permit adding the various OVER clauses like

    let lag_expr = lead(col("array_col"))
      .partition_by(vec![])
      .order_by(vec![])
      .build()

Maybe there is some inspiration in the Polars API too: https://docs.pola.rs/user-guide/expressions/window/#group-by-aggregations-in-selection

alamb avatar May 02 '24 11:05 alamb

🤔 it seems like Spark's API is like

count("dt").over(w).alias("count")

https://stackoverflow.com/questions/32769328/how-to-use-window-functions-in-pyspark-using-dataframes

So maybe for DataFusion it could look like

    let w = Window::new()
        .partition_by(col("id"))
        .order_by(col("dt"));

    let lag_expr = lag(col("array_col"))
        .over(w)
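
As a rough sketch of how that Spark-style shape could look in Rust (a std-only model; every type and name here is hypothetical, not the DataFusion API):

```rust
// Hypothetical sketch of the Spark-style API: a reusable `Window` spec
// that `.over(w)` attaches to a function call.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Window {
    pub partition_by: Vec<String>,
    pub order_by: Vec<String>,
}

impl Window {
    pub fn new() -> Self {
        Self::default()
    }
    pub fn partition_by(mut self, col: &str) -> Self {
        self.partition_by.push(col.to_string());
        self
    }
    pub fn order_by(mut self, col: &str) -> Self {
        self.order_by.push(col.to_string());
        self
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct FnCall {
    pub name: String,
    pub args: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WindowExpr {
    pub call: FnCall,
    pub window: Window,
}

impl FnCall {
    // Attach an OVER clause, Spark-style.
    pub fn over(self, window: Window) -> WindowExpr {
        WindowExpr { call: self, window }
    }
}

pub fn lag(arg: &str) -> FnCall {
    FnCall {
        name: "lag".to_string(),
        args: vec![arg.to_string()],
    }
}

fn main() {
    let w = Window::new().partition_by("id").order_by("dt");
    let expr = lag("array_col").over(w.clone());
    assert_eq!(expr.window, w);
}
```

One nice property of this shape is that the window spec is a plain value, so it can be reused across several window expressions, much like PySpark's `Window.partitionBy(...)`.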

alamb avatar May 02 '24 11:05 alamb

Note: I have some code in https://github.com/apache/datafusion/pull/6746 that implements part of this (along with an example)

alamb avatar May 02 '24 11:05 alamb

I am willing to help with this task.

shanretoo avatar May 18 '24 14:05 shanretoo

Great! I've rebased @alamb's branch and added the changes I suggested. I was about to start testing the code, and then I was going to write up the unit tests. My work in progress is here: https://github.com/timsaucer/datafusion/tree/feature/easier_window_funcs There were a few changes I needed to make around the null_options. I got distracted by a task in the datafusion-python repo, but I was hoping to tackle this very soon.

timsaucer avatar May 18 '24 15:05 timsaucer

Thanks for your update! I'll work on the tests.

shanretoo avatar May 19 '24 00:05 shanretoo

FYI, my work is in: https://github.com/shanretoo/datafusion/tree/feat-window-fn

shanretoo avatar May 19 '24 00:05 shanretoo

@timsaucer I have fixed the calls of expr::WindowFunction to match the changes and added tests for those window functions in dataframe_functions.rs. Let me know if I missed anything.

shanretoo avatar May 21 '24 15:05 shanretoo

Oh, great. Have you been able to run the example code above using the new easy interface?

timsaucer avatar May 21 '24 16:05 timsaucer

You can check it in the unit test: test_fn_lead.

shanretoo avatar May 22 '24 01:05 shanretoo

Thank you. I pulled your branch, and many of the tests are failing for me even though the functions are returning correct values when I add additional debug statements. I think what's happening is that because we have the partition_by there is no guarantee of the order the results come back in. On my machine the unit tests are returning the partitions on column C in order 10 then 1. I'm guessing on yours it was the opposite.

There are a couple of things I think we can do to resolve this. One way would be to make a new macro for testing these partitioned functions. I could do something like

macro_rules! assert_sorted_fn_batches {
    ($EXPR:expr, $EXPECTED: expr, $SORTBY: expr) => {
        let df = create_test_table().await?;
        let df = df.select($EXPR)?.sort($SORTBY)?.limit(0, Some(10))?;
        let batches = df.collect().await?;

        assert_batches_eq!($EXPECTED, &batches);
    };
}

And then the lead function test would become


#[tokio::test]
async fn test_fn_lead() -> Result<()> {

    let expr = lead(col("b"), Some(1), Some(ScalarValue::Int32(Some(-1))))
        .with_partition_by(vec![col("c")])
        .with_order_by(vec![col("b").sort(true, true)])
        .build()
        .alias("lead_b");

    let expected = [
        "+----+--------+",
        "| c  | lead_b |",
        "+----+--------+",
        "| 1  | 10     |",
        "| 1  | 10     |",
        "| 1  | -1     |",
        "| 10 | -1     |",
        "+----+--------+",
    ];

    let select_expr = vec![col("c"), expr];
    let sort_by = vec![col("c").sort(true, true)];

    assert_sorted_fn_batches!(select_expr, expected, sort_by);

    Ok(())
}

I've added an alias just because I think it makes the test more readable. If we wanted to get really explicit, we could also output column A and sort by columns A and C; then correctness would be guaranteed because each row would be unique.

timsaucer avatar May 22 '24 12:05 timsaucer

The one thing I think we're missing is the other variants for these. I don't think it's covered in other unit tests that I can find. So for example, for lead we would want to validate:

  • That setting a different shift offset works as expected
  • That the default works when no shift offset is set (this can be the basic test)
  • That setting no default value gives nulls
  • With and without partition_by
  • Testing with and without order_by (some window functions require it; for those, only test with it)
  • Testing with and without null treatment (I'm not sure which of the functions this impacts)
  • Testing with and without window frames

What do you think? I might try to write a macro around all these variants.

I'm now unblocked on the other task I was working on, so I can pick it up if you'd like or I'm happy to work on other things. Please let me know.

timsaucer avatar May 22 '24 12:05 timsaucer

Sorry, my fault. I hadn't taken the ordering issue into account. Maybe we could add the following match arm to the macro to omit the order_by parameter, and also output column A to ensure correctness. What do you think?

macro_rules! assert_sorted_fn_batches {
    ($EXPR:expr, $EXPECTED: expr) => {
        let sort_by = $EXPR
            .iter()
            .map(|e| {
                let alias = e.name_for_alias().expect("failed to get an alias");
                col(alias).sort(true, true)
            })
            .collect::<Vec<_>>();
        assert_sorted_fn_batches!($EXPR, $EXPECTED, sort_by);
    };
}

shanretoo avatar May 22 '24 15:05 shanretoo

Have you checked tests in sqllogictest? If we want to make sure all the variants work as expected, I think we should add those tests in sqllogictest. And for the unit tests here, we can just check the situations that might have different results, for example, those default values we set in the builder functions. What do you think?

You can take over this and I'm happy to help when needed.

shanretoo avatar May 22 '24 15:05 shanretoo

I think you're doing a great job, and good point on the sqllogictest. TBH I find those tests harder to wrap my head around than the rust tests, but that's more personal preference.

About the test function, I realize we can probably make it simpler:

macro_rules! assert_unordered_fn_batches {
    ($EXPRS:expr, $EXPECTED: expr) => {
        let df = create_test_table().await?;
        let df = df.select($EXPRS)?.limit(0, Some(10))?;
        let batches = df.collect().await?;

        assert_batches_sorted_eq!($EXPECTED, &batches);
    };
}
#[tokio::test]
async fn test_fn_lead() -> Result<()> {

    let expr = lead(col("b"), Some(1), Some(ScalarValue::Int32(Some(-1))))
        .with_partition_by(vec![col("c")])
        .with_order_by(vec![col("b").sort(true, true)])
        .build()
        .alias("lead_b");

    let expected = [
        "+-----------+----+--------+",
        "| a         | c  | lead_b |",
        "+-----------+----+--------+",
        "| 123AbcDef | 10 | -1     |",
        "| CBAdef    | 1  | -1     |",
        "| abc123    | 1  | 10     |",
        "| abcDEF    | 1  | 10     |",
        "+-----------+----+--------+",
    ];

    let select_expr = vec![col("a"), col("c"), expr];

    assert_unordered_fn_batches!(select_expr, expected);

    Ok(())
}

What do you think?

timsaucer avatar May 22 '24 15:05 timsaucer

Looks good. It is clearer to understand the results in this way.

shanretoo avatar May 23 '24 00:05 shanretoo

Update here is that @jayzhan211 and I have been working on a similar API for creating Aggregate exprs on https://github.com/apache/datafusion/pull/10560. I am quite pleased with how it worked out. Perhaps we can follow a similar model for the window functions

alamb avatar Jun 08 '24 18:06 alamb

In case anyone is following along, @jayzhan211 added a really nice trait for working with aggregate functions. Maybe we can do something similar for window functions eventually

https://github.com/apache/datafusion/blob/e693ed7a3c3b36405f0a34887e6f8b49d4e97152/datafusion/expr/src/udaf.rs#L614-L653

alamb avatar Jul 07 '24 11:07 alamb

I've started looking at this and coming up against one blocker that prevents just following the exact pattern.

My first thought was to implement a trait like

pub trait WindowExt {
    fn order_by(self, order_by: Vec<Expr>) -> WindowBuilder;
    fn partition_by(self, partitions: Vec<Expr>) -> WindowBuilder;
    fn window_frame(self, window_frame: WindowFrame) -> WindowBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> WindowBuilder;
}

The problem with this is that we would have two traits implemented on Expr that have two identical function names, order_by and null_treatment. I could give them different names, but that isn't a great user experience. Plus there's the fact that all aggregate functions can be used as window functions.

My current thinking is that instead of doing this, I should rename AggregateExt to something like ExprExt. This trait would have something like

pub trait ExprExt {
    fn order_by(self, order_by: Vec<Expr>) -> ExprBuilder;
    fn filter(self, filter: Expr) -> ExprBuilder;
    fn distinct(self) -> ExprBuilder;
    fn partition_by(self, partitions: Vec<Expr>) -> ExprBuilder;
    fn window_frame(self, window_frame: WindowFrame) -> ExprBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> ExprBuilder;
}

Then the ExprBuilder would do something like

pub struct ExprBuilder {
    expr_data: ExprBuilderData,
    order_by: Option<Vec<Expr>>,
    filter: Option<Expr>,
    distinct: bool,
    null_treatment: Option<NullTreatment>,
}

And finally

enum ExprBuilderData {
    AggregateBuilderData(AggregateFunction),
    WindowBuilderData(WindowFunction),
}
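
A minimal std-only sketch of that single-builder idea (hypothetical names, not the real DataFusion types), where the enum decides which builder methods actually take effect:

```rust
// Hypothetical sketch: one ExprBuilder serves both aggregate and window
// functions; the inner enum determines which fields each method can set.
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateFunction {
    pub name: String,
    pub distinct: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WindowFunction {
    pub name: String,
    pub partition_by: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ExprBuilderData {
    Aggregate(AggregateFunction),
    Window(WindowFunction),
}

pub struct ExprBuilder {
    pub data: ExprBuilderData,
}

impl ExprBuilder {
    pub fn aggregate(name: &str) -> Self {
        Self {
            data: ExprBuilderData::Aggregate(AggregateFunction {
                name: name.to_string(),
                distinct: false,
            }),
        }
    }
    pub fn window(name: &str) -> Self {
        Self {
            data: ExprBuilderData::Window(WindowFunction {
                name: name.to_string(),
                partition_by: Vec::new(),
            }),
        }
    }
    // `distinct` only makes sense for aggregates; a real implementation
    // would likely return an error for the window variant.
    pub fn distinct(mut self) -> Self {
        if let ExprBuilderData::Aggregate(agg) = &mut self.data {
            agg.distinct = true;
        }
        self
    }
    // `partition_by` only applies to the window variant.
    pub fn partition_by(mut self, cols: Vec<String>) -> Self {
        if let ExprBuilderData::Window(w) = &mut self.data {
            w.partition_by = cols;
        }
        self
    }
    pub fn build(self) -> ExprBuilderData {
        self.data
    }
}

fn main() {
    let win = ExprBuilder::window("lead")
        .partition_by(vec!["c".to_string()])
        .build();
    match win {
        ExprBuilderData::Window(w) => assert_eq!(w.partition_by, vec!["c".to_string()]),
        _ => unreachable!(),
    }
    let agg = ExprBuilder::aggregate("count").distinct().build();
    match agg {
        ExprBuilderData::Aggregate(a) => assert!(a.distinct),
        _ => unreachable!(),
    }
}
```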

(case statement removed from original comment)

I haven't dug too much deeper into it, but these are my initial design ideas. @jayzhan211 and @alamb what do you think?

timsaucer avatar Jul 18 '24 11:07 timsaucer

I'm also wondering if instead of carrying the data around in the builder, we can just update the member within expr_data as we go. That is, initialize it to defaults and update the values as the builder is called.
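
That mutate-in-place alternative could look roughly like this (a std-only sketch with hypothetical names): the builder owns a WindowFunction initialized to defaults, each method writes directly into it, and build() just unwraps it.

```rust
// Hypothetical sketch: the builder wraps the target struct directly and
// mutates its fields, instead of carrying separate Option<_> fields.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct WindowFunction {
    pub name: String,
    pub partition_by: Vec<String>,
    pub order_by: Vec<String>,
}

pub struct WindowBuilder(WindowFunction);

impl WindowBuilder {
    pub fn new(name: &str) -> Self {
        // Initialize to defaults; later calls overwrite fields in place.
        Self(WindowFunction {
            name: name.to_string(),
            ..Default::default()
        })
    }
    pub fn partition_by(mut self, cols: Vec<&str>) -> Self {
        self.0.partition_by = cols.into_iter().map(String::from).collect();
        self
    }
    pub fn order_by(mut self, cols: Vec<&str>) -> Self {
        self.0.order_by = cols.into_iter().map(String::from).collect();
        self
    }
    pub fn build(self) -> WindowFunction {
        // No per-field Option unwrapping needed.
        self.0
    }
}

fn main() {
    let f = WindowBuilder::new("lead")
        .partition_by(vec!["c"])
        .order_by(vec!["b"])
        .build();
    assert_eq!(f.partition_by, vec!["c".to_string()]);
}
```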

timsaucer avatar Jul 18 '24 11:07 timsaucer

The overall idea looks good to me, but I'm not sure about the Case you mentioned: is there any other Expr that could also benefit from the builder mode?

My current idea is specific to functions only, but if there are other non-function exprs, we could extend it with the related methods they need, like ExprExt and ExprBuilder.

pub trait FunctionExt {
    /// Add `ORDER BY <order_by>`
    ///
    /// Note: `order_by` must be [`Expr::Sort`]
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
    /// Add `FILTER <filter>`
    fn filter(self, filter: Expr) -> AggregateBuilder;
    /// Add `DISTINCT`
    fn distinct(self) -> AggregateBuilder;
    /// Add `RESPECT NULLS` or `IGNORE NULLS`
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder;

    fn partition_by:
    fn window_frame:
    ...
}

pub struct FuncBuilder {
    fun: Option<FuncKind>,
    order_by: Option<Vec<Expr>>,
    filter: Option<Expr>,
    distinct: bool,
    null_treatment: Option<NullTreatment>,
    partition_by,
    window_frame,
    ....
}

pub enum FuncKind {
    Aggregate(AggregateFunction),
    Window(WindowFunction),
}

jayzhan211 avatar Jul 18 '24 11:07 jayzhan211

Good point. I was just thinking about it and came on here to remove that from my comment! So we are well aligned. Great suggestions. I'll move forward on working on this tomorrow.

timsaucer avatar Jul 18 '24 11:07 timsaucer

I started a new branch off main with these changes. Tomorrow I'll review the previous branch @shanretoo was working on to make sure I didn't miss any unit tests he added. Otherwise all of the cargo tests pass for me with this change.

timsaucer avatar Jul 19 '24 13:07 timsaucer