Grouping by multiple columns with `dropna=False` gives unexpected results
What happened:
I am attempting to do a groupby on multiple columns with dropna=False, and I find that this still drops null values:
import dask.dataframe as dd
import pandas as pd
df = pd.DataFrame(
{
"a": [1, 2, 3, 4, None, None, 7, 8],
"b": [1, None, 1, 3, None, 3, 1, 3],
"c": [4, 5, 6, 3, 2, 1, 0, 0],
}
)
ddf = dd.from_pandas(df, npartitions=2)
res = ddf.groupby(["a", "b"], dropna=False).sum()
res.compute()
c
a b
1.0 1.0 4
3.0 1.0 6
4.0 3.0 3
7.0 1.0 0
8.0 3.0 0
When attempting this same operation with a series groupby, we get an error during computation:
res = ddf.groupby(["a", "b"], dropna=False).c.sum()
res.compute()
Traceback
TypeError Traceback (most recent call last)
Input In [16], in <cell line: 16>()
12 ddf = dd.from_pandas(df, npartitions=2)
14 res = ddf.groupby(["a", "b"], dropna=False).c.sum()
---> 16 res.compute()
File ~/dask/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
268 def compute(self, **kwargs):
269 """Compute this dask collection
270
271 This turns a lazy Dask collection into its in-memory equivalent.
(...)
290 dask.base.compute
291 """
--> 292 (result,) = compute(self, traverse=False, **kwargs)
293 return result
File ~/dask/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
572 keys.append(x.__dask_keys__())
573 postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/dask/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
78 elif isinstance(pool, multiprocessing.pool.Pool):
79 pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
82 pool.submit,
83 pool._max_workers,
84 dsk,
85 result,
86 cache=cache,
87 get_id=_thread_get_id,
88 pack_exception=pack_exception,
89 **kwargs,
90 )
92 # Cleanup pools associated to dead threads
93 with pools_lock:
File ~/dask/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
506 _execute_task(task, data) # Re-execute locally
507 else:
--> 508 raise_exception(exc, tb)
509 res, worker_id = loads(res_info)
510 state["cache"][key] = res
File ~/dask/dask/local.py:316, in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
File ~/dask/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
219 try:
220 task, data = loads(task_info)
--> 221 result = _execute_task(task, data)
222 id = get_id()
223 result = dumps((result, id))
File ~/dask/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/toolz/functoolz.py:630, in pipe(data, *funcs)
610 """ Pipe a value through a sequence of functions
611
612 I.e. ``pipe(data, f, g, h)`` is equivalent to ``h(g(f(data)))``
(...)
627 thread_last
628 """
629 for func in funcs:
--> 630 data = func(data)
631 return data
File ~/dask/dask/dataframe/groupby.py:295, in _groupby_aggregate(df, aggfunc, levels, dropna, sort, observed, **kwargs)
292 dropna = {"dropna": dropna} if dropna is not None else {}
293 observed = {"observed": observed} if observed is not None else {}
--> 295 grouped = df.groupby(level=levels, sort=sort, **observed, **dropna)
296 return aggfunc(grouped, **kwargs)
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/series.py:1884, in Series.groupby(self, by, axis, level, as_index, sort, group_keys, squeeze, observed, dropna)
1880 axis = self._get_axis_number(axis)
1882 # error: Argument "squeeze" to "SeriesGroupBy" has incompatible type
1883 # "Union[bool, NoDefault]"; expected "bool"
-> 1884 return SeriesGroupBy(
1885 obj=self,
1886 keys=by,
1887 axis=axis,
1888 level=level,
1889 as_index=as_index,
1890 sort=sort,
1891 group_keys=group_keys,
1892 squeeze=squeeze, # type: ignore[arg-type]
1893 observed=observed,
1894 dropna=dropna,
1895 )
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/groupby/groupby.py:889, in GroupBy.__init__(self, obj, keys, axis, level, grouper, exclusions, selection, as_index, sort, group_keys, squeeze, observed, mutated, dropna)
886 if grouper is None:
887 from pandas.core.groupby.grouper import get_grouper
--> 889 grouper, exclusions, obj = get_grouper(
890 obj,
891 keys,
892 axis=axis,
893 level=level,
894 sort=sort,
895 observed=observed,
896 mutated=self.mutated,
897 dropna=self.dropna,
898 )
900 self.obj = obj
901 self.axis = obj._get_axis_number(axis)
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/groupby/grouper.py:879, in get_grouper(obj, key, axis, level, sort, observed, mutated, validate, dropna)
871 raise ValueError(
872 f"Length of grouper ({len(gpr)}) and axis ({obj.shape[axis]}) "
873 "must be same length"
874 )
876 # create the Grouping
877 # allow us to passing the actual Grouping as the gpr
878 ping = (
--> 879 Grouping(
880 group_axis,
881 gpr,
882 obj=obj,
883 level=level,
884 sort=sort,
885 observed=observed,
886 in_axis=in_axis,
887 dropna=dropna,
888 )
889 if not isinstance(gpr, Grouping)
890 else gpr
891 )
893 groupings.append(ping)
895 if len(groupings) == 0 and len(obj):
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/groupby/grouper.py:493, in Grouping.__init__(self, index, grouper, obj, level, sort, observed, in_axis, dropna)
485 mapper = self.grouping_vector
486 # In extant tests, the new self.grouping_vector matches
487 # `index.get_level_values(ilevel)` whenever
488 # mapper is None and isinstance(index, MultiIndex)
489 (
490 self.grouping_vector, # Index
491 self._codes,
492 self._group_index,
--> 493 ) = index._get_grouper_for_level(mapper, ilevel)
495 # a passed Grouper like, directly get the grouper in the same way
496 # as single grouper groupby, use the group_info to get codes
497 elif isinstance(self.grouping_vector, Grouper):
498 # get the new grouper; we already have disambiguated
499 # what key/level refer to exactly, don't need to
500 # check again as we have by this point converted these
501 # to an actual value (rather than a pd.Grouper)
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/indexes/multi.py:1483, in MultiIndex._get_grouper_for_level(self, mapper, level)
1480 if mapper is not None:
1481 # Handle group mapping function and return
1482 level_values = self.levels[level].take(indexer)
-> 1483 grouper = level_values.map(mapper)
1484 return grouper, None, None
1486 codes, uniques = algos.factorize(indexer, sort=True)
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/indexes/base.py:5506, in Index.map(self, mapper, na_action)
5486 """
5487 Map values using input correspondence (a dict, Series, or function).
5488
(...)
5502 a MultiIndex will be returned.
5503 """
5504 from pandas.core.indexes.multi import MultiIndex
-> 5506 new_values = self._map_values(mapper, na_action=na_action)
5508 attributes = self._get_attributes_dict()
5510 # we can return a MultiIndex
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/core/base.py:870, in IndexOpsMixin._map_values(self, mapper, na_action)
867 raise ValueError(msg)
869 # mapper is a function
--> 870 new_values = map_f(values, mapper)
872 return new_values
File ~/compose/etc/conda/cuda_11.5/envs/rapids/lib/python3.9/site-packages/pandas/_libs/lib.pyx:2859, in pandas._libs.lib.map_infer()
TypeError: 'numpy.ndarray' object is not callable
What you expected to happen: Pandas gives the following outputs:
df.groupby(["a", "b"], dropna=False).sum()
c
a b
1.0 1.0 4
2.0 NaN 5
3.0 1.0 6
4.0 3.0 3
7.0 1.0 0
8.0 3.0 0
NaN 3.0 1
NaN 2
df.groupby(["a", "b"], dropna=False).c.sum()
a b
1.0 1.0 4
2.0 NaN 5
3.0 1.0 6
4.0 3.0 3
7.0 1.0 0
8.0 3.0 0
NaN 3.0 1
NaN 2
Name: c, dtype: int64
Environment:
- Dask version: latest main
- Python version: 3.9
- Operating System: ubuntu 18.04
- Install method (conda, pip, source): source
Hmm this is an interesting one. Thanks for writing it up. It looks like the issue is in the aggregate method. I can't quite figure out what's going wrong though.
I thought I'd be able to reproduce with this in pandas (which is what the aggregate method is supposed to do), but no luck
inner = df.groupby(["a", "b"], dropna=False).sum()
inner.groupby(["a", "b"], sort=False, level=[0, 1], dropna=False).sum()
Looks like the thing that's breaking the aggregation here is the concatenation of the chunks preceding it? Doing the following in Pandas gets me the same result as Dask:
import pandas as pd
df1 = pd.DataFrame(
{
"a": [1, 2, 3, 4],
"b": [1, None, 1, 3],
"c": [4, 5, 6, 3],
}
)
df2 = pd.DataFrame(
{
"a": [None, None, 7, 8],
"b": [None, 3, 1, 3],
"c": [2, 1, 0, 0],
}
)
res1 = df1.groupby(["a", "b"], dropna=False).sum()
res2 = df2.groupby(["a", "b"], dropna=False).sum()
res = pd.concat([res1, res2]).groupby(["a", "b"], dropna=False).sum()
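For reference, printing res on a pre-1.5 pandas reportedly gives the same null-dropping result as the Dask output at the top of this issue (reconstructed here as comments from that output):
print(res)
#            c
# a   b
# 1.0 1.0    4
# 3.0 1.0    6
# 4.0 3.0    3
# 7.0 1.0    0
# 8.0 3.0    0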
Opened an issue tracking this - https://github.com/pandas-dev/pandas/issues/46783
A fix was submitted and merged for this in https://github.com/pandas-dev/pandas/pull/47186, and should be included in the 1.5 release - I will go ahead and verify that things are working here with the latest source of Pandas.
Assuming things work now, how would we like to handle the required Pandas dependency? Is it okay to bump the Pandas min version for dask.dataframe to 1.5, or do we want to leave that more flexible and have a check on Pandas version for this specific groupby functionality?
Can verify that https://github.com/pandas-dev/pandas/pull/47186 has resolved the issue for dataframe groupbys - I am still getting the same traceback for series groupbys though.
Think I've isolated the series groupby failures to Pandas; as part of _groupby_aggregate, we are attempting to do a groupby on a series with a multi-index that has nulls in the first level, which seems to fail regardless of dropna:
import pandas as pd
arr = [list(range(4)) * 4, list(range(2)) * 8]
# insert nulls into first level of multi-index
arr[0][2] = None
ser = pd.Series(
list(range(16)),
index=pd.MultiIndex.from_arrays(arr, names=("a", "b"))
)
ser.groupby(level=[0, 1])
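To make the "regardless of dropna" point concrete, both explicit settings appear to hit the same error (same ser as above):
ser.groupby(level=[0, 1], dropna=True)   # TypeError: 'numpy.ndarray' object is not callable
ser.groupby(level=[0, 1], dropna=False)  # same TypeError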
Will open up another issue to track this in Pandas.
EDIT:
Opened https://github.com/pandas-dev/pandas/issues/47348 to track this bug
Thanks for keeping on top of this Charles!
With the above pandas issues resolved, I can confirm these issues no longer occur with pandas 1.5.0rc0.
Knowing this, is there anything we want to change on Dask's end w.r.t. pandas constraints? I imagine we don't want to bump the pandas minimum dependency; I'm not sure if it makes sense to introduce some compatibility code here to block certain groupbys if an older pandas version is detected.
Yeah, we definitely don't want to bump the minimum. If it's easy to bring in some compatibility code, that seems like a good approach.
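A minimal sketch of what such compatibility code might look like, assuming a simple version check (the helper name and exact message are hypothetical, not necessarily what ends up in Dask):

import warnings

import pandas as pd
from packaging.version import Version

PANDAS_GE_150 = Version(pd.__version__) >= Version("1.5.0")


def _check_dropna_support(by, dropna):
    # Hypothetical guard: warn when a multi-column groupby with dropna=False
    # would hit the pandas < 1.5 bugs described in this issue.
    if dropna is False and isinstance(by, list) and len(by) > 1 and not PANDAS_GE_150:
        warnings.warn(
            "groupby with dropna=False on multiple columns needs pandas >= 1.5 "
            "to keep null group keys; older versions may silently drop them."
        )

A warning (rather than a hard error) would keep single-column and dropna=True workflows untouched on older pandas.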
Opened #9468 to add warnings around the groupby behavior in pandas < 1.5.0
Closing this since there do not seem to be any errors with pandas>=1.5. Feel free to re-open if I'm missing something.