max_exprs not working with apply_multiple in 26.1

Open AnatolyBuga opened this issue 2 years ago • 9 comments

Polars version checks

  • [X] I have checked that this issue has not already been reported.

  • [X] I have confirmed this bug exists on the latest version of Polars.

Issue description

When the input(s) of an AggregationContext are AggState::AggregatedFlat, all kinds of issues pop up.

Reproducible example

#[test]
/// This test is for an expr whose input has already been aggregated
/// double input case
fn test_agg_max_exprs_double() -> PolarsResult<()> {
    let df = df![
        "rf" => ["App", "App", "Gg", "App"],
        "x" => ["Hey", "There", "Ante", "R"],
    ]
    .unwrap();
    let grp = df.lazy().groupby_stable([col("rf")]);
    let e = max_exprs(&[aggregated_flat(), aggregated_flat()]);
    let lf = grp.agg([e]);

    let res = lf.collect()?;
    assert_eq!(res.shape(), (2, 2));

    fn aggregated_flat() -> Expr {
        apply_multiple(
            move |_| {
                // Some heavy compute logic here
                Ok(Some(Series::new("res", [1.])))
            },
            &[col("x")],
            GetOutput::from_type(DataType::Float64),
            true,
        )
    }

    Ok(())
}

#[test]
/// This test is for an expr whose input has already been aggregated
/// single input case
fn test_agg_max_exprs_single() -> PolarsResult<()> {
    let df = df![
        "rf" => ["App", "App", "Gg", "App"],
        "x" => ["Hey", "There", "Ante", "R"],
    ]
    .unwrap();
    let grp = df.lazy().groupby_stable([col("rf")]);
    let e = max_exprs(&[aggregated_flat()]);
    let lf = grp.agg([e]);

    let res = lf.collect()?;
    assert_eq!(res.shape(), (2, 2));

    fn aggregated_flat() -> Expr {
        apply_multiple(
            move |_| {
                // Some heavy compute logic here
                Ok(Some(Series::new("res", [1.])))
            },
            &[col("x")],
            GetOutput::from_type(DataType::Float64),
            true,
        )
    }

    Ok(())
}

Expected behavior

Expecting no errors

Installed versions

Polars 26.1 lazy

AnatolyBuga avatar Jan 08 '23 10:01 AnatolyBuga

Yes, it should be used. It should explode an aggregation returning a series to the inner type.

ritchie46 avatar Feb 04 '23 15:02 ritchie46

  1. let mut ac = self.inputs[0].evaluate_on_groups(df, groups, state)?; in polars\polars-lazy\src\physical_plan\expressions\apply.rs: here the input is an apply with returns_scalar = true, and it is therefore AggState::AggregatedFlat.

As such, on this line this code gets executed:

if matches!(ac.agg_state(), AggState::AggregatedFlat(_)) {
    let msg = format!(
        "Cannot aggregate {:?}. The column is already aggregated.",
         self.expr
    );
    return Err(expression_err!(msg, self.expr, ComputeError));
}

It makes sense though, if the AggregationContext has already been aggregated, to just return it:

if matches!(ac.agg_state(), AggState::AggregatedFlat(_)) {
    return Ok(ac)
}

This doesn't seem to cause anything to break.

AnatolyBuga avatar Feb 10 '23 20:02 AnatolyBuga

@ritchie46

  1. Is harder. let len = iters[0].size_hint().0; returns 0 and therefore this code gets executed, which seems to be incorrect (correct me if I am wrong):

if len == 0 {
    let out = Series::full_null(field.name(), 0, &field.dtype);

    drop(iters);
    // take the first aggregation context that as that is the input series
    let mut ac = acs.swap_remove(0);
    ac.with_series(out, true);
    return Ok(ac);
}

This seems unexpected to me, given that iters[0].item is a non-empty UnstableSeries. But I might be wrong.

Would it make sense to implement size_hint for FlatIter, like you already have for LitIter? I.e.:

fn size_hint(&self) -> (usize, Option<usize>) {
    (self.len, Some(self.len))
}
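
For context on why size_hint().0 can be 0 for a non-empty iterator: the default Iterator::size_hint in the Rust standard library returns (0, None) unless the type overrides it. The sketch below shows that with two hypothetical iterators, NoHint and WithHint. They are stand-ins written for illustration only and are not Polars' FlatIter or LitIter.

```rust
/// Hypothetical iterator that yields `1.0` a fixed number of times and
/// relies on the default `size_hint`. Not a Polars type.
struct NoHint {
    remaining: usize,
}

impl Iterator for NoHint {
    type Item = f64;

    fn next(&mut self) -> Option<f64> {
        if self.remaining == 0 {
            None
        } else {
            self.remaining -= 1;
            Some(1.0)
        }
    }
    // No `size_hint` here: the trait's default returns `(0, None)`.
}

/// Same iterator, but reporting its exact remaining length.
struct WithHint {
    remaining: usize,
}

impl Iterator for WithHint {
    type Item = f64;

    fn next(&mut self) -> Option<f64> {
        if self.remaining == 0 {
            None
        } else {
            self.remaining -= 1;
            Some(1.0)
        }
    }

    // Exact lower and upper bound, in the spirit of the override proposed above.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining, Some(self.remaining))
    }
}
```

Any caller that treats the lower bound as a length, as the len == 0 branch appears to, would see NoHint as empty even though it still yields items.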

AnatolyBuga avatar Feb 10 '23 21:02 AnatolyBuga

Are you sure the aggregated_flat is correct? I don't really understand how that return value is valid. I am doubting if we should have apply_multiple as public as you can easily set it to an invalid state.

Are you able to reproduce the issue in python?

ritchie46 avatar Feb 11 '23 19:02 ritchie46

Thanks @ritchie46 ,

Are you sure the aggregated_flat is correct? I don't really understand how that return value is valid. I am doubting if we should have apply_multiple as public as you can easily set it to an invalid state.

apply_multiple is absolutely great, please don't make it private :D I agree that output_type and returns_scalar need to be set correctly to make it work, but it's just a matter of documenting it well, and for the caller to use it appropriately.

The reason apply_multiple with returns_scalar is great is that if your logic returns a single number (or a length-1 Series) for each group, it allows you to do that. (My real example would be boring, but as a bold example, imagine computing a matrix determinant, which is a single number, for each group independently.)
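
To make the "one number per group" idea concrete, here is a minimal plain-Rust sketch that does not use Polars at all. The function agg_scalar_per_group is a hypothetical helper invented for this illustration; it groups values by key in order of first appearance (similar in spirit to a stable group-by) and reduces each group to a single f64.

```rust
/// Group `values` by `keys`, keeping groups in order of first appearance,
/// then reduce every group to exactly one scalar with `f`.
/// Hypothetical helper for illustration; not part of Polars.
fn agg_scalar_per_group<'a, F>(
    keys: &[&'a str],
    values: &[&'a str],
    f: F,
) -> Vec<(&'a str, f64)>
where
    F: Fn(&[&str]) -> f64,
{
    let mut groups: Vec<(&'a str, Vec<&'a str>)> = Vec::new();
    for (k, v) in keys.iter().zip(values.iter()) {
        match groups.iter_mut().find(|(gk, _)| gk == k) {
            // Key seen before: append to the existing group.
            Some((_, vals)) => vals.push(*v),
            // First time we see this key: start a new group.
            None => groups.push((*k, vec![*v])),
        }
    }
    // One output row per group, which is the "flat" aggregated shape.
    groups
        .iter()
        .map(|(k, vals)| (*k, f(vals.as_slice())))
        .collect()
}
```

With the data from the reproducible example (keys "App", "App", "Gg", "App") the result has two rows, one per group, regardless of how many rows each group holds.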

This hopefully should give you an understanding of what I am trying to mimic in aggregated_flat. It really is there just to return an AggregatedFlat Expr .

So what I was trying to do is to make an AggregationContext where an input is of kind AggregatedFlat work.

Are you able to reproduce the issue in python?

In this case I am using pure Rust. I am sure it would be the same issue in Python (since it would end up executing the same Rust code anyway). But one of the Python tests (test_error_on_double_agg) checks that something like this should return an error:

lf.groupby("a").agg([pl.col("b").min().max()])

^ With my workaround, the second Expr (.max()) would pretty much be ignored.
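
The trade-off between the two behaviours can be sketched with a toy model. ToyState, max_strict and max_lenient below are hypothetical names made up for this illustration; they only mimic the shape of the check quoted earlier and are not the Polars AggState or its implementation.

```rust
/// Toy model of an aggregation state. NOT the Polars `AggState` enum.
#[derive(Debug, Clone, PartialEq)]
enum ToyState {
    /// One list of values per group, not yet reduced.
    NotAggregated(Vec<Vec<f64>>),
    /// One scalar per group, already reduced.
    AggregatedFlat(Vec<f64>),
}

/// Reduce every group to its maximum.
fn reduce_max(groups: &[Vec<f64>]) -> Vec<f64> {
    groups
        .iter()
        .map(|g| g.iter().copied().fold(f64::NEG_INFINITY, f64::max))
        .collect()
}

/// Strict policy: aggregating an already aggregated input is an error.
fn max_strict(state: ToyState) -> Result<ToyState, String> {
    match state {
        ToyState::AggregatedFlat(_) => {
            Err("Cannot aggregate. The column is already aggregated.".to_string())
        }
        ToyState::NotAggregated(groups) => {
            Ok(ToyState::AggregatedFlat(reduce_max(&groups)))
        }
    }
}

/// Lenient policy (the workaround): pass an aggregated input through unchanged.
fn max_lenient(state: ToyState) -> Result<ToyState, String> {
    match state {
        flat @ ToyState::AggregatedFlat(_) => Ok(flat),
        ToyState::NotAggregated(groups) => {
            Ok(ToyState::AggregatedFlat(reduce_max(&groups)))
        }
    }
}
```

Under the lenient policy a second aggregation silently becomes a no-op, which is why a test that expects an error on a double aggregation would no longer pass.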

AnatolyBuga avatar Feb 11 '23 19:02 AnatolyBuga

On mobile now. But shouldn't aggregated_flat not return a length equal to the groups?

Currently it returns a single valued series, where there are 2 groups. (It might be that it doesn't matter)

So if I understand correctly you want to first aggregate to an aggregated series (aggregated_flat) and then use max_exprs? What is the utility of that later max_exprs?

ritchie46 avatar Feb 11 '23 19:02 ritchie46

@ritchie46 , no worries, it's Saturday :)

On mobile now. But shouldn't aggregated_flat not return a length equal to the groups? Currently it returns a single valued series, where there are 2 groups. (It might be that it doesn't matter)

aggregated_flat with returns_scalar = false should return lengths equal to length of the groups, correct.

But my understanding is that when returns_scalar = true, it expects a Series of length 1 (i.e. an aggregated series). Right? That's because it is equal to the auto_explode field of AnonymousFunction (which then gets mapped to AggState::AggregatedFlat).

Or did I get all that wrong :/ ?

So if I understand correctly you want to first aggregate to an aggregated series (aggregated_flat) and then use max_exprs? What is the utility of that later max_exprs?

That max_exprs is just an example of an Expr which takes an input, and in this example, due to the above, the input happens to be already AggregatedFlat. Imagine, for instance, that I had max_exprs([aggregated_flat_1(), aggregated_flat_2()]), where aggregated_flat_1 would return 1 and the second one would return 2. (Again, bold examples, but just to illustrate.)

AnatolyBuga avatar Feb 11 '23 20:02 AnatolyBuga

Right. Thanks for the explanation. I have to check the code for this to verify. I'll come back to this. Have to think on it a bit. Have a good night! :)

ritchie46 avatar Feb 11 '23 20:02 ritchie46

Many thanks, much appreciated! :)

AnatolyBuga avatar Feb 11 '23 20:02 AnatolyBuga