duckdb_engine
duckdb_engine copied to clipboard
Reusing engines from different threads
My observation is that passing an engine connected to an in-memory duckdb database to a different thread doesn't work.
I'm wondering if that's expected or if it would be considered a bug / missing feature?
Example:
import anyio
import sqlalchemy as sa
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
)
class Base(DeclarativeBase):
    pass
seq = sa.Sequence("user_id")
class User(Base):
    __tablename__ = "Users"
    id: Mapped[int] = mapped_column(
        seq,
        server_default=seq.next_value(),
        primary_key=True,
    )
    name: Mapped[str] = mapped_column(sa.String(30))
engine = sa.create_engine("duckdb:///:memory:")
Base.metadata.create_all(bind=engine)
with Session(engine) as session:
    spongebob = User(name="spongebob")
    sandy = User(name="sandy")
    patrick = User(name="patrick")
    session.add_all([spongebob, sandy, patrick])
    session.commit()
def run_query(engine: sa.Engine):
    with engine.connect() as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()
Running the run_query function works as expected:
>>> run_query(engine)
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
...but if I run it in a background thread I get a Catalog Error: Table with name Users does not exist! exception 😔
My assumption is that the engine loses its connection to the in-memory database in the main thread and creates a new in-memory database where that table doesn't exist?
>>> await anyio.to_thread.run_sync(run_query, engine)
---------------------------------------------------------------------------
CatalogException Traceback (most recent call last)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
1966 if not evt_handled:
-> 1967 self.dialect.do_execute(
1968 cursor, str_statement, effective_parameters, context
1969 )
1971 if self._has_events or self.engine._has_events:
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941, in DefaultDialect.do_execute(self, cursor, statement, parameters, context)
940 def do_execute(self, cursor, statement, parameters, context=None):
--> 941 cursor.execute(statement, parameters)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
139 else:
--> 140 self.__c.execute(statement, parameters)
141 except RuntimeError as e:
CatalogException: Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
^
The above exception was the direct cause of the following exception:
ProgrammingError Traceback (most recent call last)
Cell In[10], line 1
----> 1 await anyio.to_thread.run_sync(run_query, engine)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/to_thread.py:56, in run_sync(func, abandon_on_cancel, cancellable, limiter, *args)
48 abandon_on_cancel = cancellable
49 warn(
50 "The `cancellable=` keyword argument to `anyio.to_thread.run_sync` is "
51 "deprecated since AnyIO 4.1.0; use `abandon_on_cancel=` instead",
52 DeprecationWarning,
53 stacklevel=2,
54 )
---> 56 return await get_async_backend().run_sync_in_worker_thread(
57 func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
58 )
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/_backends/_trio.py:1060, in TrioBackend.run_sync_in_worker_thread(cls, func, args, abandon_on_cancel, limiter)
1057 return func(*args)
1059 token = TrioBackend.current_token()
-> 1060 return await run_sync(
1061 wrapper,
1062 abandon_on_cancel=abandon_on_cancel,
1063 limiter=cast(trio.CapacityLimiter, limiter),
1064 )
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:437, in to_thread_run_sync(sync_fn, thread_name, abandon_on_cancel, limiter, *args)
433 msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = (
434 await trio.lowlevel.wait_task_rescheduled(abort)
435 )
436 if isinstance(msg_from_thread, outcome.Outcome):
--> 437 return msg_from_thread.unwrap()
438 elif isinstance(msg_from_thread, Run):
439 await msg_from_thread.run()
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/outcome/_impl.py:213, in Error.unwrap(***failed resolving arguments***)
211 captured_error = self.error
212 try:
--> 213 raise captured_error
214 finally:
215 # We want to avoid creating a reference cycle here. Python does
216 # collect cycles just fine, so it wouldn't be the end of the world
(...)
225 # methods frame, we avoid the 'captured_error' object's
226 # __traceback__ from indirectly referencing 'captured_error'.
227 del captured_error, self
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:363, in to_thread_run_sync.<locals>.report_back_in_trio_thread_fn.<locals>.do_release_then_return_result()
357 def do_release_then_return_result() -> RetT:
358 # release_on_behalf_of is an arbitrary user-defined method, so it
359 # might raise an error. If it does, we want that error to
360 # replace the regular return value, and if the regular return was
361 # already an exception then we want them to chain.
362 try:
--> 363 return result.unwrap()
364 finally:
365 limiter.release_on_behalf_of(placeholder)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/outcome/_impl.py:213, in Error.unwrap(***failed resolving arguments***)
211 captured_error = self.error
212 try:
--> 213 raise captured_error
214 finally:
215 # We want to avoid creating a reference cycle here. Python does
216 # collect cycles just fine, so it wouldn't be the end of the world
(...)
225 # methods frame, we avoid the 'captured_error' object's
226 # __traceback__ from indirectly referencing 'captured_error'.
227 del captured_error, self
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:382, in to_thread_run_sync.<locals>.worker_fn()
380 PARENT_TASK_DATA.task_register = task_register
381 try:
--> 382 ret = context.run(sync_fn, *args)
384 if inspect.iscoroutine(ret):
385 # Manually close coroutine to avoid RuntimeWarnings
386 ret.close()
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/_backends/_trio.py:1057, in TrioBackend.run_sync_in_worker_thread.<locals>.wrapper()
1055 def wrapper() -> T_Retval:
1056 with claim_worker_thread(TrioBackend, token):
-> 1057 return func(*args)
Cell In[8], line 3, in run_query(engine)
1 def run_query(engine: sa.Engine):
2 with engine.connect() as conn:
----> 3 return conn.execute(sa.text("select * from Users")).fetchall()
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1418, in Connection.execute(self, statement, parameters, execution_options)
1416 raise exc.ObjectNotExecutableError(statement) from err
1417 else:
-> 1418 return meth(
1419 self,
1420 distilled_parameters,
1421 execution_options or NO_OPTIONS,
1422 )
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:515, in ClauseElement._execute_on_connection(self, connection, distilled_params, execution_options)
513 if TYPE_CHECKING:
514 assert isinstance(self, Executable)
--> 515 return connection._execute_clauseelement(
516 self, distilled_params, execution_options
517 )
518 else:
519 raise exc.ObjectNotExecutableError(self)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1640, in Connection._execute_clauseelement(self, elem, distilled_parameters, execution_options)
1628 compiled_cache: Optional[CompiledCacheType] = execution_options.get(
1629 "compiled_cache", self.engine._compiled_cache
1630 )
1632 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
1633 dialect=dialect,
1634 compiled_cache=compiled_cache,
(...)
1638 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1639 )
-> 1640 ret = self._execute_context(
1641 dialect,
1642 dialect.execution_ctx_cls._init_compiled,
1643 compiled_sql,
1644 distilled_parameters,
1645 execution_options,
1646 compiled_sql,
1647 distilled_parameters,
1648 elem,
1649 extracted_params,
1650 cache_hit=cache_hit,
1651 )
1652 if has_events:
1653 self.dispatch.after_execute(
1654 self,
1655 elem,
(...)
1659 ret,
1660 )
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1846, in Connection._execute_context(self, dialect, constructor, statement, parameters, execution_options, *args, **kw)
1844 return self._exec_insertmany_context(dialect, context)
1845 else:
-> 1846 return self._exec_single_context(
1847 dialect, context, statement, parameters
1848 )
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1986, in Connection._exec_single_context(self, dialect, context, statement, parameters)
1983 result = context._setup_result_proxy()
1985 except BaseException as e:
-> 1986 self._handle_dbapi_exception(
1987 e, str_statement, effective_parameters, cursor, context
1988 )
1990 return result
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2355, in Connection._handle_dbapi_exception(self, e, statement, parameters, cursor, context, is_sub_exec)
2353 elif should_wrap:
2354 assert sqlalchemy_exception is not None
-> 2355 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2356 else:
2357 assert exc_info[1] is not None
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
1965 break
1966 if not evt_handled:
-> 1967 self.dialect.do_execute(
1968 cursor, str_statement, effective_parameters, context
1969 )
1971 if self._has_events or self.engine._has_events:
1972 self.dispatch.after_cursor_execute(
1973 self,
1974 cursor,
(...)
1978 context.executemany,
1979 )
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941, in DefaultDialect.do_execute(self, cursor, statement, parameters, context)
940 def do_execute(self, cursor, statement, parameters, context=None):
--> 941 cursor.execute(statement, parameters)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
138 self.__c.execute(statement)
139 else:
--> 140 self.__c.execute(statement, parameters)
141 except RuntimeError as e:
142 if e.args[0].startswith("Not implemented Error"):
ProgrammingError: (duckdb.duckdb.CatalogException) Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
^
[SQL: select * from Users]
(Background on this error at: https://sqlalche.me/e/20/f405)
If I change the run_query function to instead accept a sa.Connection then the query works in a background thread:
>>> def run_query(conn: sa.Connection):
... return conn.execute(sa.text("select * from Users")).fetchall()
>>> with engine.connect() as conn:
... res = run_query(conn)
>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
>>> with engine.connect() as conn:
... res = await anyio.to_thread.run_sync(run_query, conn)
>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
It would be great if it were possible to pass an engine to a separate thread to use, so you could use the same code irrespective of whether you were connected to a Postgres database in production or a duckdb in-memory database in CI.
Calling back into the main-thread from the worker thread seems to work, but then it only works from the worker-thread context, so, not ideal.
def run_query(engine: sa.Engine):
    with anyio.from_thread.run_sync(engine.connect) as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()