dask-sql
[BUG] rank() function doesn't work in dsql
What happened:
- I'm running a query in dask-sql but I'm encountering a KeyError.
What you expected to happen:
- I expected it to return the rank of each row in the output table as a column.
Minimal Complete Verifiable Example:
import cudf
from dask_sql import Context
dc = Context()
df = cudf.DataFrame({'Animal': ['cat', 'penguin', 'dog', 'spider'],
                     'Number_legs': [4, 2, 4, 8]})
dc.create_table('my_table', df)
dc.sql("select Animal, Number_legs, (rank() OVER (PARTITION BY Number_legs ORDER BY Animal)) as default_rank from my_table").compute()
Anything else we need to know?:
- Full Stack Trace:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _extract_operations(self, window, df, dc, context)
392 try:
--> 393 operation = self.OPERATION_MAPPING[operator_name]
394 except KeyError: # pragma: no cover
KeyError: 'rank'
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
/tmp/ipykernel_45366/1186692474.py in <module>
7 'Number_legs': [4, 2, 4, 8]})
8 dc.create_table('my_table', df)
----> 9 dc.sql("select Animal, Number_legs, (rank() OVER (PARTITION BY Number_legs ORDER BY Animal)) as default_rank from my_table").compute()
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
420 rel, select_names, _ = self._get_ral(sql)
421
--> 422 dc = RelConverter.convert(rel, context=self)
423
424 if dc is None:
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
54 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
55 )
---> 56 df = plugin_instance.convert(rel, context=context)
57 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
58 return df
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in convert(self, rel, context)
249
250 for window in rel.groups:
--> 251 dc = self._apply_window(
252 window, constants, constant_count_offset, dc, field_names, context
253 )
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _apply_window(self, window, constants, constant_count_offset, dc, field_names, context)
292 temporary_columns += group_columns
293
--> 294 operations, df = self._extract_operations(window, df, dc, context)
295 for _, _, cols in operations:
296 temporary_columns += cols
~/miniconda3/envs/rapids-21.12/lib/python3.8/site-packages/dask_sql/physical/rel/logical/window.py in _extract_operations(self, window, df, dc, context)
394 except KeyError: # pragma: no cover
395 try:
--> 396 operation = context.functions[operator_name]
397 except KeyError: # pragma: no cover
398 raise NotImplementedError(f"{operator_name} not (yet) implemented")
AttributeError: 'Context' object has no attribute 'functions'
Environment:
- dask-sql version: 0.3.10.dev30+ge811e54
- Python version: 3.8
- Operating System: Ubuntu 18.04
- Install method (conda, pip, source): conda
Thanks for raising the issue @DaceT! I believe there are a couple of things going on here:
- Dask-sql does not support window rank functions today. Here's a list of initial aggregations supported with window operations: https://github.com/dask-contrib/dask-sql/issues/43#issuecomment-840665868
- The error message is a bit confusing because it should ideally raise a NotImplementedError instead of a KeyError. The reason for that is a bug which needs to be fixed here: https://github.com/dask-contrib/dask-sql/blob/a0ea1d96dad6658cb2d6c0103d32a61db85278cc/dask_sql/physical/rel/logical/window.py#L394-L400
The check on line 398 should look at context.schema[schema_name].functions rather than context.functions, similar to how it's done in other parts of the codebase, like this one: https://github.com/dask-contrib/dask-sql/blob/a0ea1d96dad6658cb2d6c0103d32a61db85278cc/dask_sql/physical/rex/core/call.py#L787
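For reference, a minimal sketch of what the corrected lookup in _extract_operations could look like, assuming a schema_name for the current schema is available in scope (as it is in other parts of the codebase):

try:
    operation = self.OPERATION_MAPPING[operator_name]
except KeyError:  # pragma: no cover
    try:
        # look up user-defined functions on the current schema,
        # not on the Context object itself
        operation = context.schema[schema_name].functions[operator_name]
    except KeyError:  # pragma: no cover
        raise NotImplementedError(f"{operator_name} not (yet) implemented")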
Thanks @DaceT and @ayushdg, this is a really great catch.
The check on line 398 should look at context.schema[schema_name].functions rather than context.functions, similar to how it's done in other parts of the codebase, like this one:
I am responsible for this bug: I missed changing context.functions to context.schema[schema_name].functions for window functions while working on the multiple-schema PR https://github.com/dask-contrib/dask-sql/pull/205.
Let me fix this!
Hi everyone,
I have been playing with the ranking window function for the past few days and need some help/suggestions to improve the feature.
Background:
- Used the pandas DataFrame rank API, here
- The OverOperation classes used in dask-sql receive an expanding window as their argument, which doesn't support the rank function yet; expanding rank will only be supported from the upcoming pandas version 1.4.0 (see the sketch after this list).
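For reference, a minimal sketch of what the expanding path could look like once pandas 1.4.0 is available (Expanding.rank is new in 1.4.0; the series below is purely illustrative):

import pandas as pd  # requires pandas >= 1.4.0 for Expanding.rank

s = pd.Series([4, 2, 4, 8])
# running minimum-rank of each value among the rows seen so far
print(s.expanding().rank(method="min"))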
Approach/workaround taken:
There are two cases to consider for this problem:
- single column in order-by clause
- multiple columns in order-by clause
Single column in the order-by clause:
SELECT *, RANK() OVER (ORDER BY a ASC NULLS LAST) AS a4 FROM a
-- with partition:
SELECT *, RANK() OVER (PARTITION BY a ORDER BY a ASC NULLS LAST) AS a4 FROM a
In the above query, there is only one column a in the order-by clause. Implementing this case is straightforward because there is a one-to-one mapping between the SQL and pandas rank methods:
df.rank(method="min", na_option="bottom", ascending=True)
# the same applies for the partitioned rank
df.groupby("a").rank(method="min", na_option="bottom", ascending=True)
Multiple columns in the order-by clause:
SELECT *,
RANK() OVER (ORDER BY a NULLS FIRST, b NULLS FIRST, e) AS a3
FROM a
ORDER BY a NULLS FIRST, b NULLS FIRST, c NULLS FIRST, d NULLS FIRST, e
To handle multiple columns in the order-by clause, we change the columns to the string datatype and combine them into a single column of tuples (tuples of strings):
res = (
    df.obj[sort_cols]      # .obj gives the underlying frame of the grouped dataframe
    .astype(str)           # cast every sort column to string
    .apply(tuple, axis=1)  # combine the columns into one tuple per row
    .rank(method="min", na_option=na_option, ascending=sort_ascending)
    .astype(int)
)
So na_option and sort_ascending can only be specified for the combined column, not per column, and I am not sure how to handle per-column specifications like [a ASC NULLS LAST, b DESC NULLS FIRST]. I tried sorting the dataframe first (using the ascending and na_position parameters of pd.DataFrame.sort_values) and then applying the rank function, but the results don't seem to match the SQLite results (a rough sketch of that attempt is shown below).
Please let me know if you have any feedback or suggestions for handling the multiple-columns case 🤔. Is there something I am missing here, or a better approach?
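For context, here is a rough sketch of the sort-first attempt described above (the column names, data, and sort directions are illustrative, not code from the feature branch):

import pandas as pd

df = pd.DataFrame({"a": [1.0, 1.0, None, 2.0], "b": [9.0, None, 7.0, 7.0]})

# ORDER BY a ASC NULLS LAST, b DESC NULLS FIRST
sorted_df = df.sort_values(
    by=["a", "b"],
    ascending=[True, False],
    # sort_values accepts only a single na_position for all columns,
    # which is one reason mixed NULLS FIRST / NULLS LAST diverges from SQL
    na_position="last",
)
# the sorted row order encodes the ORDER BY, but deriving a min-rank from it
# still requires detecting ties across all of the sort columns at once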
The feature branch I am currently working on is here.
Apologies for the long post, and thanks in advance!
@rajagurunath is there any update on the implementation of rank/rownum support in dask-sql?