
Grouping by multiple columns with `dropna=False` gives unexpected results


What happened: I am attempting to do a groupby on multiple columns with `dropna=False`, and I find that the null group keys are still dropped:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {
        "a": [1, 2, 3, 4, None, None, 7, 8],
        "b": [1, None, 1, 3, None, 3, 1, 3],
        "c": [4, 5, 6, 3, 2, 1, 0, 0],
    }
)

ddf = dd.from_pandas(df, npartitions=2)

res = ddf.groupby(["a", "b"], dropna=False).sum()

res.compute()
         c
a   b     
1.0 1.0  4
3.0 1.0  6
4.0 3.0  3
7.0 1.0  0
8.0 3.0  0

When attempting this same operation with a series groupby, we get an error during computation:

res = ddf.groupby(["a", "b"], dropna=False).c.sum()

res.compute()
Traceback
TypeError                                 Traceback (most recent call last)
Input In [16], in <cell line: 16>()
     12 ddf = dd.from_pandas(df, npartitions=2)
     14 res = ddf.groupby(["a", "b"], dropna=False).c.sum()
---> 16 res.compute()

File ~/dask/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
    268 def compute(self, **kwargs):
    269     """Compute this dask collection
    270 
    271     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    290     dask.base.compute
    291     """
--> 292     (result,) = compute(self, traverse=False, **kwargs)
    293     return result

File ~/dask/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/dask/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
     78     elif isinstance(pool, multiprocessing.pool.Pool):
     79         pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
     82     pool.submit,
     83     pool._max_workers,
     84     dsk,
     85     result,
     86     cache=cache,
     87     get_id=_thread_get_id,
     88     pack_exception=pack_exception,
     89     **kwargs,
     90 )
     92 # Cleanup pools associated to dead threads
     93 with pools_lock:

File ~/dask/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    506         _execute_task(task, data)  # Re-execute locally
    507     else:
--> 508         raise_exception(exc, tb)
    509 res, worker_id = loads(res_info)
    510 state["cache"][key] = res

File ~/dask/dask/local.py:316, in reraise(exc, tb)
    314 if exc.__traceback__ is not tb:
    315     raise exc.with_traceback(tb)
--> 316 raise exc

File ~/dask/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219 try:
    220     task, data = loads(task_info)
--> 221     result = _execute_task(task, data)
    222     id = get_id()
    223     result = dumps((result, id))

File ~/dask/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/toolz/functoolz.py:630, in pipe(data, *funcs)
    610 """ Pipe a value through a sequence of functions
    611 
    612 I.e. ``pipe(data, f, g, h)`` is equivalent to ``h(g(f(data)))``
   (...)
    627     thread_last
    628 """
    629 for func in funcs:
--> 630     data = func(data)
    631 return data

File ~/dask/dask/dataframe/groupby.py:295, in _groupby_aggregate(df, aggfunc, levels, dropna, sort, observed, **kwargs)
    292 dropna = {"dropna": dropna} if dropna is not None else {}
    293 observed = {"observed": observed} if observed is not None else {}
--> 295 grouped = df.groupby(level=levels, sort=sort, **observed, **dropna)
    296 return aggfunc(grouped, **kwargs)

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/series.py:1884, in Series.groupby(self, by, axis, level, as_index, sort, group_keys, squeeze, observed, dropna)
   1880 axis = self._get_axis_number(axis)
   1882 # error: Argument "squeeze" to "SeriesGroupBy" has incompatible type
   1883 # "Union[bool, NoDefault]"; expected "bool"
-> 1884 return SeriesGroupBy(
   1885     obj=self,
   1886     keys=by,
   1887     axis=axis,
   1888     level=level,
   1889     as_index=as_index,
   1890     sort=sort,
   1891     group_keys=group_keys,
   1892     squeeze=squeeze,  # type: ignore[arg-type]
   1893     observed=observed,
   1894     dropna=dropna,
   1895 )

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/groupby/groupby.py:889, in GroupBy.__init__(self, obj, keys, axis, level, grouper, exclusions, selection, as_index, sort, group_keys, squeeze, observed, mutated, dropna)
    886 if grouper is None:
    887     from pandas.core.groupby.grouper import get_grouper
--> 889     grouper, exclusions, obj = get_grouper(
    890         obj,
    891         keys,
    892         axis=axis,
    893         level=level,
    894         sort=sort,
    895         observed=observed,
    896         mutated=self.mutated,
    897         dropna=self.dropna,
    898     )
    900 self.obj = obj
    901 self.axis = obj._get_axis_number(axis)

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/groupby/grouper.py:879, in get_grouper(obj, key, axis, level, sort, observed, mutated, validate, dropna)
    871         raise ValueError(
    872             f"Length of grouper ({len(gpr)}) and axis ({obj.shape[axis]}) "
    873             "must be same length"
    874         )
    876     # create the Grouping
    877     # allow us to passing the actual Grouping as the gpr
    878     ping = (
--> 879         Grouping(
    880             group_axis,
    881             gpr,
    882             obj=obj,
    883             level=level,
    884             sort=sort,
    885             observed=observed,
    886             in_axis=in_axis,
    887             dropna=dropna,
    888         )
    889         if not isinstance(gpr, Grouping)
    890         else gpr
    891     )
    893     groupings.append(ping)
    895 if len(groupings) == 0 and len(obj):

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/groupby/grouper.py:493, in Grouping.__init__(self, index, grouper, obj, level, sort, observed, in_axis, dropna)
    485     mapper = self.grouping_vector
    486     # In extant tests, the new self.grouping_vector matches
    487     #  `index.get_level_values(ilevel)` whenever
    488     #  mapper is None and isinstance(index, MultiIndex)
    489     (
    490         self.grouping_vector,  # Index
    491         self._codes,
    492         self._group_index,
--> 493     ) = index._get_grouper_for_level(mapper, ilevel)
    495 # a passed Grouper like, directly get the grouper in the same way
    496 # as single grouper groupby, use the group_info to get codes
    497 elif isinstance(self.grouping_vector, Grouper):
    498     # get the new grouper; we already have disambiguated
    499     # what key/level refer to exactly, don't need to
    500     # check again as we have by this point converted these
    501     # to an actual value (rather than a pd.Grouper)

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/indexes/multi.py:1483, in MultiIndex._get_grouper_for_level(self, mapper, level)
   1480 if mapper is not None:
   1481     # Handle group mapping function and return
   1482     level_values = self.levels[level].take(indexer)
-> 1483     grouper = level_values.map(mapper)
   1484     return grouper, None, None
   1486 codes, uniques = algos.factorize(indexer, sort=True)

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/indexes/base.py:5506, in Index.map(self, mapper, na_action)
   5486 """
   5487 Map values using input correspondence (a dict, Series, or function).
   5488 
   (...)
   5502     a MultiIndex will be returned.
   5503 """
   5504 from pandas.core.indexes.multi import MultiIndex
-> 5506 new_values = self._map_values(mapper, na_action=na_action)
   5508 attributes = self._get_attributes_dict()
   5510 # we can return a MultiIndex

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/base.py:870, in IndexOpsMixin._map_values(self, mapper, na_action)
    867         raise ValueError(msg)
    869 # mapper is a function
--> 870 new_values = map_f(values, mapper)
    872 return new_values

File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/_libs/lib.pyx:2859, in pandas._libs.lib.map_infer()

TypeError: 'numpy.ndarray' object is not callable

What you expected to happen: Pandas gives the following outputs:

df.groupby(["a", "b"], dropna=False).sum()
         c
a   b     
1.0 1.0  4
2.0 NaN  5
3.0 1.0  6
4.0 3.0  3
7.0 1.0  0
8.0 3.0  0
NaN 3.0  1
    NaN  2
df.groupby(["a", "b"], dropna=False).c.sum()
a    b  
1.0  1.0    4
2.0  NaN    5
3.0  1.0    6
4.0  3.0    3
7.0  1.0    0
8.0  3.0    0
NaN  3.0    1
     NaN    2
Name: c, dtype: int64

Environment:

  • Dask version: latest main
  • Python version: 3.9
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): source

charlesbluca · Mar 16 '22

Hmm this is an interesting one. Thanks for writing it up. It looks like the issue is in the aggregate method. I can't quite figure out what's going wrong though.

I thought I'd be able to reproduce with this in pandas (which is what the aggregate method is supposed to do), but no luck

inner = df.groupby(["a", "b"], dropna=False).sum()
inner.groupby(["a", "b"], sort=False, level=[0, 1], dropna=False).sum()

jsignell · Mar 22 '22

Looks like the thing that's breaking the aggregation here is the concatenation of the chunks that precedes it. Doing the following in pandas gets me the same result as Dask:

import pandas as pd

df1 = pd.DataFrame(
    {
        "a": [1, 2, 3, 4],
        "b": [1, None, 1, 3],
        "c": [4, 5, 6, 3],
    }
)

df2 = pd.DataFrame(
    {
        "a": [None, None, 7, 8],
        "b": [None, 3, 1, 3],
        "c": [2, 1, 0, 0],
    }
)

res1 = df1.groupby(["a", "b"], dropna=False).sum()
res2 = df2.groupby(["a", "b"], dropna=False).sum()

res = pd.concat([res1, res2]).groupby(["a", "b"], dropna=False).sum()
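
(For context: Dask computes a groupby aggregation in a split-apply-combine fashion, applying the aggregation within each partition and then re-aggregating the concatenated partial results. The sketch below is a simplified illustration of that pattern, not Dask's actual implementation, and the helper name is made up; on pandas < 1.5 it reproduces the same NaN-dropping result shown above.)

import pandas as pd

def naive_partitioned_groupby_sum(partitions, by):
    # "Chunk" step: group and sum within each partition.
    partials = [part.groupby(by, dropna=False).sum() for part in partitions]
    # "Aggregate" step: concatenate the partial results and group again
    # on the same keys (now index levels) to combine them.
    combined = pd.concat(partials)
    return combined.groupby(by, dropna=False).sum()

# Using the df1/df2 partitions defined above; with pandas < 1.5 the second
# groupby silently drops the NaN group keys.
print(naive_partitioned_groupby_sum([df1, df2], ["a", "b"]))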

charlesbluca · Mar 28 '22

Opened an issue tracking this - https://github.com/pandas-dev/pandas/issues/46783

charlesbluca · Apr 15 '22

A fix for this was submitted and merged in https://github.com/pandas-dev/pandas/pull/47186 and should be included in the 1.5 release; I will go ahead and verify that things are working here with the latest pandas source.

Assuming things work now, how would we like to handle the required Pandas dependency? Is it okay to bump the Pandas min version for dask.dataframe to 1.5, or do we want to leave that more flexible and have a check on Pandas version for this specific groupby functionality?

charlesbluca · Jun 14 '22

Can verify that https://github.com/pandas-dev/pandas/pull/47186 has resolved the issue for dataframe groupbys - I am still getting the same traceback for series groupbys though.

charlesbluca · Jun 14 '22

Think I've isolated the series groupby failures to pandas; as part of `_groupby_aggregate`, we are attempting to do a groupby on a series with a MultiIndex that has nulls in the first level, which seems to fail regardless of `dropna`:

import pandas as pd

arr = [list(range(4)) * 4, list(range(2)) * 8]

# insert nulls into first level of multi-index
arr[0][2] = None

ser = pd.Series(
    list(range(16)),
    index=pd.MultiIndex.from_arrays(arr, names=("a", "b"))
)

ser.groupby(level=[0, 1])

Will open up another issue to track this in Pandas.

EDIT:

Opened https://github.com/pandas-dev/pandas/issues/47348 to track this bug

charlesbluca · Jun 14 '22

Thanks for keeping on top of this Charles!

jsignell · Jun 20 '22

With the above pandas issues resolved, I can confirm these issues no longer occur with pandas 1.5.0rc0.

Knowing this, is there anything we want to change on Dask's end w.r.t. pandas constraints? I imagine we don't want to bump the pandas minimum dependency, but I'm not sure whether it makes sense to introduce some compatibility code here to block certain groupbys if an older pandas version is detected.

charlesbluca · Aug 30 '22

Yeah, we definitely don't want to bump the minimum. If it's easy to bring in some compatibility code, that seems like a good approach.

jsignell · Sep 06 '22
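
(For reference, a compatibility guard along these lines might look roughly like the sketch below. This is purely illustrative: the helper name `_warn_dropna_multi_groupby` and the exact condition are assumptions, not the change that was actually made in Dask.)

import warnings

import pandas as pd
from packaging.version import Version

# pandas 1.5.0 is the first release that handles NaN group keys correctly
# for multi-column groupby with dropna=False.
PANDAS_GE_150 = Version(pd.__version__) >= Version("1.5.0")

def _warn_dropna_multi_groupby(by, dropna):
    # Hypothetical guard: warn rather than silently drop NaN groups when
    # grouping on multiple columns with dropna=False on older pandas.
    if not PANDAS_GE_150 and dropna is False and isinstance(by, list) and len(by) > 1:
        warnings.warn(
            "groupby with dropna=False on multiple columns may silently drop "
            "NaN group keys with pandas<1.5.0; upgrade pandas for correct results."
        )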

Opened #9468 to add warnings around the groupby behavior in pandas < 1.5.0

charlesbluca · Sep 06 '22

Closing this since there do not seem to be any errors with pandas>=1.5. Feel free to re-open if I'm missing something.

rjzamora · May 06 '24