feat: enable experimental async use of driver
FYI: I am using duckdb_engine as an async engine. I had to patch duckdb_engine.duckdb_engine.Dialect.get_pool_class in order to be able to set an async connection pool.
That shouldn't be required; you should just be able to pass poolclass=... to the create_engine(...) call.
The latest version of SQLAlchemy is more explicit: it adds a check that the async status of the dialect matches the async status of the connection pool.
I followed the psycopg3 dialect example and created AsyncDuckDbDialect (named AsyncDialect in the snippet) as a subclass of the original dialect:
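import duckdb_engine
from sqlalchemy import pool, util


class AsyncDialect(duckdb_engine.Dialect):
    # Mark the dialect as async so it is accepted together with an async pool.
    is_async = True

    @classmethod
    def get_pool_class(cls, url):
        # An optional ?async_fallback=... URL flag selects the fallback pool.
        async_fallback = url.query.get("async_fallback", False)
        if util.asbool(async_fallback):
            return pool.FallbackAsyncAdaptedQueuePool
        else:
            return pool.AsyncAdaptedQueuePool


# Monkey-patch so the async engine setup resolves to AsyncDialect.
duckdb_engine.Dialect.get_async_dialect_cls = lambda cls: AsyncDialect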
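For anyone wondering why passing poolclass alone is not enough, here is a rough, stdlib-only model of the kind of check described above. It is not SQLAlchemy's actual code: every class and function name in it (SyncPool, AsyncPool, SyncDialect, build_pool, and the local ArgumentError) is a hypothetical stand-in. It only illustrates the idea that an async pool paired with a non-async dialect is rejected, and that a subclass with is_async = True passes.

```python
class ArgumentError(Exception):
    """Hypothetical stand-in for the error raised on a pool/dialect mismatch."""


class SyncPool:
    _is_asyncio = False


class AsyncPool:
    _is_asyncio = True


class SyncDialect:
    is_async = False

    @classmethod
    def get_pool_class(cls, url):
        return SyncPool


class AsyncDialect(SyncDialect):
    # Same dialect, but flagged async and defaulting to an async pool.
    is_async = True

    @classmethod
    def get_pool_class(cls, url):
        return AsyncPool


def build_pool(dialect_cls, url, poolclass=None):
    """Pick a pool class, then verify that its async flag matches the dialect."""
    pool_cls = poolclass or dialect_cls.get_pool_class(url)
    if pool_cls._is_asyncio is not dialect_cls.is_async:
        raise ArgumentError(
            f"Pool class {pool_cls.__name__} does not match dialect "
            f"{dialect_cls.__name__} (is_async={dialect_cls.is_async})"
        )
    return pool_cls()


# The async dialect works with its default async pool.
async_pool = build_pool(AsyncDialect, "duckdb:///:memory:")

# Forcing an async pool onto the sync dialect is rejected.
try:
    build_pool(SyncDialect, "duckdb:///:memory:", poolclass=AsyncPool)
    mismatch_rejected = False
except ArgumentError:
    mismatch_rejected = True
```

In this model the only way to use AsyncPool is through a dialect whose is_async flag is True, which is what the AsyncDialect subclass in the comment above provides.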
This PR currently has a merge conflict. Please resolve this and then re-add the automerge label.
The method @sinaiy mentioned works for me in the meantime. Here it is with type hints:
import logging
import typing as t

import duckdb_engine
from pydantic_core import from_json, to_json
from sqlalchemy import URL, pool, util
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

# Assuming the logger is already configured elsewhere
logger = logging.getLogger(__name__)


class AsyncDialect(duckdb_engine.Dialect):
    is_async = True

    @classmethod
    def get_pool_class(cls, url: URL):
        async_fallback = url.query.get("async_fallback", False)
        if util.asbool(async_fallback):
            return pool.FallbackAsyncAdaptedQueuePool
        else:
            return pool.AsyncAdaptedQueuePool


duckdb_engine.Dialect.get_async_dialect_cls = lambda cls: AsyncDialect  # type: ignore
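As a small aside on the async_fallback lookup in get_pool_class, the sketch below shows how that URL query flag is read, using only SQLAlchemy's make_url and util.asbool. The helper name wants_async_fallback and the example URLs are made up for illustration; duckdb_engine itself is not needed to parse the URL.

```python
from sqlalchemy import util
from sqlalchemy.engine import make_url


def wants_async_fallback(url_string: str) -> bool:
    """Return True when the URL carries a truthy ?async_fallback=... flag."""
    url = make_url(url_string)
    # url.query maps query-string keys to their string values;
    # a missing key falls back to False, as in get_pool_class.
    return util.asbool(url.query.get("async_fallback", False))


with_flag = wants_async_fallback("duckdb:///:memory:?async_fallback=true")
without_flag = wants_async_fallback("duckdb:///:memory:")
```

With the dialect above, the first URL would select the fallback pool and the second the regular async pool.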