cudf icon indicating copy to clipboard operation
cudf copied to clipboard

[BUG]: `KeyError` in `group_by(expression).agg()` with experimental streaming executor, multiple partitions

Open TomAugspurger opened this issue 7 months ago • 2 comments

Describe the bug

The test `python/cudf_polars/tests/test_groupby.py::test_groupby[maintain_order-[(col("key1")) == (col("key2"))]-col("int32").sum()]` fails with a `KeyError` when using a small blocksize / multiple partitions. Here's a simplified example:

Steps/Code to reproduce bug

import polars as pl

from cudf_polars.dsl.translate import Translator
from cudf_polars.experimental.parallel import evaluate_streaming
from cudf_polars.utils.config import ConfigOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal


df = pl.LazyFrame(
    {
        "key1": [1, 1, 1, 2, 3, 1, 4, 6, 7],
        "key2": [2, 2, 2, 2, 6, 1, 4, 6, 8],
        "int32": pl.Series([1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=pl.Int32()),
   }
)


engine = pl.GPUEngine(executor="streaming", executor_options={"max_rows_per_partition": 3})
q = df.group_by(pl.col("key1") == pl.col("key2")).agg(pl.col("int32").sum())
ir = Translator(q._ldf.visit(), engine=engine).translate_ir()
evaluate_streaming(ir, ConfigOptions.from_polars_engine(engine)).to_polars()

This errors with:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[9], line 4
      2 q = df.group_by(pl.col("key1") == pl.col("key2")).agg(pl.col("int32").sum())
      3 ir = Translator(q._ldf.visit(), engine=engine).translate_ir()
----> 4 evaluate_streaming(ir, ConfigOptions.from_polars_engine(engine)).to_polars()

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/experimental/parallel.py:228, in evaluate_streaming(ir, config_options)
    224 ir, partition_info = lower_ir_graph(ir, config_options)
    226 graph, key = task_graph(ir, partition_info, config_options)
--> 228 return get_scheduler(config_options)(graph, key)

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/experimental/scheduler.py:149, in synchronous_scheduler(graph, key, cache)
    146 refcount = Counter(chain.from_iterable(dependencies.values()))
    148 for k in toposort(graph, dependencies):
--> 149     cache[k] = _execute_task(graph[k], cache)
    150     for dep in dependencies[k]:
    151         refcount[dep] -= 1

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/experimental/scheduler.py:51, in _execute_task(arg, cache)
     49 """Execute a compute task."""
     50 if istask(arg):
---> 51     return arg[0](*(_execute_task(a, cache) for a in arg[1:]))
     52 elif is_hashable(arg):
     53     return cache.get(arg, arg)

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/ir.py:1411, in GroupBy.do_evaluate(cls, keys_in, agg_requests, maintain_order, zlice, df)
   1400 @classmethod
   1401 @nvtx_annotate_cudf_polars(message="GroupBy")
   1402 def do_evaluate(
   (...)   1408     df: DataFrame,
   1409 ) -> DataFrame:
   1410     """Evaluate and return a dataframe."""
-> 1411     keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows)
   1412     sorted = (
   1413         plc.types.Sorted.YES
   1414         if all(k.is_sorted for k in keys)
   1415         else plc.types.Sorted.NO
   1416     )
   1417     grouper = plc.groupby.GroupBy(
   1418         plc.Table([k.obj for k in keys]),
   1419         null_handling=plc.types.NullPolicy.INCLUDE,
   (...)   1422         null_precedence=[k.null_order for k in keys],
   1423     )

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/ir.py:1411, in <genexpr>(.0)
   1400 @classmethod
   1401 @nvtx_annotate_cudf_polars(message="GroupBy")
   1402 def do_evaluate(
   (...)   1408     df: DataFrame,
   1409 ) -> DataFrame:
   1410     """Evaluate and return a dataframe."""
-> 1411     keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows)
   1412     sorted = (
   1413         plc.types.Sorted.YES
   1414         if all(k.is_sorted for k in keys)
   1415         else plc.types.Sorted.NO
   1416     )
   1417     grouper = plc.groupby.GroupBy(
   1418         plc.Table([k.obj for k in keys]),
   1419         null_handling=plc.types.NullPolicy.INCLUDE,
   (...)   1422         null_precedence=[k.null_order for k in keys],
   1423     )

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:200, in NamedExpr.evaluate(self, df, context)
    178 def evaluate(
    179     self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
    180 ) -> Column:
    181     """
    182     Evaluate this expression given a dataframe for context.
    183 
   (...)    198     name to a column produced from an expression.
    199     """
--> 200     return self.value.evaluate(df, context=context).rename(self.name)

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:110, in Expr.evaluate(self, df, context)
     80 def evaluate(
     81     self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
     82 ) -> Column:
     83     """
     84     Evaluate this expression given a dataframe for context.
     85 
   (...)    108         are not perfect.
    109     """
--> 110     return self.do_evaluate(df, context=context)

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py:89, in BinOp.do_evaluate(self, df, context)
     85 def do_evaluate(
     86     self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
     87 ) -> Column:
     88     """Evaluate this expression given a dataframe for context."""
---> 89     left, right = (child.evaluate(df, context=context) for child in self.children)
     90     lop = left.obj
     91     rop = right.obj

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py:89, in <genexpr>(.0)
     85 def do_evaluate(
     86     self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
     87 ) -> Column:
     88     """Evaluate this expression given a dataframe for context."""
---> 89     left, right = (child.evaluate(df, context=context) for child in self.children)
     90     lop = left.obj
     91     rop = right.obj

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:110, in Expr.evaluate(self, df, context)
     80 def evaluate(
     81     self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
     82 ) -> Column:
     83     """
     84     Evaluate this expression given a dataframe for context.
     85 
   (...)    108         are not perfect.
    109     """
--> 110     return self.do_evaluate(df, context=context)

File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:238, in Col.do_evaluate(self, df, context)
    235 """Evaluate this expression given a dataframe for context."""
    236 # Deliberately remove the name here so that we guarantee
    237 # evaluation of the IR produces names.
--> 238 return df.column_map[self.name].rename(None)

KeyError: 'key2'

Expected behavior

The query should run without error, and the result should match Polars.

TomAugspurger avatar Jun 12 '25 21:06 TomAugspurger

What columns are in df.column_map at that point?

wence- avatar Jun 13 '25 15:06 wence-

ipdb> pp df.column_map
{'int32': <cudf_polars.containers.column.Column object at 0x7f77fefe9df0>,
 'key1': <cudf_polars.containers.column.Column object at 0x7f77fefe9e20>}
ipdb>

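And here's the task graph being executed. The error occurs while executing `('groupby-2928048646827579712', 0)`, whose input is the `repartition` task that concatenates the outputs of the three `groupby-1302311846582023722` tasks.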

ipdb> pp graph
{('dataframescan--1543724835873278681', 0): (<bound method DataFrameScan.do_evaluate of <class 'cudf_polars.dsl.ir.DataFrameScan'>>,
                                             {'int32': <DataType(polars=Int32, plc=<type_id.INT32: 3>)>,
                                              'key1': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>,
                                              'key2': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
                                             shape: (3, 3)
┌──────┬──────┬───────┐
│ key1 ┆ key2 ┆ int32 │
│ ---  ┆ ---  ┆ ---   │
│ i64  ┆ i64  ┆ i32   │
╞══════╪══════╪═══════╡
│ 4    ┆ 4    ┆ 7     │
│ 6    ┆ 6    ┆ 8     │
│ 7    ┆ 8    ┆ 9     │
└──────┴──────┴───────┘,
                                             ('int32', 'key1', 'key2')),
 ('dataframescan--3749640043763656821', 0): (<bound method DataFrameScan.do_evaluate of <class 'cudf_polars.dsl.ir.DataFrameScan'>>,
                                             {'int32': <DataType(polars=Int32, plc=<type_id.INT32: 3>)>,
                                              'key1': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>,
                                              'key2': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
                                             shape: (3, 3)
┌──────┬──────┬───────┐
│ key1 ┆ key2 ┆ int32 │
│ ---  ┆ ---  ┆ ---   │
│ i64  ┆ i64  ┆ i32   │
╞══════╪══════╪═══════╡
│ 1    ┆ 2    ┆ 1     │
│ 1    ┆ 2    ┆ 2     │
│ 1    ┆ 2    ┆ 3     │
└──────┴──────┴───────┘,
                                             ('int32', 'key1', 'key2')),
 ('dataframescan-6146510607387301110', 0): (<bound method DataFrameScan.do_evaluate of <class 'cudf_polars.dsl.ir.DataFrameScan'>>,
                                            {'int32': <DataType(polars=Int32, plc=<type_id.INT32: 3>)>,
                                             'key1': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>,
                                             'key2': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
                                            shape: (3, 3)
┌──────┬──────┬───────┐
│ key1 ┆ key2 ┆ int32 │
│ ---  ┆ ---  ┆ ---   │
│ i64  ┆ i64  ┆ i32   │
╞══════╪══════╪═══════╡
│ 2    ┆ 2    ┆ 4     │
│ 3    ┆ 6    ┆ 5     │
│ 1    ┆ 1    ┆ 6     │
└──────┴──────┴───────┘,
                                            ('int32', 'key1', 'key2')),
 ('groupby-1302311846582023722', 0): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
                                      (NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
                                      (NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
                                      False,
                                      None,
                                      ('union-5195194595515093715', 0)),
 ('groupby-1302311846582023722', 1): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
                                      (NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
                                      (NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
                                      False,
                                      None,
                                      ('union-5195194595515093715', 1)),
 ('groupby-1302311846582023722', 2): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
                                      (NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
                                      (NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
                                      False,
                                      None,
                                      ('union-5195194595515093715', 2)),
 ('groupby-2928048646827579712', 0): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
                                      (NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
                                      (NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
                                      False,
                                      None,
                                      ('repartition-4112511186720287685', 0)),
 ('repartition-4112511186720287685', 0): (<function _concat at 0x7f77ff0bde40>,
                                          ('groupby-1302311846582023722', 0),
                                          ('groupby-1302311846582023722', 1),
                                          ('groupby-1302311846582023722', 2)),
 ('select--5712801464197171428', 0): (<bound method Select.do_evaluate of <class 'cudf_polars.dsl.ir.Select'>>,
                                      (NamedExpr(key1, Col(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, 'key1')),
                                       NamedExpr(int32, UnaryFunction(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'fill_null', (), Cast(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'int32')), Literal(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 0)))),
                                      True,
                                      ('select-6924220335052416367', 0)),
 ('select-6924220335052416367', 0): (<bound method Select.do_evaluate of <class 'cudf_polars.dsl.ir.Select'>>,
                                     (NamedExpr(key1, Col(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, 'key1')),
                                      NamedExpr(int32, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),
                                     False,
                                     ('groupby-2928048646827579712', 0)),
 ('union-5195194595515093715', 0): ('dataframescan--3749640043763656821', 0),
 ('union-5195194595515093715', 1): ('dataframescan-6146510607387301110', 0),
 ('union-5195194595515093715', 2): ('dataframescan--1543724835873278681', 0)}

TomAugspurger avatar Jun 13 '25 22:06 TomAugspurger