
[ENH] Support correlated subqueries

Open · randerzander opened this issue on Nov 16, 2021 · 3 comments

I'd like to be able to execute correlated subqueries.

It looks like the following example is successfully parsed by Calcite, but there's a logic error in mapping to Dask DataFrame calls:

import pandas as pd
from dask_sql import Context

# `c` is a dask-sql Context
c = Context()

df = pd.DataFrame({'id': [0, 1, 2], 'name': ['a', 'b', 'c'], 'val': [0, 1, 2]})

c.create_table('test', df)
c.sql("""
select name, val, id from test a
where val >
  (select avg(val) from test where id = a.id)
  """)

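For context, the correlated subquery computes, for each outer row, the average val over rows sharing that row's id, and keeps rows whose val exceeds it. A pandas-only sketch of that intended semantics (using the df defined above, purely for illustration; this is not how dask-sql evaluates the query):

# Intended semantics, expressed directly in pandas:
# keep rows whose val exceeds the mean val of rows with the same id.
per_id_avg = df.groupby('id')['val'].transform('mean')
expected = df.loc[df['val'] > per_id_avg, ['name', 'val', 'id']]

Running the dask-sql query itself, however, currently fails:
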
Result:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/tmp/ipykernel_1522695/1713961665.py in <module>
      6 
      7 c.create_table('test', df)
----> 8 c.sql("""
      9 select name, val, id from test a
     10 where val >

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    436         rel, select_names, _ = self._get_ral(sql)
    437 
--> 438         dc = RelConverter.convert(rel, context=self)
    439 
    440         if dc is None:

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     58             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59         )
---> 60         df = plugin_instance.convert(rel, context=context)
     61         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62         return df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     27     ) -> DataContainer:
     28         # Get the input of the previous step
---> 29         (dc,) = self.assert_inputs(rel, 1, context)
     30 
     31         df = dc.df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     83         from dask_sql.physical.rel.convert import RelConverter
     84 
---> 85         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     86 
     87     @staticmethod

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     83         from dask_sql.physical.rel.convert import RelConverter
     84 
---> 85         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     86 
     87     @staticmethod

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     58             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59         )
---> 60         df = plugin_instance.convert(rel, context=context)
     61         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62         return df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/join.py in convert(self, rel, context)
    186             )
    187             logger.debug(f"Additionally applying filter {filter_condition}")
--> 188             df = filter_or_scalar(df, filter_condition)
    189             dc = DataContainer(df, cc)
    190 

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in filter_or_scalar(df, filter_condition)
     32     # In SQL, a NULL in a boolean is False on filtering
     33     filter_condition = filter_condition.fillna(False)
---> 34     return df[filter_condition]
     35 
     36 

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/core.py in __getitem__(self, key)
   4111                 from .multi import _maybe_align_partitions
   4112 
-> 4113                 self, key = _maybe_align_partitions([self, key])
   4114             dsk = partitionwise_graph(operator.getitem, name, self, key)
   4115             graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/multi.py in _maybe_align_partitions(args)
    166     divisions = dfs[0].divisions
    167     if not all(df.divisions == divisions for df in dfs):
--> 168         dfs2 = iter(align_partitions(*dfs)[0])
    169         return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
    170     return args

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/multi.py in align_partitions(*dfs)
    120         raise ValueError("dfs contains no DataFrame and Series")
    121     if not all(df.known_divisions for df in dfs1):
--> 122         raise ValueError(
    123             "Not all divisions are known, can't align "
    124             "partitions. Please use `set_index` "

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
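
The ValueError at the bottom comes from Dask's partition alignment rather than from the SQL layer itself: filtering one Dask collection with a boolean Series built from another collection requires known divisions on both sides, and the join.py frames above suggest the filter mask here comes out of a join whose divisions are unknown. A minimal standalone sketch of that Dask-level failure mode (clear_divisions() is used only to mimic a mask with lost divisions):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'id': [0, 1, 2], 'val': [0, 1, 2]})
left = dd.from_pandas(pdf, npartitions=2)  # divisions are known here

# Build a boolean mask from a collection whose divisions are unknown;
# clear_divisions() just simulates that state.
mask = left.clear_divisions()['val'] > 0

# Aligning `left` with `mask` requires known divisions on both sides,
# so this raises the same "Not all divisions are known" ValueError.
left[mask]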

Note: the edits above come from adjusting the snippet and traceback to a plain pandas DataFrame input. I had tried with both pandas and Dask DataFrame inputs, but got the same errors.

randerzander avatar Nov 16 '21 01:11 randerzander

Thanks for reporting @randerzander! I can't reproduce the error with the latest dask-sql and dask releases, but can reproduce with dask from main since the merge of dask/dask#8341. cc: @charlesbluca

ayushdg avatar Nov 16 '21 17:11 ayushdg

This should be resolved by dask/dask#8389; we can leave this open until the next Dask release containing that PR is pushed out.

charlesbluca avatar Nov 17 '21 19:11 charlesbluca

Forgot to mention, but it would probably be good to add a test for this specific type of query so that we can verify it continues to work in the future. IIRC, tests weren't failing when this stopped working.

charlesbluca avatar Dec 02 '21 00:12 charlesbluca
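
For reference, a rough sketch of what such a regression test could look like, modeled on the snippet in this issue (the c Context fixture, the test data, and the expected result are assumptions for illustration, not existing dask-sql test code):

import pandas as pd
from dask.dataframe.utils import assert_eq


def test_correlated_subquery(c):
    # `c` is assumed to be a dask-sql Context fixture.
    # Repeated ids make the correlated filter keep some rows and drop others.
    df = pd.DataFrame({'id': [0, 0, 1, 1], 'name': ['a', 'b', 'c', 'd'], 'val': [1, 3, 2, 2]})
    c.create_table('test', df)

    result = c.sql("""
        select name, val, id from test a
        where val > (select avg(val) from test where id = a.id)
        """)

    # Expected: only rows whose val exceeds the per-id average
    # (here, just the row with name 'b').
    per_id_avg = df.groupby('id')['val'].transform('mean')
    expected = df.loc[df['val'] > per_id_avg, ['name', 'val', 'id']]

    assert_eq(result, expected, check_index=False)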