
Binops between `dd.Series` and NumPy arrays fail while inferring meta

Open gjoseph92 opened this issue 4 years ago • 2 comments

Minimal Complete Verifiable Example:

import dask.dataframe as dd
import numpy as np

arr = np.arange(10)
series = dd.from_array(arr)
series + arr
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
~/dev/dask/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    175     try:
--> 176         yield
    177     except Exception as e:

~/dev/dask/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   5332         with raise_on_meta_error(funcname(op)):
-> 5333             meta = partial_by_order(*parts, function=op, other=other)
   5334 

~/dev/dask/dask/utils.py in partial_by_order(*args, **kwargs)
   1169         args2.insert(i, arg)
-> 1170     return function(*args2, **kwargs)
   1171 

~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/common.py in new_method(self, other)
     64 
---> 65         return method(self, other)
     66 

~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/__init__.py in wrapper(left, right)
    342         rvalues = extract_array(right, extract_numpy=True)
--> 343         result = arithmetic_op(lvalues, rvalues, op)
    344 

~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py in arithmetic_op(left, right, op)
    189         with np.errstate(all="ignore"):
--> 190             res_values = na_arithmetic_op(lvalues, rvalues, op)
    191 

~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py in na_arithmetic_op(left, right, op, is_cmp)
    142     try:
--> 143         result = expressions.evaluate(op, left, right)
    144     except TypeError:

~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py in evaluate(op, a, b, use_numexpr)
    232         if use_numexpr:
--> 233             return _evaluate(op, op_str, a, b)  # type: ignore
    234     return _evaluate_standard(op, op_str, a, b)

~/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py in _evaluate_standard(op, op_str, a, b)
     67     with np.errstate(all="ignore"):
---> 68         return op(a, b)
     69 

ValueError: operands could not be broadcast together with shapes (2,) (10,) 

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-7-cc129ab42671> in <module>
----> 1 series + arr

~/dev/dask/dask/dataframe/core.py in <lambda>(self, other)
   1561             return lambda self, other: elemwise(op, other, self)
   1562         else:
-> 1563             return lambda self, other: elemwise(op, self, other)
   1564 
   1565     def rolling(self, window, min_periods=None, center=False, win_type=None, axis=0):

~/dev/dask/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   5331         ]
   5332         with raise_on_meta_error(funcname(op)):
-> 5333             meta = partial_by_order(*parts, function=op, other=other)
   5334 
   5335     result = new_dd_object(graph, _name, meta, divisions)

~/miniconda3/envs/dask-dev/lib/python3.8/contextlib.py in __exit__(self, type, value, traceback)
    129                 value = type()
    130             try:
--> 131                 self.gen.throw(type, value, traceback)
    132             except StopIteration as exc:
    133                 # Suppress StopIteration *unless* it's the same exception that

~/dev/dask/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    195         )
    196         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 197         raise ValueError(msg) from e
    198 
    199 

ValueError: Metadata inference failed in `add`.

Original error is below:
------------------------
ValueError('operands could not be broadcast together with shapes (2,) (10,) ')

Traceback:
---------
  File "/Users/gabe/dev/dask/dask/dataframe/utils.py", line 176, in raise_on_meta_error
    yield
  File "/Users/gabe/dev/dask/dask/dataframe/core.py", line 5333, in elemwise
    meta = partial_by_order(*parts, function=op, other=other)
  File "/Users/gabe/dev/dask/dask/utils.py", line 1170, in partial_by_order
    return function(*args2, **kwargs)
  File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/common.py", line 65, in new_method
    return method(self, other)
  File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/__init__.py", line 343, in wrapper
    result = arithmetic_op(lvalues, rvalues, op)
  File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py", line 190, in arithmetic_op
    res_values = na_arithmetic_op(lvalues, rvalues, op)
  File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/ops/array_ops.py", line 143, in na_arithmetic_op
    result = expressions.evaluate(op, left, right)
  File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py", line 233, in evaluate
    return _evaluate(op, op_str, a, b)  # type: ignore
  File "/Users/gabe/miniconda3/envs/dask-dev/lib/python3.8/site-packages/pandas/core/computation/expressions.py", line 68, in _evaluate_standard
    return op(a, b)
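
The shapes in the error hint at the cause: during meta inference the operation appears to run on a small placeholder Series (length 2 here) rather than on real partitions, so a full-length NumPy array cannot broadcast against it. A minimal pandas-only sketch, assuming a two-element stand-in for that placeholder (the name `meta_stand_in` is made up for illustration and is not a dask identifier):

```python
import numpy as np
import pandas as pd

arr = np.arange(10)

# Hypothetical stand-in for the small placeholder Series used during meta inference.
meta_stand_in = pd.Series([1, 1], dtype="int64")

error = None
try:
    # Lengths 2 and 10 cannot be broadcast, so pandas/NumPy raise ValueError.
    meta_stand_in + arr
except ValueError as exc:
    error = exc
    print(type(exc).__name__, exc)
```

This reproduces only the inner failure; the outer "Metadata inference failed" message is dask wrapping it.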

Anything else we need to know?:

We have a `_maybe_from_pandas` coercion for the input arguments; perhaps there should also be an equivalent `_maybe_from_array`? FWIW, this works as expected:

>>> import dask.array as da
>>> darr = da.from_array(arr)
>>> series + darr
Dask Series Structure:
npartitions=1
0    int64
9      ...
dtype: int64
Dask Name: add, 3 tasks

So turning any arrays in the inputs into dask arrays might be sufficient.

Environment:

  • Dask version: 56d1891c15320872b815f1ab2f8e41867b231e5e
  • Python version: 3.8.8
  • Operating System: macOS
  • Install method (conda, pip, source): source

gjoseph92 avatar Jul 21 '21 23:07 gjoseph92

It looks to me like this would work, although I guess you might want to specify that it should only result in one partition (like the `from_pandas` call does).

diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 0ebdda9d..6f09bdae 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -5384,11 +5384,13 @@ def handle_out(out, result):
 
 
 def _maybe_from_pandas(dfs):
-    from .io import from_pandas
+    from .io import from_pandas, from_array
 
     dfs = [
         from_pandas(df, 1)
         if (is_series_like(df) or is_dataframe_like(df)) and not is_dask_collection(df)
+        else from_array(df)
+        if is_arraylike(df)
         else df
         for df in dfs
     ]

jsignell avatar Jul 22 '21 14:07 jsignell

Yes we had the same idea, see https://github.com/gjoseph92/dask/commit/b9a0d927541de969ebf6053fbbbcca65afe9cf28 :)

It does work when the DataFrame has 1 partition. However, see the note on that commit: because array <--> dataframe operations require both inputs to have the same chunk pattern, we'd need to know the number of (aligned) partitions of the other input DataFrames first, so that we could pass it into `from_array`.

gjoseph92 avatar Jul 22 '21 19:07 gjoseph92
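
To illustrate the chunk-pattern constraint from the last comment: each array chunk has to cover exactly the rows of the corresponding partition. A pure-NumPy sketch, where `split_like_partitions` is a hypothetical helper (not a dask function) and the partition lengths are assumed to be known up front, which is generally not the case for a lazy dask DataFrame:

```python
import numpy as np


def split_like_partitions(arr, partition_lengths):
    """Split `arr` into chunks whose lengths match the given partition lengths."""
    if sum(partition_lengths) != len(arr):
        raise ValueError("partition lengths must add up to the array length")
    # Cumulative lengths, minus the last one, are the split points.
    boundaries = np.cumsum(partition_lengths)[:-1]
    return np.split(arr, boundaries)


# Assumed example: a 10-row Series stored as partitions of 4, 3 and 3 rows.
chunks = split_like_partitions(np.arange(10), [4, 3, 3])
for chunk in chunks:
    print(chunk.tolist())
```

A single-chunk array, as in the one-partition case, would line up with only the first of these partitions, which is why the proposed coercion needs the partition count before it can convert the array.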