dask-sql
[BUG] Subqueries aren't always supported in projections, binary operations
What happened: When running queries containing subqueries, we sometimes run into an issue where a subquery's tabular result gets passed directly into a plugin that expects a column or scalar as input. This causes errors either directly in that plugin or later on, when its malformed results are used. This can be seen trivially by passing a subquery directly into a select statement:
from dask_sql import Context
from dask.datasets import timeseries
c = Context()
c.create_table("df", timeseries())
c.sql("""
select
    name, id, (select avg(x) from df)
from df
""")
# TypeError: Column assignment doesn't support type dask.dataframe.core.DataFrame
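For reference, the result the projection is after can be produced directly with Dask; the snippet below only illustrates the intended semantics (a single scalar average broadcast onto every row), and the variable names are purely illustrative:

from dask.datasets import timeseries

ddf = timeseries()
avg_x = ddf["x"].mean().compute()                   # scalar average of x
expected = ddf[["name", "id"]].assign(avg_x=avg_x)  # broadcast onto every row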
However, in some cases DataFusion's optimizer rewrites the subquery as a cross join against the table it is needed in, avoiding this problem:
c.sql("""
select
    name, id
from df
where
    x > (
        select avg(x) from df
    )
""")
# Projection: df.name, df.id
#   Filter: df.x > __sq_1.__value
#     CrossJoin:
#       TableScan: df projection=[name, id, x]
#       Projection: AVG(df.x) AS __value, alias=__sq_1
#         Aggregate: groupBy=[[]], aggr=[[AVG(df.x)]]
#           TableScan: df projection=[name, id, x]
What you expected to happen: I would expect there to be more robust handling for subqueries on either the DataFusion or Python side, or both. Some ideas that come to mind as potential solutions:
- For Python plugins where input expressions are expected to be either columns or scalars, add handling for cases where a table is received instead (a rough sketch of this follows the plans below); I'd imagine this would entail:
  - asserting that the table has a single column and selecting that column
  - checking whether the length of that column is 1, which implies we should extract the single value from it (unfortunately, I think this can't be done without manually computing the Dask object)
- Introduce a custom optimizer rule that rewrites subqueries into cross joins (as DataFusion already does in the filter case) whenever they would otherwise be passed into a plugin that doesn't support them; essentially we would want to take plans like that of the initial failing query:
Projection: df.name, df.id, (<subquery>)
  Subquery:
    Projection: AVG(df.x)
      Aggregate: groupBy=[[]], aggr=[[AVG(df.x)]]
        TableScan: df
  TableScan: df projection=[name, id]
And turn it into something like:
Projection: df.name, df.id, __sq_1.__value AS AVG(df.x)
  CrossJoin:
    TableScan: df projection=[name, id]
    Projection: AVG(df.x) AS __value, alias=__sq_1
      Aggregate: groupBy=[[]], aggr=[[AVG(df.x)]]
        TableScan: df projection=[name, id, x]
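The target plan above corresponds to a query that can be written by hand today, which might also serve as a workaround in the meantime. This is an untested sketch: it assumes cross joins against an aliased derived table are supported here, and it reuses the __sq_1/__value names from the plan purely for illustration:

c.sql("""
select
    df.name, df.id, __sq_1.__value
from df
cross join (
    select avg(x) as __value from df
) as __sq_1
""")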
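For the first idea above (handling tabular inputs in the Python plugins), here is a minimal sketch of the coercion, written as a standalone helper since I'm not tying it to any particular plugin; the name and placement are hypothetical, not dask-sql's actual plugin API:

import dask.dataframe as dd

def coerce_subquery_result(value):
    """Hypothetical helper: narrow a subquery result that arrived as a
    DataFrame down to a column, or to a scalar if it only has one row."""
    if not isinstance(value, dd.DataFrame):
        return value  # already a column or scalar; pass through untouched
    # a subquery used as an expression should only ever produce one column
    assert len(value.columns) == 1
    series = value[value.columns[0]]
    # the length check forces an early compute, which is the drawback noted above
    if len(series) == 1:
        return series.head(1).iloc[0]  # scalar-valued subquery
    return series

The length check is what forces an early compute of the Dask object, which is the drawback mentioned above and part of why an optimizer-side fix seems preferable.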
Anything else we need to know?: I uncovered this while digging through some of the query failures; I'm fairly sure it is the root cause of several failures that crop up in slightly different ways, depending on which plugin the subquery result ends up in.
Environment:
- dask-sql version: latest
- Python version: 3.9
- Operating System: Ubuntu
- Install method (conda, pip, source): source