duckdb_engine icon indicating copy to clipboard operation
duckdb_engine copied to clipboard

feat: enable experimental async use of driver

Open Mause opened this issue 1 year ago • 5 comments

Mause avatar Feb 14 '24 04:02 Mause

FYI: I am using the duckdb_engine as Async engine

I had to patch duckdb_engine.duckdb_engine.Dialect.get_pool_class in order to be able to set an Async Connection pool

sinaiy avatar Mar 24 '24 12:03 sinaiy

FYI: I am using the duckdb_engine as Async engine

I had to patch duckdb_engine.duckdb_engine.Dialect.get_pool_class in order to be able to set an Async Connection pool

That shouldn't be required, you should just be able to pass poolclass=... to the create_engine(...) call

Mause avatar Apr 07 '24 17:04 Mause

latest version of SQLAlchemy is more explicit it adds a verification that the async status of the Dialect matches the async status of the connection pool.

I followed the psycopg3 engine example, and created AsyncDuckDbDialect as a subclass of the original dialect

import duckdb_engine

class AsyncDialect(duckdb_engine.Dialect):
    is_async = True

    @classmethod
    def get_pool_class(cls, url):
        async_fallback = url.query.get("async_fallback", False)

        if util.asbool(async_fallback):
            return pool.FallbackAsyncAdaptedQueuePool
        else:
            return pool.AsyncAdaptedQueuePool


duckdb_engine.Dialect.get_async_dialect_cls = lambda cls: AsyncDialect

sinaiy avatar Apr 08 '24 08:04 sinaiy

This PR currently has a merge conflict. Please resolve this and then re-add the automerge label.

kodiakhq[bot] avatar Apr 18 '24 14:04 kodiakhq[bot]

the method @sinaiy mentioned works for me in the mean time. here it is with type hints

import logging
import typing as t

import duckdb_engine
from pydantic_core import from_json, to_json
from sqlalchemy import URL, pool, util
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

# Assuming the logger is already configured elsewhere
logger = logging.getLogger(__name__)


class AsyncDialect(duckdb_engine.Dialect):
  is_async = True

  @classmethod
  def get_pool_class(cls, url: URL):
    async_fallback = url.query.get("async_fallback", False)

    if util.asbool(async_fallback):
      return pool.FallbackAsyncAdaptedQueuePool
    else:
      return pool.AsyncAdaptedQueuePool


duckdb_engine.Dialect.get_async_dialect_cls = lambda cls: AsyncDialect  # type: ignore

ShravanSunder avatar Jul 09 '24 14:07 ShravanSunder