Add STDDEV, STDDEV_SAMP, and STDDEV_POP

Open · ChrisJar opened this pull request 3 years ago · 8 comments

Closes #608

Blocked by: https://github.com/rapidsai/cudf/issues/11515

Note: currently, performing multiple aggregations at once seems to produce incorrect values. For example, SELECT STDDEV(a) AS s1, STDDEV_POP(a) AS s2 FROM df returns the same result for both s1 and s2, but running two separate queries (one per aggregation) returns the correct results (#655)
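For reference, STDDEV/STDDEV_SAMP compute the sample standard deviation (ddof=1) while STDDEV_POP computes the population standard deviation (ddof=0), so the two aggregations above should differ on any non-constant column. A minimal pandas sketch with illustrative data:

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})  # illustrative data

# sample standard deviation -- what STDDEV / STDDEV_SAMP should return
print(df["a"].std(ddof=1))  # 1.5811...

# population standard deviation -- what STDDEV_POP should return
print(df["a"].std(ddof=0))  # 1.4142...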

ChrisJar avatar Jul 15 '22 15:07 ChrisJar

Codecov Report

No coverage uploaded for pull request base (datafusion-sql-planner@4b291ed). The diff coverage is n/a.

@@                    Coverage Diff                    @@
##             datafusion-sql-planner     #629   +/-   ##
=========================================================
  Coverage                          ?   67.25%           
=========================================================
  Files                             ?       73           
  Lines                             ?     3637           
  Branches                          ?      753           
=========================================================
  Hits                              ?     2446           
  Misses                            ?     1050           
  Partials                          ?      141           

codecov-commenter avatar Jul 15 '22 15:07 codecov-commenter

Thanks for opening this up @ChrisJar. Is this ready for initial reviews?

ayushdg avatar Jul 19 '22 00:07 ayushdg

I'm still looking into the relational conversion error (seems to be an issue with the column name), but yeah I think I'm ready for a first round of reviews.

ChrisJar avatar Jul 19 '22 15:07 ChrisJar

> I'm still looking into the relational conversion error (seems to be an issue with the column name), but yeah I think I'm ready for a first round of reviews.

Could you drop a link to that issue?

ayushdg avatar Jul 22 '22 17:07 ayushdg

I haven't been able to create a minimal reproducer, but here's a traceback from using this PR to run:

import pandas as pd
from dask_sql import Context

c = Context()  # omitted in the original snippet; c must be defined before create_table

df = pd.DataFrame({"a": [1, 1, 2, 1, 2, 2], "b": [4, 5, 6, 4, 1, 5]})
c.create_table("df", df)

c.sql("SELECT STDDEV_POP(b) FROM df GROUP BY a").compute()
Traceback

  ---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/core/indexes/base.py:3621, in Index.get_loc(self, key, method, tolerance)
   3620 try:
-> 3621     return self._engine.get_loc(casted_key)
   3622 except KeyError as err:

File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/_libs/index.pyx:136, in pandas._libs.index.IndexEngine.get_loc()

File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/_libs/index.pyx:163, in pandas._libs.index.IndexEngine.get_loc()

File pandas/_libs/hashtable_class_helper.pxi:5198, in pandas._libs.hashtable.PyObjectHashTable.get_item()

File pandas/_libs/hashtable_class_helper.pxi:5206, in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 'df.a'

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
Input In [21], in <cell line: 1>()
----> 1 c.sql("SELECT STDDEV_POP(b) FROM df GROUP BY a").compute()

File ~/dask-sql/dask_sql/context.py:495, in Context.sql(self, sql, return_futures, dataframes, gpu, config_options)
    491     self.create_table(df_name, df, gpu=gpu)
    493 rel, select_fields, _ = self._get_ral(sql)
--> 495 dc = RelConverter.convert(rel, context=self)
    497 if rel.get_current_node_type() == "Explain":
    498     return dc

File ~/dask-sql/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/dask-sql/dask_sql/physical/rel/logical/project.py:28, in DaskProjectPlugin.convert(self, rel, context)
     26 def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
     27     # Get the input of the previous step
---> 28     (dc,) = self.assert_inputs(rel, 1, context)
     30     df = dc.df
     31     cc = dc.column_container

File ~/dask-sql/dask_sql/physical/rel/base.py:84, in BaseRelPlugin.assert_inputs(rel, n, context)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/dask-sql/dask_sql/physical/rel/base.py:84, in <listcomp>(.0)
     81 # Late import to remove cycling dependency
     82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]

File ~/dask-sql/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
     55     raise NotImplementedError(
     56         f"No relational conversion for node type {node_type} available (yet)."
     57     )
     58 logger.debug(
     59     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     60 )
---> 61 df = plugin_instance.convert(rel, context=context)
     62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     63 return df

File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:195, in DaskAggregatePlugin.convert(self, rel, context)
    192     logger.debug("Performing full-table aggregation")
    194 # Do all aggregates
--> 195 df_result, output_column_order = self._do_aggregations(
    196     rel,
    197     dc,
    198     group_columns,
    199     context,
    200 )
    202 # SQL does not care about the index, but we do not want to have any multiindices
    203 df_agg = df_result.reset_index(drop=True)

File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:276, in DaskAggregatePlugin._do_aggregations(self, rel, dc, group_columns, context)
    274 if key in collected_aggregations:
    275     aggregations = collected_aggregations.pop(key)
--> 276     df_result = self._perform_aggregation(
    277         DataContainer(df, cc),
    278         None,
    279         aggregations,
    280         additional_column_name,
    281         group_columns,
    282         groupby_agg_options,
    283     )
    285 # Now we can also the the rest
    286 for filter_column, aggregations in collected_aggregations.items():

File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:461, in DaskAggregatePlugin._perform_aggregation(self, dc, filter_column, aggregations, additional_column_name, group_columns, groupby_agg_options)
    457     grouped_df = tmp_df.groupby(
    458         by=(group_columns or [additional_column_name]), dropna=False
    459     )
    460 else:
--> 461     group_columns = [tmp_df[group_column] for group_column in group_columns]
    462     group_columns_and_nulls = get_groupby_with_nulls_cols(
    463         tmp_df, group_columns, additional_column_name
    464     )
    465     grouped_df = tmp_df.groupby(by=group_columns_and_nulls)

File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:461, in <listcomp>(.0)
    457     grouped_df = tmp_df.groupby(
    458         by=(group_columns or [additional_column_name]), dropna=False
    459     )
    460 else:
--> 461     group_columns = [tmp_df[group_column] for group_column in group_columns]
    462     group_columns_and_nulls = get_groupby_with_nulls_cols(
    463         tmp_df, group_columns, additional_column_name
    464     )
    465     grouped_df = tmp_df.groupby(by=group_columns_and_nulls)

File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/dask/dataframe/core.py:4441, in DataFrame.__getitem__(self, key)
   4438     return self.loc[key]
   4440 # error is raised from pandas
-> 4441 meta = self._meta[_extract_meta(key)]
   4442 dsk = partitionwise_graph(operator.getitem, name, self, key)
   4443 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])

File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/core/frame.py:3505, in DataFrame.__getitem__(self, key)
   3503 if self.columns.nlevels > 1:
   3504     return self._getitem_multilevel(key)
-> 3505 indexer = self.columns.get_loc(key)
   3506 if is_integer(indexer):
   3507     indexer = [indexer]

File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/core/indexes/base.py:3623, in Index.get_loc(self, key, method, tolerance)
   3621     return self._engine.get_loc(casted_key)
   3622 except KeyError as err:
-> 3623     raise KeyError(key) from err
   3624 except TypeError:
   3625     # If we have a listlike key, _check_indexing_error will raise
   3626     #  InvalidIndexError. Otherwise we fall through and re-raise
   3627     #  the TypeError.
   3628     self._check_indexing_error(key)

KeyError: 'df.a'

For some reason the column_name function in dask_planner is returning df.a as the column name instead of a.

ChrisJar avatar Jul 22 '22 18:07 ChrisJar

Digging into this more, it looks like column_name is expected to return a fully-qualified column name, from which we then extract the bare column name using get_backend_by_frontend_name. Currently we only do this on the fast groupby codepath, so aggregations that take the slow codepath end up failing; for example, the following code fails even without this PR:

import pandas as pd
from dask_sql import Context

c = Context()

df = pd.DataFrame({
    "a":[1, 2, 1, 2, 1, 2],
    "b":["1", "2", "3", "4", "5", "6"]  # object dtype forces a slow groupby
})

c.create_table("df", df)

c.sql("SELECT MAX(b) FROM df GROUP BY a")  # KeyError: 'df.a'

There are definitely tests using the slow groupby codepath that would've revealed this issue (such as test_agg_min_max), but it looks like those are failing before we even reach the Python aggregation code. Adding tests to this PR should give us that coverage, so feel free to include this patch here, which should resolve your reproducer:

diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py
index 333fbea..54a9597 100644
--- a/dask_sql/physical/rel/logical/aggregate.py
+++ b/dask_sql/physical/rel/logical/aggregate.py
@@ -447,13 +447,15 @@ class DaskAggregatePlugin(BaseRelPlugin):
         if additional_column_name is None:
             additional_column_name = new_temporary_column(dc.df)
 
+        # extract group column names
+        group_columns = [
+            dc.column_container.get_backend_by_frontend_name(group_name)
+            for group_name in group_columns
+        ]
+
         # perform groupby operation; if we are using custom aggregations, we must handle
         # null values manually (this is slow)
         if fast_groupby:
-            group_columns = [
-                dc.column_container.get_backend_by_frontend_name(group_name)
-                for group_name in group_columns
-            ]
             grouped_df = tmp_df.groupby(
                 by=(group_columns or [additional_column_name]), dropna=False
             )
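
For context on why this works: the column container maps SQL-facing, fully-qualified frontend names (like df.a) to the dataframe's actual backend column names (like a). A rough, hypothetical sketch of that mapping; this dict-based stand-in is for illustration only, not dask-sql's real ColumnContainer implementation:

# Hypothetical, simplified stand-in for dask_sql's column container,
# illustrating the frontend -> backend name lookup used in the patch above
class SimpleColumnContainer:
    def __init__(self, frontend_to_backend):
        # e.g. {"df.a": "a", "df.b": "b"}
        self._frontend_to_backend = frontend_to_backend

    def get_backend_by_frontend_name(self, frontend_name):
        # fall back to the given name if it is already a backend name
        return self._frontend_to_backend.get(frontend_name, frontend_name)

cc = SimpleColumnContainer({"df.a": "a", "df.b": "b"})
assert cc.get_backend_by_frontend_name("df.a") == "a"  # groupby can now find column "a"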

charlesbluca avatar Jul 27 '22 14:07 charlesbluca

@charlesbluca Thanks! This solved the issue

ChrisJar avatar Aug 02 '22 02:08 ChrisJar

re-run tests

jdye64 avatar Aug 08 '22 20:08 jdye64