
Reusing engines from different threads

Open dhirschfeld opened this issue 1 year ago • 3 comments

My observation is that passing an engine connected to an in-memory duckdb database to a different thread doesn't work.

I'm wondering if that's expected or if it would be considered a bug / missing feature?

Example:

import anyio
import sqlalchemy as sa
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
)


class Base(DeclarativeBase):
    pass


seq = sa.Sequence("user_id")


class User(Base):
    __tablename__ = "Users"
    id: Mapped[int] = mapped_column(
        seq,
        server_default=seq.next_value(),
        primary_key=True,
    )
    name: Mapped[str] = mapped_column(sa.String(30))


engine = sa.create_engine("duckdb:///:memory:")
Base.metadata.create_all(bind=engine)

with Session(engine) as session:
    spongebob = User(name="spongebob")
    sandy = User(name="sandy")
    patrick = User(name="patrick")
    session.add_all([spongebob, sandy, patrick])
    session.commit()


def run_query(engine: sa.Engine):
    with engine.connect() as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()

Calling the run_query function from the main thread works as expected:

>>> run_query(engine)
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]

...but if I run it in a background thread I get a Catalog Error: Table with name Users does not exist! exception 😔

My assumption is that the engine loses its connection to the in-memory database in the main thread and creates a new in-memory database in which that table doesn't exist?

>>> await anyio.to_thread.run_sync(run_query, engine)
---------------------------------------------------------------------------
CatalogException                          Traceback (most recent call last)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1966     if not evt_handled:
-> 1967         self.dialect.do_execute(
   1968             cursor, str_statement, effective_parameters, context
   1969         )
   1971 if self._has_events or self.engine._has_events:

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941, in DefaultDialect.do_execute(self, cursor, statement, parameters, context)
    940 def do_execute(self, cursor, statement, parameters, context=None):
--> 941     cursor.execute(statement, parameters)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
    139     else:
--> 140         self.__c.execute(statement, parameters)
    141 except RuntimeError as e:

CatalogException: Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
                      ^

The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
Cell In[10], line 1
----> 1 await anyio.to_thread.run_sync(run_query, engine)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/to_thread.py:56, in run_sync(func, abandon_on_cancel, cancellable, limiter, *args)
     48     abandon_on_cancel = cancellable
     49     warn(
     50         "The `cancellable=` keyword argument to `anyio.to_thread.run_sync` is "
     51         "deprecated since AnyIO 4.1.0; use `abandon_on_cancel=` instead",
     52         DeprecationWarning,
     53         stacklevel=2,
     54     )
---> 56 return await get_async_backend().run_sync_in_worker_thread(
     57     func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
     58 )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/_backends/_trio.py:1060, in TrioBackend.run_sync_in_worker_thread(cls, func, args, abandon_on_cancel, limiter)
   1057         return func(*args)
   1059 token = TrioBackend.current_token()
-> 1060 return await run_sync(
   1061     wrapper,
   1062     abandon_on_cancel=abandon_on_cancel,
   1063     limiter=cast(trio.CapacityLimiter, limiter),
   1064 )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:437, in to_thread_run_sync(sync_fn, thread_name, abandon_on_cancel, limiter, *args)
    433 msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = (
    434     await trio.lowlevel.wait_task_rescheduled(abort)
    435 )
    436 if isinstance(msg_from_thread, outcome.Outcome):
--> 437     return msg_from_thread.unwrap()
    438 elif isinstance(msg_from_thread, Run):
    439     await msg_from_thread.run()

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/outcome/_impl.py:213, in Error.unwrap(***failed resolving arguments***)
    211 captured_error = self.error
    212 try:
--> 213     raise captured_error
    214 finally:
    215     # We want to avoid creating a reference cycle here. Python does
    216     # collect cycles just fine, so it wouldn't be the end of the world
   (...)
    225     # methods frame, we avoid the 'captured_error' object's
    226     # __traceback__ from indirectly referencing 'captured_error'.
    227     del captured_error, self

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:363, in to_thread_run_sync.<locals>.report_back_in_trio_thread_fn.<locals>.do_release_then_return_result()
    357 def do_release_then_return_result() -> RetT:
    358     # release_on_behalf_of is an arbitrary user-defined method, so it
    359     # might raise an error. If it does, we want that error to
    360     # replace the regular return value, and if the regular return was
    361     # already an exception then we want them to chain.
    362     try:
--> 363         return result.unwrap()
    364     finally:
    365         limiter.release_on_behalf_of(placeholder)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/outcome/_impl.py:213, in Error.unwrap(***failed resolving arguments***)
    211 captured_error = self.error
    212 try:
--> 213     raise captured_error
    214 finally:
    215     # We want to avoid creating a reference cycle here. Python does
    216     # collect cycles just fine, so it wouldn't be the end of the world
   (...)
    225     # methods frame, we avoid the 'captured_error' object's
    226     # __traceback__ from indirectly referencing 'captured_error'.
    227     del captured_error, self

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:382, in to_thread_run_sync.<locals>.worker_fn()
    380 PARENT_TASK_DATA.task_register = task_register
    381 try:
--> 382     ret = context.run(sync_fn, *args)
    384     if inspect.iscoroutine(ret):
    385         # Manually close coroutine to avoid RuntimeWarnings
    386         ret.close()

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/_backends/_trio.py:1057, in TrioBackend.run_sync_in_worker_thread.<locals>.wrapper()
   1055 def wrapper() -> T_Retval:
   1056     with claim_worker_thread(TrioBackend, token):
-> 1057         return func(*args)

Cell In[8], line 3, in run_query(engine)
      1 def run_query(engine: sa.Engine):
      2     with engine.connect() as conn:
----> 3         return conn.execute(sa.text("select * from Users")).fetchall()

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1418, in Connection.execute(self, statement, parameters, execution_options)
   1416     raise exc.ObjectNotExecutableError(statement) from err
   1417 else:
-> 1418     return meth(
   1419         self,
   1420         distilled_parameters,
   1421         execution_options or NO_OPTIONS,
   1422     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:515, in ClauseElement._execute_on_connection(self, connection, distilled_params, execution_options)
    513     if TYPE_CHECKING:
    514         assert isinstance(self, Executable)
--> 515     return connection._execute_clauseelement(
    516         self, distilled_params, execution_options
    517     )
    518 else:
    519     raise exc.ObjectNotExecutableError(self)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1640, in Connection._execute_clauseelement(self, elem, distilled_parameters, execution_options)
   1628 compiled_cache: Optional[CompiledCacheType] = execution_options.get(
   1629     "compiled_cache", self.engine._compiled_cache
   1630 )
   1632 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
   1633     dialect=dialect,
   1634     compiled_cache=compiled_cache,
   (...)
   1638     linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
   1639 )
-> 1640 ret = self._execute_context(
   1641     dialect,
   1642     dialect.execution_ctx_cls._init_compiled,
   1643     compiled_sql,
   1644     distilled_parameters,
   1645     execution_options,
   1646     compiled_sql,
   1647     distilled_parameters,
   1648     elem,
   1649     extracted_params,
   1650     cache_hit=cache_hit,
   1651 )
   1652 if has_events:
   1653     self.dispatch.after_execute(
   1654         self,
   1655         elem,
   (...)
   1659         ret,
   1660     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1846, in Connection._execute_context(self, dialect, constructor, statement, parameters, execution_options, *args, **kw)
   1844     return self._exec_insertmany_context(dialect, context)
   1845 else:
-> 1846     return self._exec_single_context(
   1847         dialect, context, statement, parameters
   1848     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1986, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1983     result = context._setup_result_proxy()
   1985 except BaseException as e:
-> 1986     self._handle_dbapi_exception(
   1987         e, str_statement, effective_parameters, cursor, context
   1988     )
   1990 return result

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2355, in Connection._handle_dbapi_exception(self, e, statement, parameters, cursor, context, is_sub_exec)
   2353 elif should_wrap:
   2354     assert sqlalchemy_exception is not None
-> 2355     raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
   2356 else:
   2357     assert exc_info[1] is not None

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1965                 break
   1966     if not evt_handled:
-> 1967         self.dialect.do_execute(
   1968             cursor, str_statement, effective_parameters, context
   1969         )
   1971 if self._has_events or self.engine._has_events:
   1972     self.dispatch.after_cursor_execute(
   1973         self,
   1974         cursor,
   (...)
   1978         context.executemany,
   1979     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941, in DefaultDialect.do_execute(self, cursor, statement, parameters, context)
    940 def do_execute(self, cursor, statement, parameters, context=None):
--> 941     cursor.execute(statement, parameters)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
    138         self.__c.execute(statement)
    139     else:
--> 140         self.__c.execute(statement, parameters)
    141 except RuntimeError as e:
    142     if e.args[0].startswith("Not implemented Error"):

ProgrammingError: (duckdb.duckdb.CatalogException) Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
                      ^
[SQL: select * from Users]
(Background on this error at: https://sqlalche.me/e/20/f405)

dhirschfeld avatar Oct 01 '24 00:10 dhirschfeld

If I change the run_query function to accept a sa.Connection instead, then the query works in a background thread:

>>> def run_query(conn: sa.Connection):
...     return conn.execute(sa.text("select * from Users")).fetchall()

>>> with engine.connect() as conn:
...     res = run_query(conn)

>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]

>>> with engine.connect() as conn:
...     res = await anyio.to_thread.run_sync(run_query, conn)

>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]

dhirschfeld avatar Oct 01 '24 00:10 dhirschfeld

It would be great if it were possible to pass an engine to a separate thread, so the same code would work irrespective of whether it was connected to a Postgres database in production or an in-memory duckdb database in CI.

dhirschfeld avatar Oct 01 '24 00:10 dhirschfeld

Calling back into the main thread from the worker thread seems to work, but then run_query only works from a worker-thread context, so it's not ideal.

def run_query(engine: sa.Engine):
    with anyio.from_thread.run_sync(engine.connect) as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()

dhirschfeld avatar Oct 01 '24 01:10 dhirschfeld