`TABLE_OR_VIEW_NOT_FOUND` with `sqlalchemy` connection
I've connected to our Databricks workspace with an `sa.Engine`, specifying the database/catalog as `my-catalog`:
>>> engine
Engine(databricks://token:***@mycompany-test.cloud.databricks.com:443/my-catalog)
In this catalog I have a `test` schema containing a `run_header_v1` table. Using the engine I can autoload the table correctly:
>>> import sqlalchemy as sa
>>> RunHeader = sa.Table(
...     'run_header_v1',
...     sa.MetaData(),
...     schema='test',
...     autoload_with=engine,
... )
>>> RunHeader
Table('run_header_v1', MetaData(), Column('run_id', String(), table=<run_header_v1>, nullable=False), Column('start_time', TIMESTAMP(), table=<run_header_v1>, nullable=False), Column('end_time', TIMESTAMP(), table=<run_header_v1>, nullable=False), Column('run_type', String(), table=<run_header_v1>, nullable=False), Column('status', String(), table=<run_header_v1>, nullable=False), schema='test')
When I try to use the `Table` object I get a `TABLE_OR_VIEW_NOT_FOUND` error (even though the same engine loaded the table in the first place!):
>>> with engine.connect() as conn:
...     res = conn.execute(
...         sa.select(sa.func.count())
...         .select_from(RunHeader)
...     ).scalar_one()
...
Traceback (most recent call last):
<snip>
ServerOperationError: [TABLE_OR_VIEW_NOT_FOUND] The table or view `test`.`run_header_v1` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 2 pos 5
I get the same error if I try to use the engine to execute the equivalent raw SQL:
>>> with engine.connect() as conn:
...     res = conn.execute(sa.text("SELECT count(*) FROM test.run_header_v1")).scalar_one()
...
<snip>
DatabaseError: (databricks.sql.exc.ServerOperationError) [TABLE_OR_VIEW_NOT_FOUND] ...
NOTE!
I can avoid the error if I explicitly issue a `USE CATALOG ...` statement on the connection before executing my query:
>>> with engine.connect() as conn:
...     conn.execute(sa.text(f"use catalog `{engine.url.database}`"))
...     res = conn.execute(sa.text("SELECT count(*) FROM test.run_header_v1")).scalar_one()
...
>>> print(res)
0
>>>
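(Possibly related: some versions of the `databricks-sqlalchemy` dialect document the catalog as a `catalog` query parameter on the URL rather than as the database path. I haven't verified whether that avoids this bug, but building the URL that way would look like the sketch below; the token and `http_path` values are placeholders.)

```python
from sqlalchemy.engine import URL

# Hypothetical alternative: pass catalog/schema as URL query parameters
# instead of the database path. The token and http_path are placeholders.
url = URL.create(
    drivername="databricks",
    username="token",
    password="dapi-placeholder",
    host="mycompany-test.cloud.databricks.com",
    port=443,
    query={
        "http_path": "/sql/1.0/warehouses/abc123",  # placeholder
        "catalog": "my-catalog",
        "schema": "test",
    },
)
print(url)
```

An engine would then be created with `sa.create_engine(url)`; whether the dialect honours the `catalog` parameter here is exactly the behaviour in question.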
So, it looks like the sqlalchemy integration isn't correctly setting the catalog when it is specified as the database in the engine URL.
This also works, but shouldn't be required:
def initialise_connection(conn, record):
    # conn is the raw DBAPI connection; set the catalog before any query runs
    with conn.cursor() as cursor:
        cursor.execute(f"use catalog `{engine.url.database}`")

sa.event.listen(engine, 'connect', initialise_connection)
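For anyone reproducing this, the `connect`-event mechanism itself can be checked locally against any DBAPI; here's a sketch using an in-memory SQLite engine as a stand-in, where a PRAGMA plays the role of the `use catalog` statement:

```python
import sqlalchemy as sa

# Stand-in engine: SQLite instead of Databricks, purely to demonstrate
# that a 'connect' listener runs once per DBAPI connection before any query.
engine = sa.create_engine("sqlite://")

@sa.event.listens_for(engine, "connect")
def initialise_connection(dbapi_conn, record):
    # On Databricks this would execute `use catalog ...` instead.
    cur = dbapi_conn.cursor()
    cur.execute("PRAGMA foreign_keys=ON")  # per-connection session setting
    cur.close()

with engine.connect() as conn:
    # The listener has already run on this DBAPI connection.
    val = conn.execute(sa.text("PRAGMA foreign_keys")).scalar_one()

print(val)  # prints 1 — the per-connection statement took effect
```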