
[BUG]: Incorrect result `group_by().mean()` with experimental streaming executor, multiple partitions, and missing values

Open TomAugspurger opened this issue 7 months ago • 1 comments

Describe the bug

The test python/cudf_polars/tests/test_groupby.py::test_groupby[no_maintain_order-col("key1")-col("uint16_with_null").sum()-col("uint16_with_null").mean().alias("mean")] fails with a small blocksize, i.e. when the data is split across multiple partitions. The issue seems to be related to how we aggregate missing values.

Steps/Code to reproduce bug

import polars as pl
from cudf_polars.testing.asserts import assert_gpu_result_equal

df = pl.LazyFrame(
    {
        "key1": [1, 1, 1, 1],
        "uint16_with_null": pl.Series(
            [1, None, 2, None], dtype=pl.UInt16()
        ),
    }
)

q = df.group_by("key1").agg(pl.col("uint16_with_null").mean())
assert_gpu_result_equal(
    q,
    engine=pl.GPUEngine(
        executor="streaming",
        executor_options={"max_rows_per_partition": 3},
    ),
)

This fails with:

AssertionError: DataFrames are different (value mismatch for column 'uint16_with_null')
[left]:  [1.5]
[right]: [0.75]

Expected behavior

No error.

TomAugspurger avatar Jun 12 '25 21:06 TomAugspurger

I think the sum is correct, but we are dividing by the wrong count. Polars and single-partition cudf implement mean as sum(valid_values) / sum(n_valid_values), whereas in the multi-partition case we implement it as sum(valid_values) / sum(n_values), so nulls end up in the denominator. For the reproducer that gives 3 / 4 = 0.75 instead of 3 / 2 = 1.5.
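To make the arithmetic concrete, here is a minimal pure-Python sketch of a partitioned mean over the reproducer's column. It does not use cudf_polars at all; the helper names (`partition`, `partial_aggs`, `combine_mean`) are hypothetical and only model the idea of per-partition partial aggregates that are combined afterwards.

```python
# Illustrative model only: these helpers are hypothetical and are not part
# of cudf_polars. They mimic "partial aggregate per partition, then combine".

def partition(values, max_rows):
    """Split a column into chunks of at most max_rows rows."""
    return [values[i:i + max_rows] for i in range(0, len(values), max_rows)]


def partial_aggs(chunk):
    """Per-partition partial aggregates for a mean."""
    valid = [v for v in chunk if v is not None]
    return {
        "sum": sum(valid),       # sum over non-null values
        "n_valid": len(valid),   # number of non-null values
        "n_rows": len(chunk),    # number of rows, nulls included
    }


def combine_mean(partials, count_key):
    """Combine partials into a mean, using the chosen count as denominator."""
    total = sum(p["sum"] for p in partials)
    count = sum(p[count_key] for p in partials)
    return total / count


values = [1, None, 2, None]
# Same partition size as max_rows_per_partition in the reproducer.
partials = [partial_aggs(chunk) for chunk in partition(values, 3)]

mean_valid = combine_mean(partials, "n_valid")  # 3 / 2 = 1.5
mean_rows = combine_mean(partials, "n_rows")    # 3 / 4 = 0.75
print(mean_valid, mean_rows)
```

Dividing by the non-null count reproduces the expected 1.5, while dividing by the row count reproduces the 0.75 seen in the failure, which is consistent with the diagnosis above.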

Maybe like this:

diff --git a/python/cudf_polars/cudf_polars/experimental/groupby.py b/python/cudf_polars/cudf_polars/experimental/groupby.py
index d6ca06db2d..bf32b0befa 100644
--- a/python/cudf_polars/cudf_polars/experimental/groupby.py
+++ b/python/cudf_polars/cudf_polars/experimental/groupby.py
@@ -107,7 +107,7 @@ def decompose(
                     Agg(dtype, "sum", None, child),
                     names=names,
                 ),
-                decompose(f"{next(names)}__mean_count", Len(dtype), names=names),
+                decompose(f"{next(names)}__mean_count", Agg(plc.DataType(plc.TypeId.INT32), "count", False, child), names=names),
             )
             selection = NamedExpr(
                 name,

wence- avatar Jun 13 '25 16:06 wence-