ibis icon indicating copy to clipboard operation
ibis copied to clipboard

bug: pandas backend error on `case()` + grouped aggregation

Open timothydijamco opened this issue 3 years ago • 4 comments

case() + ungrouped aggregation

This works OK.

In [1]: import pandas as pd

In [2]: import ibis

In [3]: backend = ibis.backends.pandas.Backend()

In [4]: conn = backend.connect({})

In [5]: table = conn.from_dataframe(pd.DataFrame({
   ...:     'key': [1, 1, 2, 2],
   ...:     'value': [10, 30, 20, 40],
   ...: }), 't1')

In [6]: case_expr = (
   ...:     ibis.case().when(
   ...:         table['value'] < 25,
   ...:         table['value'],
   ...:     ).else_(ibis.null()).end()
   ...: )

In [7]: expr_ungrouped = table.aggregate(case_expr.max())

In [8]: expr_ungrouped.execute()
Out[8]: 
    max
0  20.0

case() + grouped aggregation

This hits an error on the Pandas backend.

In [7]: expr = table.groupby('key').aggregate(case_expr.max())

In [8]: expr.execute()
/home/vagrant/ibis-pip/lib/python3.9/site-packages/numpy/lib/function_base.py:793: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  choicelist = [np.asarray(choice) for choice in choicelist]
/home/vagrant/ibis-pip/lib/python3.9/site-packages/numpy/lib/stride_tricks.py:537: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  args = [np.array(_m, copy=False, subok=subok) for _m in args]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [8], in <cell line: 1>()
----> 1 expr.execute()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/expr/types.py:275, in Expr.execute(self, limit, timecontext, params, **kwargs)
    245 def execute(
    246     self,
    247     limit='default',
   (...)
    250     **kwargs,
    251 ):
    252     """
    253     If this expression is based on physical tables in a database backend,
    254     execute it against that backend.
   (...)
    273       Result of compiling expression and executing in backend
    274     """
--> 275     return self._find_backend().execute(
    276         self, limit=limit, timecontext=timecontext, params=params, **kwargs
    277     )

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/__init__.py:119, in Backend.execute(self, query, params, limit, **kwargs)
    113 if not isinstance(query, ir.Expr):
    114     raise TypeError(
    115         "`query` has type {!r}, expected ibis.expr.types.Expr".format(
    116             type(query).__name__
    117         )
    118     )
--> 119 return execute_and_reset(query, params=params, **kwargs)

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:487, in execute_and_reset(expr, params, scope, timecontext, aggcontext, **kwargs)
    441 def execute_and_reset(
    442     expr,
    443     params=None,
   (...)
    447     **kwargs,
    448 ):
    449     """Execute an expression against data that are bound to it. If no data
    450     are bound, raise an Exception.
    451 
   (...)
    485         * If no data are bound to the input expression
    486     """
--> 487     result = execute(
    488         expr,
    489         params=params,
    490         scope=scope,
    491         timecontext=timecontext,
    492         aggcontext=aggcontext,
    493         **kwargs,
    494     )
    495     if isinstance(result, pd.DataFrame):
    496         schema = expr.schema()

File ~/ibis-pip/lib/python3.9/site-packages/multipledispatch/dispatcher.py:278, in Dispatcher.__call__(self, *args, **kwargs)
    276     self._cache[types] = func
    277 try:
--> 278     return func(*args, **kwargs)
    280 except MDNotImplementedError:
    281     funcs = self.dispatch_iter(*types)

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:432, in main_execute(expr, params, scope, timecontext, aggcontext, **kwargs)
    430 params = {k.op() if hasattr(k, 'op') else k: v for k, v in params.items()}
    431 scope = scope.merge_scope(Scope(params, timecontext))
--> 432 return execute_with_scope(
    433     expr,
    434     scope,
    435     timecontext=timecontext,
    436     aggcontext=aggcontext,
    437     **kwargs,
    438 )

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:218, in execute_with_scope(expr, scope, timecontext, aggcontext, clients, **kwargs)
    209 pre_executed_scope = pre_execute(
    210     op,
    211     *clients,
   (...)
    215     **kwargs,
    216 )
    217 new_scope = scope.merge_scope(pre_executed_scope)
--> 218 result = execute_until_in_scope(
    219     expr,
    220     new_scope,
    221     timecontext=timecontext,
    222     aggcontext=aggcontext,
    223     clients=clients,
    224     # XXX: we *explicitly* pass in scope and not new_scope here so that
    225     # post_execute sees the scope of execute_with_scope, not the scope of
    226     # execute_until_in_scope
    227     post_execute_=functools.partial(
    228         post_execute,
    229         scope=scope,
    230         timecontext=timecontext,
    231         aggcontext=aggcontext,
    232         clients=clients,
    233         **kwargs,
    234     ),
    235     **kwargs,
    236 ).get_value(op, timecontext)
    237 return result

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:359, in execute_until_in_scope(expr, scope, timecontext, aggcontext, clients, post_execute_, **kwargs)
    352 # pass our computed arguments to this node's execute_node implementation
    353 data = [
    354     new_scope.get_value(arg.op(), timecontext)
    355     if hasattr(arg, 'op')
    356     else arg
    357     for (arg, timecontext) in zip(computable_args, arg_timecontexts)
    358 ]
--> 359 result = execute_node(
    360     op,
    361     *data,
    362     scope=scope,
    363     timecontext=timecontext,
    364     aggcontext=aggcontext,
    365     clients=clients,
    366     **kwargs,
    367 )
    368 computed = post_execute_(op, result, timecontext=timecontext)
    369 return Scope({op: computed}, timecontext)

File ~/ibis-pip/lib/python3.9/site-packages/multipledispatch/dispatcher.py:278, in Dispatcher.__call__(self, *args, **kwargs)
    276     self._cache[types] = func
    277 try:
--> 278     return func(*args, **kwargs)
    280 except MDNotImplementedError:
    281     funcs = self.dispatch_iter(*types)

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/execution/generic.py:438, in execute_aggregation_dataframe(op, data, scope, timecontext, **kwargs)
    434     source = data
    436 scope = scope.merge_scope(Scope({op.table.op(): source}, timecontext))
--> 438 pieces = [
    439     coerce_to_output(
    440         execute(metric, scope=scope, timecontext=timecontext, **kwargs),
    441         metric,
    442     )
    443     for metric in op.metrics
    444 ]
    446 result = pd.concat(pieces, axis=1)
    448 # If grouping, need a reset to get the grouping key back as a column

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/execution/generic.py:440, in <listcomp>(.0)
    434     source = data
    436 scope = scope.merge_scope(Scope({op.table.op(): source}, timecontext))
    438 pieces = [
    439     coerce_to_output(
--> 440         execute(metric, scope=scope, timecontext=timecontext, **kwargs),
    441         metric,
    442     )
    443     for metric in op.metrics
    444 ]
    446 result = pd.concat(pieces, axis=1)
    448 # If grouping, need a reset to get the grouping key back as a column

File ~/ibis-pip/lib/python3.9/site-packages/multipledispatch/dispatcher.py:278, in Dispatcher.__call__(self, *args, **kwargs)
    276     self._cache[types] = func
    277 try:
--> 278     return func(*args, **kwargs)
    280 except MDNotImplementedError:
    281     funcs = self.dispatch_iter(*types)

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:432, in main_execute(expr, params, scope, timecontext, aggcontext, **kwargs)
    430 params = {k.op() if hasattr(k, 'op') else k: v for k, v in params.items()}
    431 scope = scope.merge_scope(Scope(params, timecontext))
--> 432 return execute_with_scope(
    433     expr,
    434     scope,
    435     timecontext=timecontext,
    436     aggcontext=aggcontext,
    437     **kwargs,
    438 )

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:218, in execute_with_scope(expr, scope, timecontext, aggcontext, clients, **kwargs)
    209 pre_executed_scope = pre_execute(
    210     op,
    211     *clients,
   (...)
    215     **kwargs,
    216 )
    217 new_scope = scope.merge_scope(pre_executed_scope)
--> 218 result = execute_until_in_scope(
    219     expr,
    220     new_scope,
    221     timecontext=timecontext,
    222     aggcontext=aggcontext,
    223     clients=clients,
    224     # XXX: we *explicitly* pass in scope and not new_scope here so that
    225     # post_execute sees the scope of execute_with_scope, not the scope of
    226     # execute_until_in_scope
    227     post_execute_=functools.partial(
    228         post_execute,
    229         scope=scope,
    230         timecontext=timecontext,
    231         aggcontext=aggcontext,
    232         clients=clients,
    233         **kwargs,
    234     ),
    235     **kwargs,
    236 ).get_value(op, timecontext)
    237 return result

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:327, in execute_until_in_scope(expr, scope, timecontext, aggcontext, clients, post_execute_, **kwargs)
    321 if len(arg_timecontexts) != len(computable_args):
    322     raise com.IbisError(
    323         'arg_timecontexts differ with computable_arg in length '
    324         f'for type:\n{type(op).__name__}.'
    325     )
--> 327 scopes = [
    328     execute_until_in_scope(
    329         arg,
    330         new_scope,
    331         timecontext=timecontext,
    332         aggcontext=aggcontext,
    333         post_execute_=post_execute_,
    334         clients=clients,
    335         **kwargs,
    336     )
    337     if hasattr(arg, 'op')
    338     else Scope({arg: arg}, timecontext)
    339     for (arg, timecontext) in zip(computable_args, arg_timecontexts)
    340 ]
    342 # if we're unable to find data then raise an exception
    343 if not scopes and computable_args:

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:328, in <listcomp>(.0)
    321 if len(arg_timecontexts) != len(computable_args):
    322     raise com.IbisError(
    323         'arg_timecontexts differ with computable_arg in length '
    324         f'for type:\n{type(op).__name__}.'
    325     )
    327 scopes = [
--> 328     execute_until_in_scope(
    329         arg,
    330         new_scope,
    331         timecontext=timecontext,
    332         aggcontext=aggcontext,
    333         post_execute_=post_execute_,
    334         clients=clients,
    335         **kwargs,
    336     )
    337     if hasattr(arg, 'op')
    338     else Scope({arg: arg}, timecontext)
    339     for (arg, timecontext) in zip(computable_args, arg_timecontexts)
    340 ]
    342 # if we're unable to find data then raise an exception
    343 if not scopes and computable_args:

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/core.py:359, in execute_until_in_scope(expr, scope, timecontext, aggcontext, clients, post_execute_, **kwargs)
    352 # pass our computed arguments to this node's execute_node implementation
    353 data = [
    354     new_scope.get_value(arg.op(), timecontext)
    355     if hasattr(arg, 'op')
    356     else arg
    357     for (arg, timecontext) in zip(computable_args, arg_timecontexts)
    358 ]
--> 359 result = execute_node(
    360     op,
    361     *data,
    362     scope=scope,
    363     timecontext=timecontext,
    364     aggcontext=aggcontext,
    365     clients=clients,
    366     **kwargs,
    367 )
    368 computed = post_execute_(op, result, timecontext=timecontext)
    369 return Scope({op: computed}, timecontext)

File ~/ibis-pip/lib/python3.9/site-packages/multipledispatch/dispatcher.py:278, in Dispatcher.__call__(self, *args, **kwargs)
    276     self._cache[types] = func
    277 try:
--> 278     return func(*args, **kwargs)
    280 except MDNotImplementedError:
    281     funcs = self.dispatch_iter(*types)

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/trace.py:138, in trace.<locals>.traced_func(*args, **kwargs)
    135 trace_enabled = get_option(_TRACE_CONFIG)
    137 if not trace_enabled:
--> 138     return func(*args, **kwargs)
    139 else:
    140     start = datetime.now()

File ~/ibis-pip/lib/python3.9/site-packages/ibis/backends/pandas/execution/generic.py:1088, in execute_searched_case(op, whens, thens, otherwise, **kwargs)
   1086 if otherwise is None:
   1087     otherwise = np.nan
-> 1088 raw = np.select(whens, thens, otherwise)
   1089 return wrap_case_result(raw, op.to_expr())

File <__array_function__ internals>:180, in select(*args, **kwargs)

File ~/ibis-pip/lib/python3.9/site-packages/numpy/lib/function_base.py:820, in select(condlist, choicelist, default)
    818 for i, cond in enumerate(condlist):
    819     if cond.dtype.type is not np.bool_:
--> 820         raise TypeError(
    821             'invalid entry {} in condlist: should be boolean ndarray'.format(i))
    823 if choicelist[0].ndim == 0:
    824     # This may be common, so avoid the call.
    825     result_shape = condlist[0].shape

TypeError: invalid entry 0 in condlist: should be boolean ndarray

On this line, whens and thens are both lists containing SeriesGroupBys, which np.select does not know how to handle. I think we need to add a new branch to execute_searched_case to handle the grouped aggregation case. https://github.com/ibis-project/ibis/blob/1f2b3fae8f8259da64004e1efc3849c59939081a/ibis/backends/pandas/execution/generic.py#L1103-L1108

timothydijamco avatar Apr 05 '22 03:04 timothydijamco

@timothydijamco Thanks for the (as usual!) great report.

If you want to put up a PR to address the issue this week then we can get it into 3.0, otherwise we can get it into 3.1 or 3.0.1.

cpcloud avatar Apr 05 '22 11:04 cpcloud

@timothydijamco Are you planning to get to this before 3.1? We'd like to release in a week or two.

cpcloud avatar Jul 01 '22 16:07 cpcloud

I don't expect to get to it by then–thanks for checking!

timothydijamco avatar Jul 01 '22 17:07 timothydijamco

@timothydijamco Any chance you might be able to wrap this up for 3.2? If not, it'll go into 4.0!

cpcloud avatar Sep 09 '22 12:09 cpcloud

The issue seems to have been resolved in version 5.1.0, it no longer raises any exceptions.

Code:

import pandas as pd
import ibis

df = pd.DataFrame({'key': [1, 1, 2, 2], 'value': [10, 30, 20, 40], })
table = ibis.memtable(df, name="t1")

case_expr = (
    ibis.case().when(
        table['value'] < 25,
        table['value'],
    ).else_(ibis.null()).end()
)

expr_ungrouped = table.aggregate(case_expr.max())
result = expr_ungrouped.execute()
print(result)

expr = table.group_by('key').aggregate(case_expr.max())
result = expr.execute()
print(result)

Output

   Max(SearchedCase(None))
0                       20
   key  Max(SearchedCase(None))
0    1                       10
1    2                       20

mesejo avatar Apr 28 '23 11:04 mesejo

@mesejo It looks like you're running with the default backend (DuckDB) which won't exhibit this problem.

If you add

ibis.set_backend("pandas")

just after import ibis you'll see the failure.

cpcloud avatar Apr 30 '23 11:04 cpcloud