dask-sql
dask-sql copied to clipboard
[ENH] Support INTERSECT operator
Sometimes, instead of using a JOIN, an INTERSECT is used to find the overlap between two sets of records:
# Minimal reproduction: dask-sql fails to plan a query that uses INTERSECT.
# NOTE(review): `c` is not defined in this snippet. The traceback shows it is a
# dask_sql Context (its `sql` method lives in dask_sql/context.py). Presumably it
# was created earlier, e.g. `from dask_sql import Context; c = Context()`;
# confirm against the original notebook.
import pandas as pd
df_a = pd.DataFrame({'id': [0, 1, 2]})
df_b = pd.DataFrame({'id': [2]})
# Register both frames under the table names used in the query below.
c.create_table('table_a', df_a)
c.create_table('table_b', df_b)
# Only id 2 is present in both tables, so INTERSECT should return one row: id == 2.
c.sql("select * from table_a intersect select * from table_b")
#expected: a single row whose `id` is
2
Actual result:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
51 try:
---> 52 plugin_instance = cls.get_plugin(class_name)
53 except KeyError: # pragma: no cover
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/utils.py in get_plugin(cls, name)
73 """Get a plugin with the given name"""
---> 74 return Pluggable.__plugins[cls][name]
75
KeyError: 'org.apache.calcite.rel.logical.LogicalIntersect'
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
/tmp/ipykernel_628704/213746317.py in <module>
6 c.create_table('table_a', df_a)
7 c.create_table('table_b', df_b)
----> 8 c.sql("select * from table_a intersect select * from table_b")
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
436 rel, select_names, _ = self._get_ral(sql)
437
--> 438 dc = RelConverter.convert(rel, context=self)
439
440 if dc is None:
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
58 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
59 )
---> 60 df = plugin_instance.convert(rel, context=context)
61 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
62 return df
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
27 ) -> DataContainer:
28 # Get the input of the previous step
---> 29 (dc,) = self.assert_inputs(rel, 1, context)
30
31 df = dc.df
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
83 from dask_sql.physical.rel.convert import RelConverter
84
---> 85 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
86
87 @staticmethod
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
83 from dask_sql.physical.rel.convert import RelConverter
84
---> 85 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
86
87 @staticmethod
~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
52 plugin_instance = cls.get_plugin(class_name)
53 except KeyError: # pragma: no cover
---> 54 raise NotImplementedError(
55 f"No conversion for class {class_name} available (yet)."
56 )
NotImplementedError: No conversion for class org.apache.calcite.rel.logical.LogicalIntersect available (yet).
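This could possibly be implemented using Dask DataFrame's `merge` operation: an inner merge on all columns of both inputs, followed by dropping duplicate rows (since SQL INTERSECT returns distinct rows).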
@randerzander, while this is not yet in the main branch, it is supported in the datafusion-sql-planner branch, which will eventually be merged into main. Does that meet your criteria well enough that we could close this issue?