Binops between `dd.Series` and NumPy arrays fail while inferring meta
Minimal Complete Verifiable Example:
```python
import dask.dataframe as dd
import numpy as np

arr = np.arange(10)
series = dd.from_array(arr)
series + arr
```
```pytb
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
~/dev/dask/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
175 try:
--> 176 yield
177 except Exception as e:
~/dev/dask/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
5332 with raise_on_meta_error(funcname(op)):
-> 5333 meta = partial_by_order(*parts, function=op, other=other)
5334
~/dev/dask/dask/utils.py in partial_by_order(*args, **kwargs)
1169 args2.insert(i, arg)
-> 1170 return function(*args2, **kwargs)
1171
~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/common.py in new_method(self, other)
64
---> 65 return method(self, other)
66
~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/__init__.py in wrapper(left, right)
342 rvalues = extract_array(right, extract_numpy=True)
--> 343 result = arithmetic_op(lvalues, rvalues, op)
344
~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py in arithmetic_op(left, right, op)
189 with np.errstate(all="ignore"):
--> 190 res_values = na_arithmetic_op(lvalues, rvalues, op)
191
~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py in na_arithmetic_op(left, right, op, is_cmp)
142 try:
--> 143 result = expressions.evaluate(op, left, right)
144 except TypeError:
~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py in evaluate(op, a, b, use_numexpr)
232 if use_numexpr:
--> 233 return _evaluate(op, op_str, a, b) # type: ignore
234 return _evaluate_standard(op, op_str, a, b)
~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py in _evaluate_standard(op, op_str, a, b)
67 with np.errstate(all="ignore"):
---> 68 return op(a, b)
69
ValueError: operands could not be broadcast together with shapes (2,) (10,)
The above exception was the direct cause of the following exception:
ValueError Traceback (most recent call last)
<ipython-input-7-cc129ab42671> in <module>
----> 1 series + arr
~/dev/dask/dask/dataframe/core.py in <lambda>(self, other)
1561 return lambda self, other: elemwise(op, other, self)
1562 else:
-> 1563 return lambda self, other: elemwise(op, self, other)
1564
1565 def rolling(self, window, min_periods=None, center=False, win_type=None, axis=0):
~/dev/dask/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
5331 ]
5332 with raise_on_meta_error(funcname(op)):
-> 5333 meta = partial_by_order(*parts, function=op, other=other)
5334
5335 result = new_dd_object(graph, _name, meta, divisions)
~/miniconda3/envs/dask-dev/lib/python3.8/contextlib.py in __exit__(self, type, value, traceback)
129 value = type()
130 try:
--> 131 self.gen.throw(type, value, traceback)
132 except StopIteration as exc:
133 # Suppress StopIteration *unless* it's the same exception that
~/dev/dask/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
195 )
196 msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 197 raise ValueError(msg) from e
198
199
ValueError: Metadata inference failed in `add`.
Original error is below:
------------------------
ValueError('operands could not be broadcast together with shapes (2,) (10,) ')
Traceback:
---------
File "/Users/gabe/dev/dask/dask/dataframe/utils.py", line 176, in raise_on_meta_error
yield
File "/Users/gabe/dev/dask/dask/dataframe/core.py", line 5333, in elemwise
meta = partial_by_order(*parts, function=op, other=other)
File "/Users/gabe/dev/dask/dask/utils.py", line 1170, in partial_by_order
return function(*args2, **kwargs)
File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/common.py", line 65, in new_method
return method(self, other)
File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/__init__.py", line 343, in wrapper
result = arithmetic_op(lvalues, rvalues, op)
File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py", line 190, in arithmetic_op
res_values = na_arithmetic_op(lvalues, rvalues, op)
File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py", line 143, in na_arithmetic_op
result = expressions.evaluate(op, left, right)
File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py", line 233, in evaluate
return _evaluate(op, op_str, a, b) # type: ignore
File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py", line 68, in _evaluate_standard
return op(a, b)
```
Anything else we need to know?:
We have a `_maybe_from_pandas` coercion for the input arguments; perhaps there should also be an equivalent `_maybe_from_array`? FWIW, this works as expected:
```pycon
>>> import dask.array as da
>>> darr = da.from_array(arr)
>>> series + darr
Dask Series Structure:
npartitions=1
0    int64
9      ...
dtype: int64
Dask Name: add, 3 tasks
```
So turning any arrays in the inputs into dask arrays might be sufficient.
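For concreteness, a minimal sketch of that coercion (the helper name `_maybe_from_array` and its placement are hypothetical, not dask's actual API):

```python
# Hypothetical sketch: coerce plain NumPy arrays among the elemwise
# arguments into dask arrays, mirroring what _maybe_from_pandas does
# for bare pandas objects.
import dask.array as da
import numpy as np

def _maybe_from_array(args):
    return [
        da.from_array(arg) if isinstance(arg, np.ndarray) else arg
        for arg in args
    ]
```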
Environment:
- Dask version: 56d1891c15320872b815f1ab2f8e41867b231e5e
- Python version: 3.8.8
- Operating System: macOS
- Install method (conda, pip, source): source
It looks to me like this would work, although I guess you'd want to specify that it should only result in one partition (like the `from_pandas` call does); there's a sketch of that after the diff.
```diff
diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 0ebdda9d..6f09bdae 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -5384,11 +5384,13 @@ def handle_out(out, result):
 def _maybe_from_pandas(dfs):
-    from .io import from_pandas
+    from .io import from_pandas, from_array
     dfs = [
         from_pandas(df, 1)
         if (is_series_like(df) or is_dataframe_like(df)) and not is_dask_collection(df)
+        else from_array(df)
+        if is_arraylike(df) and not is_dask_collection(df)
         else df
         for df in dfs
     ]
```
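For the single-partition point above, a minimal sketch (relying on `from_array`'s `chunksize` keyword) of how to force one partition:

```python
# A chunksize spanning the whole array yields a single partition,
# analogous to from_pandas(df, 1) in the diff above.
import dask.dataframe as dd
import numpy as np

arr = np.arange(10)
s = dd.from_array(arr, chunksize=len(arr))
assert s.npartitions == 1
```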
Yes, we had the same idea; see https://github.com/gjoseph92/dask/commit/b9a0d927541de969ebf6053fbbbcca65afe9cf28 :)
It does work when the DataFrame has 1 partition. However, see the note on that commit: because array <--> dataframe operations require both inputs to have the same chunk pattern, we'd need to know the number of (aligned) partitions of the other input DataFrames first, so that we could pass it into `from_array`.
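To illustrate the constraint (a sketch; it assumes that mirroring the series' partition sizes in the array's chunks is enough for the two to align):

```python
# Sketch of the alignment requirement: with more than one partition,
# the dask array's chunks must line up with the series' partitions.
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

arr = np.arange(10)
series = dd.from_pandas(pd.Series(arr), npartitions=2)

# Partition sizes of the series, e.g. (5, 5):
sizes = tuple(series.map_partitions(len).compute())

# An array chunked to match those sizes can participate in the binop,
# whereas the default single-chunk from_array(arr) would not line up
# with a two-partition series.
aligned = da.from_array(arr, chunks=(sizes,))
result = series + aligned
```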