[BUG] Schema: <schema> not found in DaskSQLContext

Open hungcs opened this issue 2 years ago • 5 comments

[Screenshot: Screen Shot 2022-12-20 at 3 56 30 PM]

This worked in dask 2022.8, but after the switch to DataFusion I get this error when running queries. We believe this is because DataFusion doesn't support schemas. Is it possible to add support for this again?

hungcs avatar Dec 20 '22 23:12 hungcs

Thanks for raising this issue. We do have some limited schema support after the move over to DataFusion, and we are working on expanding it to support the same level of operations as before (see #841).

@hungcs It would be great if you're able to provide a minimal example/reproducer of where schema support is failing in your workflow.

ayushdg avatar Dec 31 '22 06:12 ayushdg

@ayushdg sorry for the delay, here's my repro (still not working in 2022.3):

import dask_sql
import pandas as pd

context = dask_sql.Context()

connection_name = "file_uploads"
dataset_name = "titanic_dataset"
df = pd.DataFrame({"name": ["Tom"], "mask": ["pink"], "weapon": ["stick"]})

if connection_name not in context.schema:
    print("----added connection_name to schema----")
    context.create_schema(connection_name)
    print("----schema----", context.schema)

if dataset_name not in context.schema[connection_name].tables:
    context.create_table(dataset_name, df, schema_name=connection_name)

hungcs avatar Feb 07 '23 02:02 hungcs

[Screenshot: Screen Shot 2023-02-06 at 6 26 21 PM]

hungcs avatar Feb 07 '23 02:02 hungcs

playing around with some stuff, i can register the schema by doing this:

import dask_sql
import pandas as pd
import dask.dataframe as dd
from dask_planner.rust import DaskSchema, DaskTable

context = dask_sql.Context()

connection_name = "file_uploads"
dataset_name = "titanic_dataset"
df = dd.from_pandas(pd.DataFrame({"name": ["Tom"], "mask": ["pink"], "weapon": ["stick"]}), npartitions=1)

if connection_name not in context.schema:
    print("----added connection_name to schema----")
    context.create_schema(connection_name)

    dask_schema = DaskSchema(schema_name=connection_name)
    dask_schema.add_table(DaskTable(schema_name=connection_name, row_count=len(df.index), table_name=dataset_name))
    context.context.register_schema(schema_name=connection_name, schema=dask_schema)

if dataset_name not in context.schema[connection_name].tables:
    context.create_table(dataset_name, df, schema_name=connection_name)
print("____end______", context.schema[connection_name].tables)

context.sql("select * from titanic_dataset").compute()

but then i get: [Screenshot: Screen Shot 2023-02-06 at 6 53 11 PM]

am i doing something wrong?

if i try context.context.use_schema(schema_name=connection_name), i get something like

  File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py", line 28, in convert
    (dc,) = self.assert_inputs(rel, 1, context)
  File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/base.py", line 84, in assert_inputs
    return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
  File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/base.py", line 84, in <listcomp>
    return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
  File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py", line 61, in convert
    df = plugin_instance.convert(rel, context=context)
  File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/logical/table_scan.py", line 47, in convert
    dc = context.schema[schema_name].tables[table_name]
KeyError: 'titanic_dataset'

and it looks like it's trying to use schema_name="root" instead of file_uploads
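For readers following along, the failing lookup in the last traceback frame can be modelled with plain dictionaries. This is only a toy sketch: the `schemas` dict and the `lookup` helper are hypothetical stand-ins, not dask-sql's actual data structures, and it assumes (as the comment above suspects) that the Python side is still resolving the table against "root".

```python
# Toy stand-in for the nested lookup in the last traceback frame:
#     dc = context.schema[schema_name].tables[table_name]
# Plain dicts are used here; this is NOT dask-sql's real data structure.
schemas = {
    "root": {},  # default schema with no tables registered
    "file_uploads": {"titanic_dataset": "<dataframe>"},
}


def lookup(schema_name, table_name):
    """Resolve a table with the same two-level indexing (hypothetical helper)."""
    return schemas[schema_name][table_name]


# If the active schema is still "root", the table key is missing there.
error = None
try:
    lookup("root", "titanic_dataset")
except KeyError as exc:
    error = exc

# Resolving against the schema that actually holds the table succeeds.
found = lookup("file_uploads", "titanic_dataset")

print(repr(error))
print(found)
```

In this model the schema key exists but the table key does not, which matches the KeyError naming the table rather than the schema.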

hungcs avatar Feb 07 '23 02:02 hungcs

Sorry for the delayed response here - could you share what dask-sql version you're using to reproduce these failures?

With a source install of main, I don't see any errors using your first reproducer - I wouldn't think you would need to manually register the schema with DaskSQLContext.register_schema, as this is already done for all created schemas as part of Context._get_ral:

https://github.com/dask-contrib/dask-sql/blob/883cc3cfdfabeef88a7a152b642b0c84d9385047/dask_sql/context.py#L803-L806

In general, I would not recommend directly calling DaskSQLContext methods such as use_schema. These are meant for internal use and generally only affect the Rust side of SQL parsing, not the Python side of execution. Instead, I would recommend using a USE SCHEMA statement to switch the active schema, after which I'm able to query the table in your above reproducer:

import dask_sql
import pandas as pd

context = dask_sql.Context()

connection_name = "file_uploads"
dataset_name = "titanic_dataset"
df = pd.DataFrame({"name": ["Tom"], "mask": ["pink"], "weapon": ["stick"]})

context.create_schema(connection_name)
context.create_table(dataset_name, df, schema_name=connection_name)

context.sql(f"use schema {connection_name}")
context.sql("select * from titanic_dataset").compute()

I do understand why one wouldn't get the impression that the DaskSQLContext methods are internal, as they aren't prefixed with an underscore or otherwise marked as such. This suggests we should consider renaming these methods' bindings (cc @jdye64 if you have some thoughts here). It might also be worth adding a use_schema method to dask_sql.Context, since this seems roughly in line with the create_*/alter_* methods we already have there to mirror SQL statements.
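As a rough illustration of what such a method could look like, here is a minimal sketch written as a standalone helper. Both `use_schema` and `FakeContext` are hypothetical names made up for this example, not part of dask-sql. The helper only assumes the context object exposes a `sql` method, as `dask_sql.Context` does in the reproducer above, and the identifier check is an extra precaution added here rather than anything dask-sql is known to do.

```python
class FakeContext:
    """Hypothetical stand-in for dask_sql.Context that only records SQL strings."""

    def __init__(self):
        self.statements = []

    def sql(self, query):
        self.statements.append(query)


def use_schema(context, schema_name):
    """Hypothetical helper: switch the active schema by issuing USE SCHEMA."""
    # Assumption: only plain identifiers are accepted, so the name can be
    # interpolated into the statement without quoting.
    if not schema_name.isidentifier():
        raise ValueError(f"not a plain identifier: {schema_name!r}")
    context.sql(f"USE SCHEMA {schema_name}")


ctx = FakeContext()
use_schema(ctx, "file_uploads")
print(ctx.statements)
```

With a real `dask_sql.Context` passed in place of `FakeContext`, the helper would issue the same kind of statement as the reproducer above, keeping the schema switch on the SQL path rather than calling DaskSQLContext directly.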

charlesbluca avatar Mar 30 '23 18:03 charlesbluca