[BUG]: Duplicate values in unary ops with streaming executor and multiple partitions
Describe the bug
When performing a unary op (possibly one that consists only of literals?) we get duplicate outputs.
Steps/Code to reproduce bug
```python
import polars as pl

ldf = pl.LazyFrame({"a": list(range(10))})
q = ldf.select(pl.lit(2).pow(pl.lit(-3, dtype=pl.Float32)))
q.collect(engine=pl.GPUEngine(executor="streaming", executor_options={"max_rows_per_partition": 5}))
```
That outputs:

```text
shape: (2, 1)
┌─────────┐
│ literal │
│ ---     │
│ f32     │
╞═════════╡
│ 0.125   │
│ 0.125   │
└─────────┘
```
Expected behavior
Just one row. The expression consists only of literals and doesn't reference any columns, so it should be evaluated exactly once rather than once per partition:
```text
In [4]: q.collect()
Out[4]:
shape: (1, 1)
┌─────────┐
│ literal │
│ ---     │
│ f32     │
╞═════════╡
│ 0.125   │
└─────────┘
```
This snippet is probably failing for the same reason:
```python
engine = pl.GPUEngine(executor="streaming", executor_options={"max_rows_per_partition": 2})
df = pl.LazyFrame({"a": ["1", "2", None, "4"]})
q = df.select(pl.col("a").cast(pl.Int8))
q.collect(engine=engine)
```
```text
---------------------------------------------------------------------------
ComputeError                              Traceback (most recent call last)
Cell In[21], line 5
      3 df = pl.LazyFrame({"a": ["1", "2", None, "4"]})
      4 q = df.select(pl.col("a").cast(pl.Int8))
----> 5 q.collect(engine=engine)

File /raid/toaugspurger/envs/rapidsai/cudf/25.08/lib/python3.12/site-packages/polars/_utils/deprecation.py:93, in deprecate_streaming_parameter.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
     89     kwargs["engine"] = "in-memory"
     91     del kwargs["streaming"]
---> 93 return function(*args, **kwargs)

File /raid/toaugspurger/envs/rapidsai/cudf/25.08/lib/python3.12/site-packages/polars/lazyframe/frame.py:2207, in LazyFrame.collect(self, type_coercion, _type_check, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, cluster_with_columns, collapse_joins, no_optimization, engine, background, _check_order, _eager, **_kwargs)
   2205 # Only for testing purposes
   2206 callback = _kwargs.get("post_opt_callback", callback)
-> 2207 return wrap_df(ldf.collect(engine, callback))

ComputeError: InvalidOperationError: Conversion from `str` failed.
```
That works fine if there's a null in the first partition.
This was caught by failures in `python/cudf_polars/tests/expressions/test_numeric_unaryops.py` with a modified `max_rows_per_partition`. We should ensure that tests creating their own dataframes have enough rows to hit the multi-partition path. For example, this similar binop test didn't fail because it only had 3 rows:
```diff
diff --git a/python/cudf_polars/tests/expressions/test_numeric_binops.py b/python/cudf_polars/tests/expressions/test_numeric_binops.py
index fa1ec3c19e..0ce967a0a0 100644
--- a/python/cudf_polars/tests/expressions/test_numeric_binops.py
+++ b/python/cudf_polars/tests/expressions/test_numeric_binops.py
@@ -106,7 +106,7 @@ def test_numeric_binop(df, binop):
 @pytest.mark.parametrize("left_scalar", [False, True])
 @pytest.mark.parametrize("right_scalar", [False, True])
 def test_binop_with_scalar(left_scalar, right_scalar):
-    df = pl.LazyFrame({"a": [1, 2, 3], "b": [5, 6, 7]})
+    df = pl.LazyFrame({"a": [1, 2, 3] * 2, "b": [5, 6, 7] * 2})
     lop = pl.lit(2) if left_scalar else pl.col("a")
     rop = pl.lit(6) if right_scalar else pl.col("b")
```
I bet this is because we're running the select on each partition and then concatting rather than discarding all but the first partition.
> I bet this is because we're running the select on each partition and then concatting rather than discarding all but the first partition.
Yes, exactly. When the `Select.exprs` don't actually reference any columns, we need to replace the child IR with something that is single-partition.