[BUG] Schema: <schema> not found in DaskSQLContext
This worked in dask-sql 2022.8, but after the switch to DataFusion I get this error when running queries. We believe this is because DataFusion doesn't support schemas - is it possible to add support for this again?
Thanks for raising this issue. We do have some limited schema support with the move over to datafusion and are working on expanding this to support the same level of operations as earlier. (See #841).
@hungcs It would be great if you're able to provide a minimal example/reproducer of where schema support is failing in your workflow.
@ayushdg sorry for the delay, here's my repro (still not working in 2022.3):
```python
import dask_sql
import pandas as pd

context = dask_sql.Context()

connection_name = "file_uploads"
dataset_name = "titanic_dataset"

df = pd.DataFrame({"name": ["Tom"], "mask": ["pink"], "weapon": ["stick"]})

if connection_name not in context.schema:
    print("----added connection_name to schema----")
    context.create_schema(connection_name)

print("----schema----", context.schema)

if dataset_name not in context.schema[connection_name].tables:
    context.create_table(dataset_name, df, schema_name=connection_name)
```
Playing around with some stuff, I found I can register the schema manually by doing this:
```python
import dask.dataframe as dd
import dask_sql
import pandas as pd
from dask_planner.rust import DaskSchema, DaskTable

context = dask_sql.Context()

connection_name = "file_uploads"
dataset_name = "titanic_dataset"

df = dd.from_pandas(
    pd.DataFrame({"name": ["Tom"], "mask": ["pink"], "weapon": ["stick"]}),
    npartitions=1,
)

if connection_name not in context.schema:
    print("----added connection_name to schema----")
    context.create_schema(connection_name)
    # manually register the schema with the Rust-side DaskSQLContext
    dask_schema = DaskSchema(schema_name=connection_name)
    dask_schema.add_table(
        DaskTable(
            schema_name=connection_name,
            row_count=len(df.index),
            table_name=dataset_name,
        )
    )
    context.context.register_schema(schema_name=connection_name, schema=dask_schema)

if dataset_name not in context.schema[connection_name].tables:
    context.create_table(dataset_name, df, schema_name=connection_name)

print("____end______", context.schema[connection_name].tables)

context.sql("select * from titanic_dataset").compute()
```
but then I get:

[error screenshot]

Am I doing something wrong?

If I try context.context.use_schema(schema_name=connection_name), I get something like:
File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py", line 28, in convert
(dc,) = self.assert_inputs(rel, 1, context)
File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/base.py", line 84, in assert_inputs
return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/base.py", line 84, in <listcomp>
return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py", line 61, in convert
df = plugin_instance.convert(rel, context=context)
File "/Users/hw/mambaforge/envs/based/lib/python3.8/site-packages/dask_sql/physical/rel/logical/table_scan.py", line 47, in convert
dc = context.schema[schema_name].tables[table_name]
KeyError: 'titanic_dataset'
and it looks like it's trying to use schema_name="root" instead of file_uploads
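A quick way to confirm this from the Python side, assuming Context tracks its active schema in a schema_name attribute (defaulting to "root") as recent dask-sql versions do:

```python
# Assumes dask_sql.Context stores the active schema in `schema_name`,
# defaulting to "root"; create_schema alone does not switch to it.
print(context.schema_name)  # still "root" after create_schema("file_uploads")
```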
Sorry for the delayed response here - could you share what dask-sql version you're using to reproduce these failures?
With a source install of main, I don't see any errors using your first reproducer - I wouldn't think you would need to manually register the schema with DaskSQLContext.register_schema, as this is already done for all created schemas as part of Context._get_ral:
https://github.com/dask-contrib/dask-sql/blob/883cc3cfdfabeef88a7a152b642b0c84d9385047/dask_sql/context.py#L803-L806
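For reference, a hedged paraphrase of the linked lines (not the verbatim source; the _prepare_schemas helper name is taken from around that commit and may differ):

```python
# Hedged paraphrase (not verbatim dask-sql source) of the schema
# registration in Context._get_ral: before parsing a query, every
# Python-side schema is rebuilt as a Rust-side DaskSchema and handed
# to DaskSQLContext.register_schema, so manually calling
# register_schema yourself should be redundant.
for rust_schema in self._prepare_schemas():  # helper name assumed
    self.context.register_schema(rust_schema.name, rust_schema)
```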
In general, I would not recommend directly calling DaskSQLContext methods such as use_schema, as these are meant for internal use and generally only impact the Rust side of SQL parsing, not the Python side of execution; instead, I would recommend a USE SCHEMA statement to switch the active schema, after which I'm able to query the table in your above reproducer:
```python
import dask_sql
import pandas as pd

context = dask_sql.Context()

connection_name = "file_uploads"
dataset_name = "titanic_dataset"

df = pd.DataFrame({"name": ["Tom"], "mask": ["pink"], "weapon": ["stick"]})

context.create_schema(connection_name)
context.create_table(dataset_name, df, schema_name=connection_name)

context.sql(f"use schema {connection_name}")
context.sql("select * from titanic_dataset").compute()
```
I do understand why one wouldn't get the impression that the DaskSQLContext methods are internal, as they aren't prefixed with an underscore or otherwise indicated as such - this reflects changes we should consider making to the names of these methods' bindings (cc @jdye64 if you have some thoughts here). It might also be worth adding a use_schema method to dask_sql.Context, since this seems roughly in line with the create_*/alter_* methods we have there to coincide with SQL queries.
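For illustration, such a Context.use_schema could be as thin as keeping the Python-side and Rust-side notions of the active schema in sync (a hypothetical sketch, not actual dask-sql code; the schema_name attribute and error message are assumptions):

```python
# Hypothetical sketch of a dask_sql.Context.use_schema convenience
# method, mirroring the existing create_*/alter_* helpers; not actual
# dask-sql code.
def use_schema(self, schema_name: str):
    if schema_name not in self.schema:
        raise KeyError(f"Schema {schema_name} was not found")
    self.schema_name = schema_name  # Python-side active schema
    self.context.use_schema(schema_name=schema_name)  # keep the Rust parser in sync
```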