cudf
cudf copied to clipboard
[BUG]: `KeyError` in `group_by(expression).agg()` with experimental streaming executor, multiple partitions
Describe the bug
The test python/cudf_polars/tests/test_groupby.py::test_groupby[maintain_order-[(col("key1")) == (col("key2"))]-col("int32").sum()]
fails with a `KeyError` when the input is split into multiple partitions (e.g. when using a small blocksize). Here is a simplified example:
Steps/Code to reproduce bug
import polars as pl
from cudf_polars.dsl.translate import Translator
from cudf_polars.experimental.parallel import evaluate_streaming
from cudf_polars.utils.config import ConfigOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal
df = pl.LazyFrame(
{
"key1": [1, 1, 1, 2, 3, 1, 4, 6, 7],
"key2": [2, 2, 2, 2, 6, 1, 4, 6, 8],
"int32": pl.Series([1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=pl.Int32()),
}
)
engine = pl.GPUEngine(executor="streaming", executor_options={"max_rows_per_partition": 3})
q = df.group_by(pl.col("key1") == pl.col("key2")).agg(pl.col("int32").sum())
ir = Translator(q._ldf.visit(), engine=engine).translate_ir()
evaluate_streaming(ir, ConfigOptions.from_polars_engine(engine)).to_polars()
This fails with the following error:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[9], line 4
2 q = df.group_by(pl.col("key1") == pl.col("key2")).agg(pl.col("int32").sum())
3 ir = Translator(q._ldf.visit(), engine=engine).translate_ir()
----> 4 evaluate_streaming(ir, ConfigOptions.from_polars_engine(engine)).to_polars()
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/experimental/parallel.py:228, in evaluate_streaming(ir, config_options)
224 ir, partition_info = lower_ir_graph(ir, config_options)
226 graph, key = task_graph(ir, partition_info, config_options)
--> 228 return get_scheduler(config_options)(graph, key)
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/experimental/scheduler.py:149, in synchronous_scheduler(graph, key, cache)
146 refcount = Counter(chain.from_iterable(dependencies.values()))
148 for k in toposort(graph, dependencies):
--> 149 cache[k] = _execute_task(graph[k], cache)
150 for dep in dependencies[k]:
151 refcount[dep] -= 1
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/experimental/scheduler.py:51, in _execute_task(arg, cache)
49 """Execute a compute task."""
50 if istask(arg):
---> 51 return arg[0](*(_execute_task(a, cache) for a in arg[1:]))
52 elif is_hashable(arg):
53 return cache.get(arg, arg)
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/ir.py:1411, in GroupBy.do_evaluate(cls, keys_in, agg_requests, maintain_order, zlice, df)
1400 @classmethod
1401 @nvtx_annotate_cudf_polars(message="GroupBy")
1402 def do_evaluate(
(...) 1408 df: DataFrame,
1409 ) -> DataFrame:
1410 """Evaluate and return a dataframe."""
-> 1411 keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows)
1412 sorted = (
1413 plc.types.Sorted.YES
1414 if all(k.is_sorted for k in keys)
1415 else plc.types.Sorted.NO
1416 )
1417 grouper = plc.groupby.GroupBy(
1418 plc.Table([k.obj for k in keys]),
1419 null_handling=plc.types.NullPolicy.INCLUDE,
(...) 1422 null_precedence=[k.null_order for k in keys],
1423 )
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/ir.py:1411, in <genexpr>(.0)
1400 @classmethod
1401 @nvtx_annotate_cudf_polars(message="GroupBy")
1402 def do_evaluate(
(...) 1408 df: DataFrame,
1409 ) -> DataFrame:
1410 """Evaluate and return a dataframe."""
-> 1411 keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows)
1412 sorted = (
1413 plc.types.Sorted.YES
1414 if all(k.is_sorted for k in keys)
1415 else plc.types.Sorted.NO
1416 )
1417 grouper = plc.groupby.GroupBy(
1418 plc.Table([k.obj for k in keys]),
1419 null_handling=plc.types.NullPolicy.INCLUDE,
(...) 1422 null_precedence=[k.null_order for k in keys],
1423 )
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:200, in NamedExpr.evaluate(self, df, context)
178 def evaluate(
179 self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
180 ) -> Column:
181 """
182 Evaluate this expression given a dataframe for context.
183
(...) 198 name to a column produced from an expression.
199 """
--> 200 return self.value.evaluate(df, context=context).rename(self.name)
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:110, in Expr.evaluate(self, df, context)
80 def evaluate(
81 self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
82 ) -> Column:
83 """
84 Evaluate this expression given a dataframe for context.
85
(...) 108 are not perfect.
109 """
--> 110 return self.do_evaluate(df, context=context)
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py:89, in BinOp.do_evaluate(self, df, context)
85 def do_evaluate(
86 self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
87 ) -> Column:
88 """Evaluate this expression given a dataframe for context."""
---> 89 left, right = (child.evaluate(df, context=context) for child in self.children)
90 lop = left.obj
91 rop = right.obj
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py:89, in <genexpr>(.0)
85 def do_evaluate(
86 self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
87 ) -> Column:
88 """Evaluate this expression given a dataframe for context."""
---> 89 left, right = (child.evaluate(df, context=context) for child in self.children)
90 lop = left.obj
91 rop = right.obj
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:110, in Expr.evaluate(self, df, context)
80 def evaluate(
81 self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
82 ) -> Column:
83 """
84 Evaluate this expression given a dataframe for context.
85
(...) 108 are not perfect.
109 """
--> 110 return self.do_evaluate(df, context=context)
File ~/gh/rapidsai/cudf/python/cudf_polars/cudf_polars/dsl/expressions/base.py:238, in Col.do_evaluate(self, df, context)
235 """Evaluate this expression given a dataframe for context."""
236 # Deliberately remove the name here so that we guarantee
237 # evaluation of the IR produces names.
--> 238 return df.column_map[self.name].rename(None)
KeyError: 'key2'
Expected behavior
The query should run without error, and the result should match Polars.
Which columns are in `df.column_map` at that point? Only `int32` and `key1`; `key2` is missing:
ipdb> pp df.column_map
{'int32': <cudf_polars.containers.column.Column object at 0x7f77fefe9df0>,
'key1': <cudf_polars.containers.column.Column object at 0x7f77fefe9e20>}
ipdb>
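And here's the task graph being executed (the error occurs while executing ('groupby-2928048646827579712', 0)). Note that this second-stage groupby takes the concatenated output of the per-partition groupbys as its input, but it still carries the original key expression col("key1") == col("key2"), even though that input only has the columns key1 and int32.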
ipdb> pp graph
{('dataframescan--1543724835873278681', 0): (<bound method DataFrameScan.do_evaluate of <class 'cudf_polars.dsl.ir.DataFrameScan'>>,
{'int32': <DataType(polars=Int32, plc=<type_id.INT32: 3>)>,
'key1': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>,
'key2': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
shape: (3, 3)
┌──────┬──────┬───────┐
│ key1 ┆ key2 ┆ int32 │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i32 │
╞══════╪══════╪═══════╡
│ 4 ┆ 4 ┆ 7 │
│ 6 ┆ 6 ┆ 8 │
│ 7 ┆ 8 ┆ 9 │
└──────┴──────┴───────┘,
('int32', 'key1', 'key2')),
('dataframescan--3749640043763656821', 0): (<bound method DataFrameScan.do_evaluate of <class 'cudf_polars.dsl.ir.DataFrameScan'>>,
{'int32': <DataType(polars=Int32, plc=<type_id.INT32: 3>)>,
'key1': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>,
'key2': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
shape: (3, 3)
┌──────┬──────┬───────┐
│ key1 ┆ key2 ┆ int32 │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i32 │
╞══════╪══════╪═══════╡
│ 1 ┆ 2 ┆ 1 │
│ 1 ┆ 2 ┆ 2 │
│ 1 ┆ 2 ┆ 3 │
└──────┴──────┴───────┘,
('int32', 'key1', 'key2')),
('dataframescan-6146510607387301110', 0): (<bound method DataFrameScan.do_evaluate of <class 'cudf_polars.dsl.ir.DataFrameScan'>>,
{'int32': <DataType(polars=Int32, plc=<type_id.INT32: 3>)>,
'key1': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>,
'key2': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
shape: (3, 3)
┌──────┬──────┬───────┐
│ key1 ┆ key2 ┆ int32 │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i32 │
╞══════╪══════╪═══════╡
│ 2 ┆ 2 ┆ 4 │
│ 3 ┆ 6 ┆ 5 │
│ 1 ┆ 1 ┆ 6 │
└──────┴──────┴───────┘,
('int32', 'key1', 'key2')),
('groupby-1302311846582023722', 0): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
(NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
(NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
False,
None,
('union-5195194595515093715', 0)),
('groupby-1302311846582023722', 1): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
(NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
(NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
False,
None,
('union-5195194595515093715', 1)),
('groupby-1302311846582023722', 2): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
(NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
(NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
False,
None,
('union-5195194595515093715', 2)),
('groupby-2928048646827579712', 0): (<bound method GroupBy.do_evaluate of <class 'cudf_polars.dsl.ir.GroupBy'>>,
(NamedExpr(key1, BinOp(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, <binary_operator.EQUAL: 21>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key1'), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'key2'))),),
(NamedExpr(int32, Agg(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'sum', None, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),),
False,
None,
('repartition-4112511186720287685', 0)),
('repartition-4112511186720287685', 0): (<function _concat at 0x7f77ff0bde40>,
('groupby-1302311846582023722', 0),
('groupby-1302311846582023722', 1),
('groupby-1302311846582023722', 2)),
('select--5712801464197171428', 0): (<bound method Select.do_evaluate of <class 'cudf_polars.dsl.ir.Select'>>,
(NamedExpr(key1, Col(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, 'key1')),
NamedExpr(int32, UnaryFunction(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'fill_null', (), Cast(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'int32')), Literal(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 0)))),
True,
('select-6924220335052416367', 0)),
('select-6924220335052416367', 0): (<bound method Select.do_evaluate of <class 'cudf_polars.dsl.ir.Select'>>,
(NamedExpr(key1, Col(<DataType(polars=Boolean, plc=<type_id.BOOL8: 11>)>, 'key1')),
NamedExpr(int32, Col(<DataType(polars=Int32, plc=<type_id.INT32: 3>)>, 'int32'))),
False,
('groupby-2928048646827579712', 0)),
('union-5195194595515093715', 0): ('dataframescan--3749640043763656821', 0),
('union-5195194595515093715', 1): ('dataframescan-6146510607387301110', 0),
('union-5195194595515093715', 2): ('dataframescan--1543724835873278681', 0)}