dask-sql
Add STDDEV, STDDEV_SAMP, and STDDEV_POP
Closes #608
Blocked by: https://github.com/rapidsai/cudf/issues/11515
Note: currently, performing multiple aggregations at once seems to result in incorrect values. For example, SELECT STDDEV(a) AS s1, STDDEV_POP(a) AS s2 FROM df returns the same result for both s1 and s2, while running two separate queries (one per aggregation) returns the correct results (#655)
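A minimal sketch of the issue, assuming this branch is installed (the column values here are illustrative; since STDDEV/STDDEV_SAMP divide by n - 1 and STDDEV_POP divides by n, the two results should differ for any column with nonzero variance):
import pandas as pd
from dask_sql import Context

c = Context()
c.create_table("df", pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0]}))

# combined query: s1 and s2 come back identical, which is wrong
print(c.sql("SELECT STDDEV(a) AS s1, STDDEV_POP(a) AS s2 FROM df").compute())

# separate queries return the expected, distinct values
print(c.sql("SELECT STDDEV(a) AS s1 FROM df").compute())
print(c.sql("SELECT STDDEV_POP(a) AS s2 FROM df").compute())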
Codecov Report
:exclamation: No coverage uploaded for pull request base (datafusion-sql-planner@4b291ed). Click here to learn what that means. The diff coverage is n/a.
@@              Coverage Diff               @@
##    datafusion-sql-planner    #629   +/-  ##
=========================================================
  Coverage          ?   67.25%
=========================================================
  Files             ?       73
  Lines             ?     3637
  Branches          ?      753
=========================================================
  Hits              ?     2446
  Misses            ?     1050
  Partials          ?      141
Thanks for opening this up @ChrisJar. Is this ready for initial reviews?
I'm still looking into the relational conversion error (seems to be an issue with the column name), but yeah I think I'm ready for a first round of reviews.
Could you drop a link to that issue?
I haven't been able to create a minimal reproducer, but here's a traceback from using this PR to run:
import pandas as pd
from dask_sql import Context

c = Context()
df = pd.DataFrame({"a": [1, 1, 2, 1, 2, 2], "b": [4, 5, 6, 4, 1, 5]})
c.create_table("df", df)
c.sql("SELECT STDDEV_POP(b) FROM df GROUP BY a").compute()
Traceback
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/core/indexes/base.py:3621, in Index.get_loc(self, key, method, tolerance)
3620 try:
-> 3621 return self._engine.get_loc(casted_key)
3622 except KeyError as err:
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/_libs/index.pyx:136, in pandas._libs.index.IndexEngine.get_loc()
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/_libs/index.pyx:163, in pandas._libs.index.IndexEngine.get_loc()
File pandas/_libs/hashtable_class_helper.pxi:5198, in pandas._libs.hashtable.PyObjectHashTable.get_item()
File pandas/_libs/hashtable_class_helper.pxi:5206, in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: 'df.a'
The above exception was the direct cause of the following exception:
KeyError Traceback (most recent call last)
Input In [21], in <cell line: 1>()
----> 1 c.sql("SELECT STDDEV_POP(b) FROM df GROUP BY a").compute()
File ~/dask-sql/dask_sql/context.py:495, in Context.sql(self, sql, return_futures, dataframes, gpu, config_options)
491 self.create_table(df_name, df, gpu=gpu)
493 rel, select_fields, _ = self._get_ral(sql)
--> 495 dc = RelConverter.convert(rel, context=self)
497 if rel.get_current_node_type() == "Explain":
498 return dc
File ~/dask-sql/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
55 raise NotImplementedError(
56 f"No relational conversion for node type {node_type} available (yet)."
57 )
58 logger.debug(
59 f"Processing REL {rel} using {plugin_instance.class.name}..."
60 )
---> 61 df = plugin_instance.convert(rel, context=context)
62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
63 return df
File ~/dask-sql/dask_sql/physical/rel/logical/project.py:28, in DaskProjectPlugin.convert(self, rel, context)
26 def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
27 # Get the input of the previous step
---> 28 (dc,) = self.assert_inputs(rel, 1, context)
30 df = dc.df
31 cc = dc.column_container
File ~/dask-sql/dask_sql/physical/rel/base.py:84, in BaseRelPlugin.assert_inputs(rel, n, context)
81 # Late import to remove cycling dependency
82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
File ~/dask-sql/dask_sql/physical/rel/base.py:84, in <listcomp>(.0)
81 # Late import to remove cycling dependency
82 from dask_sql.physical.rel.convert import RelConverter
---> 84 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
File ~/dask-sql/dask_sql/physical/rel/convert.py:61, in RelConverter.convert(cls, rel, context)
55 raise NotImplementedError(
56 f"No relational conversion for node type {node_type} available (yet)."
57 )
58 logger.debug(
59 f"Processing REL {rel} using {plugin_instance.class.name}..."
60 )
---> 61 df = plugin_instance.convert(rel, context=context)
62 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
63 return df
File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:195, in DaskAggregatePlugin.convert(self, rel, context)
192 logger.debug("Performing full-table aggregation")
194 # Do all aggregates
--> 195 df_result, output_column_order = self._do_aggregations(
196 rel,
197 dc,
198 group_columns,
199 context,
200 )
202 # SQL does not care about the index, but we do not want to have any multiindices
203 df_agg = df_result.reset_index(drop=True)
File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:276, in DaskAggregatePlugin._do_aggregations(self, rel, dc, group_columns, context)
274 if key in collected_aggregations:
275 aggregations = collected_aggregations.pop(key)
--> 276 df_result = self._perform_aggregation(
277 DataContainer(df, cc),
278 None,
279 aggregations,
280 additional_column_name,
281 group_columns,
282 groupby_agg_options,
283 )
285 # Now we can also the the rest
286 for filter_column, aggregations in collected_aggregations.items():
File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:461, in DaskAggregatePlugin._perform_aggregation(self, dc, filter_column, aggregations, additional_column_name, group_columns, groupby_agg_options)
457 grouped_df = tmp_df.groupby(
458 by=(group_columns or [additional_column_name]), dropna=False
459 )
460 else:
--> 461 group_columns = [tmp_df[group_column] for group_column in group_columns]
462 group_columns_and_nulls = get_groupby_with_nulls_cols(
463 tmp_df, group_columns, additional_column_name
464 )
465 grouped_df = tmp_df.groupby(by=group_columns_and_nulls)
File ~/dask-sql/dask_sql/physical/rel/logical/aggregate.py:461, in <listcomp>(.0)
457 grouped_df = tmp_df.groupby(
458 by=(group_columns or [additional_column_name]), dropna=False
459 )
460 else:
--> 461 group_columns = [tmp_df[group_column] for group_column in group_columns]
462 group_columns_and_nulls = get_groupby_with_nulls_cols(
463 tmp_df, group_columns, additional_column_name
464 )
465 grouped_df = tmp_df.groupby(by=group_columns_and_nulls)
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/dask/dataframe/core.py:4441, in DataFrame.__getitem__(self, key)
4438 return self.loc[key]
4440 # error is raised from pandas
-> 4441 meta = self._meta[_extract_meta(key)]
4442 dsk = partitionwise_graph(operator.getitem, name, self, key)
4443 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/core/frame.py:3505, in DataFrame.__getitem__(self, key)
3503 if self.columns.nlevels > 1:
3504 return self._getitem_multilevel(key)
-> 3505 indexer = self.columns.get_loc(key)
3506 if is_integer(indexer):
3507 indexer = [indexer]
File ~/miniconda3/envs/dask-sql-datafusion/lib/python3.9/site-packages/pandas/core/indexes/base.py:3623, in Index.get_loc(self, key, method, tolerance)
3621 return self._engine.get_loc(casted_key)
3622 except KeyError as err:
-> 3623 raise KeyError(key) from err
3624 except TypeError:
3625 # If we have a listlike key, _check_indexing_error will raise
3626 # InvalidIndexError. Otherwise we fall through and re-raise
3627 # the TypeError.
3628 self._check_indexing_error(key)
KeyError: 'df.a'
For some reason the column_name function in dask_planner is returning df.a as the column name instead of a.
Digging into this more, it looks like column_name is expected to return a fully-qualified column name, from which we extract the actual column name using get_backend_by_frontend_name. Currently we only do this on the fast groupby codepath, so aggregations that take the slow codepath end up failing. For example, the following code fails even without this PR:
import pandas as pd
from dask_sql import Context
c = Context()
df = pd.DataFrame({
    "a": [1, 2, 1, 2, 1, 2],
    "b": ["1", "2", "3", "4", "5", "6"],  # object dtype forces a slow groupby
})
c.create_table("df", df)
c.sql("SELECT MAX(b) FROM df GROUP BY a") # KeyError: 'df.a'
There are definitely tests using the slow groupby codepath that would've revealed this issue (such as test_agg_min_max), but it looks like those are failing before we even reach the Python aggregation code. Adding tests to this PR should give us that coverage, so feel free to include this patch here, which should resolve your reproducer:
diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py
index 333fbea..54a9597 100644
--- a/dask_sql/physical/rel/logical/aggregate.py
+++ b/dask_sql/physical/rel/logical/aggregate.py
@@ -447,13 +447,15 @@ class DaskAggregatePlugin(BaseRelPlugin):
         if additional_column_name is None:
             additional_column_name = new_temporary_column(dc.df)
 
+        # extract group column names
+        group_columns = [
+            dc.column_container.get_backend_by_frontend_name(group_name)
+            for group_name in group_columns
+        ]
+
         # perform groupby operation; if we are using custom aggregations, we must handle
         # null values manually (this is slow)
         if fast_groupby:
-            group_columns = [
-                dc.column_container.get_backend_by_frontend_name(group_name)
-                for group_name in group_columns
-            ]
             grouped_df = tmp_df.groupby(
                 by=(group_columns or [additional_column_name]), dropna=False
             )
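As a quick sanity check, here's a sketch using plain pandas to show the values the patched slow-groupby reproducer should produce:
import pandas as pd

df = pd.DataFrame({
    "a": [1, 2, 1, 2, 1, 2],
    "b": ["1", "2", "3", "4", "5", "6"],
})
print(df.groupby("a")["b"].max())  # expected: a=1 -> "5", a=2 -> "6"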
@charlesbluca Thanks! This solved the issue.
re-run tests