
[BUG] Subqueries aren't always supported in projections, binary operations

Open charlesbluca opened this issue 3 years ago • 0 comments

What happened: When running queries containing subqueries, we sometimes hit an issue where a subquery's tabular result is passed directly into a plugin that expects a column or scalar as input. This causes errors either directly in the plugin or later, when the plugin's malformed results are consumed downstream. It can be seen trivially by passing a subquery directly into a select statement:

from dask_sql import Context
from dask.datasets import timeseries

c = Context()
c.create_table("df", timeseries())

c.sql("""
    select
        name, id, (select avg(x) from df)
    from df
""")
# TypeError: Column assignment doesn't support type dask.dataframe.core.DataFrame

However, in some cases, DataFusion optimizes the query by cross joining the subquery's result onto the table that needs it, which avoids the problem:

c.sql("""
    select
        name, id
    from df
    where
        x > (
            select avg(x) from df
        )
""")

# Projection: df.name, df.id
#   Filter: df.x > __sq_1.__value
#     CrossJoin:
#       TableScan: df projection=[name, id, x]
#       Projection: AVG(df.x) AS __value, alias=__sq_1
#         Aggregate: groupBy=[[]], aggr=[[AVG(df.x)]]
#           TableScan: df projection=[name, id, x]

What you expected to happen: I would expect more robust handling of subqueries on the DataFusion side, the Python side, or both. Some potential solutions that come to mind:

  • For Python plugins where input expressions are expected to be either columns or scalars, add handling for cases where a table is received instead; I'd imagine this would entail:
    • asserting that the table has a single column and selecting that column
    • checking whether that column has length 1, which would imply we should extract its single value (unfortunately, I think this can't be done without manually computing the Dask object)
  • Introducing a custom optimizer rule that optimizes away subqueries when they would be passed into a plugin that can't handle them; essentially, we would want to take plans like that of the initial failing query:
Projection: df.name, df.id, (<subquery>)
  Subquery:
    Projection: AVG(df.x)
      Aggregate: groupBy=[[]], aggr=[[AVG(df.x)]]
        TableScan: df
  TableScan: df projection=[name, id]

And turn it into something like:

Projection: df.name, df.id, __sq_1.__value AS AVG(df.x)
  CrossJoin:
    TableScan: df projection=[name, id]
    Projection: AVG(df.x) AS __value, alias=__sq_1
      Aggregate: groupBy=[[]], aggr=[[AVG(df.x)]]
        TableScan: df projection=[name, id, x]

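The first idea above could be sketched as a small coercion helper. This is only an illustration, not part of dask-sql's actual API: `coerce_subquery_result` is a hypothetical name, duck typing on `columns`/`compute` stands in for a real Dask type check, and, as noted above, extracting a scalar forces an eager compute of the Dask object:

```python
def coerce_subquery_result(obj):
    # Pass through anything that's already a column (Series) or a scalar.
    if not hasattr(obj, "columns"):
        return obj
    # A subquery result is only usable here if it has exactly one column.
    assert len(obj.columns) == 1, "subquery returned more than one column"
    series = obj[obj.columns[0]]
    # Dask collections need an explicit compute() before we can index rows.
    if hasattr(series, "compute"):
        series = series.compute()
    # A single-row column means the subquery was scalar-valued.
    if len(series) == 1:
        return series.iloc[0]
    return series
```

A plugin expecting a scalar could then call this on its input before use; the downside, also mentioned above, is that the scalar case can't be handled lazily.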
Anything else we need to know?: I uncovered this while digging through some of the query failures; I'm fairly sure it's the root cause of several failures that surface in slightly different ways, depending on which plugin is involved.

Environment:

  • dask-sql version: latest
  • Python version: 3.9
  • Operating System: ubuntu
  • Install method (conda, pip, source): source

charlesbluca · Dec 12 '22 17:12