[BUG] rank() function doesn't work in dsql

Open DaceT opened this issue 4 years ago • 4 comments

What happened:

  • I'm running a query that uses rank() in dask-sql, and it fails with a KeyError followed by an AttributeError.

What you expected to happen:

  • I expected it to return the rank of each row in the output table as a column.

Minimal Complete Verifiable Example:

import cudf
from dask_sql import Context
dc = Context()

df = cudf.DataFrame({'Animal': ['cat', 'penguin', 'dog','spider'],
                        'Number_legs': [4, 2, 4, 8]})
dc.create_table('my_table', df)
dc.sql("select Animal, Number_legs, (rank() OVER (PARTITION BY Number_legs ORDER BY Animal)) as default_rank from my_table").compute()

Anything else we need to know?:

  • Full Stack Trace:
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _extract_operations(self, window, df, dc, context)
    392             try:
--> 393                 operation = self.OPERATION_MAPPING[operator_name]
    394             except KeyError:  # pragma: no cover

KeyError: 'rank'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_45366/1186692474.py in <module>
      7                         'Number_legs': [4, 2, 4, 8]})
      8 dc.create_table('my_table', df)
----> 9 dc.sql("select Animal, Number_legs, (rank() OVER (PARTITION BY Number_legs ORDER BY Animal)) as default_rank from my_table").compute()

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    420         rel, select_names, _ = self._get_ral(sql)
    421 
--> 422         dc = RelConverter.convert(rel, context=self)
    423 
    424         if dc is None:

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in convert(self, rel, context)
    249 
    250         for window in rel.groups:
--> 251             dc = self._apply_window(
    252                 window, constants, constant_count_offset, dc, field_names, context
    253             )

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _apply_window(self, window, constants, constant_count_offset, dc, field_names, context)
    292         temporary_columns += group_columns
    293 
--> 294         operations, df = self._extract_operations(window, df, dc, context)
    295         for _, _, cols in operations:
    296             temporary_columns += cols

~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _extract_operations(self, window, df, dc, context)
    394             except KeyError:  # pragma: no cover
    395                 try:
--> 396                     operation = context.functions[operator_name]
    397                 except KeyError:  # pragma: no cover
    398                     raise NotImplementedError(f"{operator_name} not (yet) implemented")

AttributeError: 'Context' object has no attribute 'functions'

Environment:

  • dask-sql version: 0.3.10.dev30+ge811e54
  • Python version: 3.8
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): conda

DaceT, Oct 20 '21 20:10

Thanks for raising the issue @DaceT! I believe there are a couple of things going on here:

  1. Dask-sql does not support window rank functions today. Here's a list of initial aggregations supported with window operations: https://github.com/dask-contrib/dask-sql/issues/43#issuecomment-840665868
  2. The error message is a bit confusing because it should ideally give a NotImplementedError instead of a KeyError. The reason for that is a bug which needs to be fixed here: https://github.com/dask-contrib/dask-sql/blob/a0ea1d96dad6658cb2d6c0103d32a61db85278cc/dask_sql/physical/rel/logical/window.py#L394-L400

The check on line 398 should look at context.schema[schema_name].functions rather than context.functions similar to how it's done in other parts of the codebase like this one: https://github.com/dask-contrib/dask-sql/blob/a0ea1d96dad6658cb2d6c0103d32a61db85278cc/dask_sql/physical/rex/core/call.py#L787
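The lookup order described above can be sketched roughly as follows. This is a minimal illustration, not the actual dask-sql code: `SchemaContainer`, `FakeContext`, `resolve_operation`, and the contents of `OPERATION_MAPPING` are hypothetical stand-ins, and only the attribute path `context.schema[schema_name].functions` is taken from the comment above.

```python
# Hypothetical stand-ins for dask-sql's Context and its per-schema container.
# Only the attribute names `schema` and `functions` come from the thread.
class SchemaContainer:
    def __init__(self):
        self.functions = {}


class FakeContext:
    def __init__(self):
        self.schema = {"root": SchemaContainer()}


# Stand-in for the window plugin's OPERATION_MAPPING (contents are illustrative).
OPERATION_MAPPING = {"sum": sum, "max": max}


def resolve_operation(operator_name, context, schema_name="root"):
    """Look up a window operation: built-ins first, then schema-level functions."""
    try:
        return OPERATION_MAPPING[operator_name]
    except KeyError:
        try:
            # Per-schema lookup instead of the non-existent `context.functions`.
            return context.schema[schema_name].functions[operator_name]
        except KeyError:
            raise NotImplementedError(f"{operator_name} not (yet) implemented")
```

With a lookup like this, an unsupported operator such as rank would surface as a NotImplementedError rather than an AttributeError.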

ayushdg, Oct 20 '21 20:10

Thanks, @DaceT and @ayushdg, this is a really great catch.

The check on line 398 should look at context.schema[schema_name].functions rather than context.functions similar to how it's done in other parts of the codebase like this one:

I am responsible for this bug: while working on the multiple-schema PR https://github.com/dask-contrib/dask-sql/pull/205, I missed updating the window functions from context.functions to context.schema[schema_name].functions.

Let me fix this!

rajagurunath, Oct 22 '21 18:10

Hi Everyone,

I have been playing with the ranking window function for the past few days and need some help/suggestions to improve the feature.

Background:

  1. Used the pandas DataFrame.rank API.
  2. The OverOperation classes used in dask-sql receive an expanding window as their argument, and expanding windows do not support rank yet; support is expected in the upcoming pandas 1.4.0.

Approach/workaround taken:

Consider there are two cases for this problem:

  • single column in order-by clause
  • multiple columns in order-by clause

Single column in order-by clause:

SELECT *, RANK() OVER (ORDER BY a ASC NULLS LAST) AS a4 FROM a
-- with partition:
SELECT *, RANK() OVER (PARTITION BY a ORDER BY a ASC NULLS LAST) AS a4 FROM a

In the above queries there is only one column, a, in the order-by clause. Implementing this case is straightforward because there is a one-to-one mapping between the SQL and pandas rank methods.

df.rank(method='min',na_option="bottom",ascending=True) 
# same applies for partition rank
df.groupby("a").rank(method='min',na_option="bottom",ascending=True) 
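To make the single-column mapping concrete, here is a small self-contained pandas check. The column names `g` and `a` and the data are made up for illustration; `g` plays the role of the PARTITION BY column and `a` the ORDER BY column (ASC NULLS LAST).

```python
import numpy as np
import pandas as pd

# Illustrative data: "g" is the partition column, "a" the order-by column.
df = pd.DataFrame(
    {
        "g": ["x", "x", "x", "y", "y"],
        "a": [3.0, 1.0, 3.0, np.nan, 2.0],
    }
)

# RANK() OVER (ORDER BY a ASC NULLS LAST)
global_rank = df["a"].rank(method="min", na_option="bottom", ascending=True)

# RANK() OVER (PARTITION BY g ORDER BY a ASC NULLS LAST)
partition_rank = df.groupby("g")["a"].rank(
    method="min", na_option="bottom", ascending=True
)

print(df.assign(global_rank=global_rank, partition_rank=partition_rank))
```

method="min" gives tied rows the same rank and leaves a gap afterwards, which matches SQL RANK() rather than DENSE_RANK().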

Multiple columns in order-by clause:

SELECT *, 
RANK() OVER (ORDER BY a NULLS FIRST, b  NULLS FIRST, e) AS a3
FROM a
ORDER BY a NULLS FIRST, b NULLS FIRST, c NULLS FIRST, d NULLS FIRST, e

To handle multiple columns in the order-by clause, we cast the columns to a string datatype and combine them into a single column of tuples (tuples of strings).

res = (
          df.obj[sort_cols]
          .astype(str)
          .apply(tuple, axis=1)
          .rank(method="min", na_option=na_option, ascending=sort_ascending)
          .astype(int)
        )
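One thing that may be worth checking with the string cast: numeric columns converted with astype(str) compare lexicographically, so their order can differ from the numeric order a SQL engine uses. Whether this explains a given mismatch is only a guess, but the effect itself is easy to reproduce in isolation (the values below are made up):

```python
import pandas as pd

s = pd.Series([9, 10, 2])

# Numeric ordering, as a SQL engine would order an integer column.
numeric_order = s.sort_values().tolist()

# Ordering after the string cast: "10" sorts before "2" and "9".
string_order = s.astype(str).sort_values().tolist()

print(numeric_order)
print(string_order)
```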

As a result, na_option and sort_ascending can only be specified once for the combined column, and I am not sure how to handle per-column settings such as [a ASC NULLS LAST, b DESC NULLS FIRST]. I tried sorting the data frame first (using the ascending and na_position parameters of pd.DataFrame.sort_values) and then applying the rank function, but the results do not seem to match SQLite.

Please let me know if you have any feedback or suggestions for handling the multiple-column case 🤔. Is there something I am missing here, or is there a better approach?

The feature branch I am currently working on is here
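One possible way to handle a different direction and null placement per column, sketched here in plain pandas under stated assumptions: encode each order-by column separately as a dense integer key (where direction and null placement are applied per column), sort on those keys, and give every row the 1-based position of the first row in its tie group. The function name `sql_rank` and the `(column, ascending, nulls_first)` tuple format are hypothetical, the index is assumed to be unique, partitions are not handled, and this has not been checked against SQLite or wired into dask-sql.

```python
import numpy as np
import pandas as pd


def sql_rank(df, order_by):
    """Emulate SQL RANK() OVER (ORDER BY ...) with per-column options.

    order_by: list of (column, ascending, nulls_first) tuples (hypothetical format).
    Assumes df has a unique index.
    """
    keys = pd.DataFrame(index=df.index)
    for i, (col, ascending, nulls_first) in enumerate(order_by):
        # Dense per-column key: a smaller key means "earlier in the SQL ordering".
        keys[f"k{i}"] = df[col].rank(
            method="dense",
            ascending=ascending,
            na_option="top" if nulls_first else "bottom",
        )

    ordered = keys.sort_values(list(keys.columns))
    # A row starts a new tie group when any key differs from the previous row.
    new_group = ordered.ne(ordered.shift()).any(axis=1).to_numpy()
    positions = np.arange(1, len(ordered) + 1)
    # RANK = position of the first row of the tie group, carried forward.
    ranks = np.maximum.accumulate(np.where(new_group, positions, 0))
    return pd.Series(ranks, index=ordered.index).reindex(df.index)


df = pd.DataFrame(
    {"a": [1.0, 1.0, 2.0, np.nan, 1.0], "b": [5, 7, 5, 5, 7]}
)
# ORDER BY a ASC NULLS LAST, b DESC NULLS FIRST
result = sql_rank(df, [("a", True, False), ("b", False, True)])
print(result.tolist())
```

Because every column is reduced to numeric keys before combining, no string cast is needed, and the original dtypes decide the ordering within each column.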

Apologies for the long post,

Thanks in advance

rajagurunath, Oct 26 '21 20:10

@rajagurunath is there any update on the implementation of rank/rownum support in dask-sql?

paarivarik-engineer, Jul 11 '22 10:07