dask-sql
dask-sql copied to clipboard
[ENH] Support correlated subqueries
I'd like to be able to execute correlated subqueries.
It looks like the following example is successfully parsed by Calcite, but there's a logic error in mapping to Dask DataFrame calls:
import pandas as pd
df = pd.DataFrame({'id': [0, 1, 2], 'name': ['a', 'b', 'c'], 'val': [0, 1, 2]})
c.create_table('test', df)
c.sql("""
select name, val, id from test a
where val >
(select avg(val) from test where id = a.id)
""")
Result:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/tmp/ipykernel_1522695/1713961665.py in <module>
6
7 c.create_table('test', df)
----> 8 c.sql("""
9 select name, val, id from test a
10 where val >
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
436 rel, select_names, _ = self._get_ral(sql)
437
--> 438 dc = RelConverter.convert(rel, context=self)
439
440 if dc is None:
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
58 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
59 )
---> 60 df = plugin_instance.convert(rel, context=context)
61 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
62 return df
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
27 ) -> DataContainer:
28 # Get the input of the previous step
---> 29 (dc,) = self.assert_inputs(rel, 1, context)
30
31 df = dc.df
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
83 from dask_sql.physical.rel.convert import RelConverter
84
---> 85 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
86
87 @staticmethod
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
83 from dask_sql.physical.rel.convert import RelConverter
84
---> 85 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
86
87 @staticmethod
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
58 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
59 )
---> 60 df = plugin_instance.convert(rel, context=context)
61 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
62 return df
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/join.py in convert(self, rel, context)
186 )
187 logger.debug(f"Additionally applying filter {filter_condition}")
--> 188 df = filter_or_scalar(df, filter_condition)
189 dc = DataContainer(df, cc)
190
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in filter_or_scalar(df, filter_condition)
32 # In SQL, a NULL in a boolean is False on filtering
33 filter_condition = filter_condition.fillna(False)
---> 34 return df[filter_condition]
35
36
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/core.py in __getitem__(self, key)
4111 from .multi import _maybe_align_partitions
4112
-> 4113 self, key = _maybe_align_partitions([self, key])
4114 dsk = partitionwise_graph(operator.getitem, name, self, key)
4115 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/multi.py in _maybe_align_partitions(args)
166 divisions = dfs[0].divisions
167 if not all(df.divisions == divisions for df in dfs):
--> 168 dfs2 = iter(align_partitions(*dfs)[0])
169 return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
170 return args
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/multi.py in align_partitions(*dfs)
120 raise ValueError("dfs contains no DataFrame and Series")
121 if not all(df.known_divisions for df in dfs1):
--> 122 raise ValueError(
123 "Not all divisions are known, can't align "
124 "partitions. Please use `set_index` "
ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
Note: I edited this issue to adjust the snippet and traceback to use a plain DataFrame input. I had tried with both Pandas and Dask DataFrame inputs, but got the same errors.
Thanks for reporting @randerzander! I can't reproduce the error with the latest dask-sql and dask releases, but I can reproduce it with dask from main since the merge of dask/dask#8341. cc: @charlesbluca
This should be resolved by dask/dask#8389; we can leave this open until the next Dask release that includes that PR is pushed out.
Forgot to mention, but it would probably be good to add a test for this specific type of query so that we can verify it continues to work in the future. IIRC, no tests were failing when this stopped working.