databases icon indicating copy to clipboard operation
databases copied to clipboard

psqlpy adapter

Open sharkguto opened this issue 8 months ago • 0 comments

Hi,

I have written an adapter for the psqlpy driver; the code is as follows:

from databases.backends.postgres import PostgresBackend, ConnectionBackend
from databases.core import DatabaseURL
import psqlpy
from typing import Any, Dict, List, Optional
import asyncio


class PSQLPyConnectionBackend(ConnectionBackend):
    """Connection backend that runs queries on a connection from a psqlpy pool.

    The connection is taken from the pool lazily: either explicitly through
    ``acquire()`` / ``async with``, or implicitly by the first query method
    that finds no connection attached.
    """

    def __init__(self, pool: psqlpy.ConnectionPool):
        self._pool = pool
        self._connection = None
        # Serialises acquire()/release() so that concurrent callers cannot
        # both pull a connection from the pool for the same backend object.
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.release()

    async def acquire(self) -> None:
        """Attach a pool connection to this backend if none is attached yet."""
        async with self._lock:
            if self._connection is None:
                self._connection = await self._pool.connection()

    async def release(self) -> None:
        """Detach the current connection by dropping the reference to it."""
        async with self._lock:
            if self._connection is not None:
                self._connection = None  # Pool-managed, no explicit close needed

    @staticmethod
    def _as_params(values: Any) -> Any:
        """Return ``values`` when it is a dict, list or tuple, else ``[]``."""
        return values if isinstance(values, (dict, list, tuple)) else []

    async def _run(self, query: Any, params: Any) -> Any:
        """Acquire a connection if needed and execute ``query`` with ``params``.

        The query is always converted with ``str()`` so that every public
        method hands the driver the same kind of object.
        """
        if self._connection is None:
            await self.acquire()
        return await self._connection.execute(str(query), params)

    async def fetch_all(self, query: str, values: Optional[Dict] = None) -> List[Any]:
        """Execute ``query`` and return every row produced by it."""
        outcome = await self._run(query, self._as_params(values))
        return outcome.result()

    async def fetch_one(
        self, query: str, values: Optional[Dict] = None
    ) -> Optional[Any]:
        """Execute ``query`` and return its first row, or ``None`` if empty.

        Previously this returned the whole row collection, exactly like
        ``fetch_all``, which contradicted the ``Optional[Any]`` contract.
        """
        outcome = await self._run(query, self._as_params(values))
        rows = outcome.result()
        return rows[0] if rows else None

    async def execute(self, query: str, values: Optional[Dict] = None) -> None:
        """Execute ``query`` for its side effects, discarding any result."""
        await self._run(query, values or {})

    async def execute_many(self, query: str, values: List[Dict]) -> None:
        """Execute ``query`` once per entry of ``values``, sequentially."""
        for value in values:
            await self._run(query, value)

    async def iterate(self, query: str, values: Optional[Dict] = None) -> Any:
        """Execute ``query`` and yield each row as a plain ``dict``.

        The rows are read through ``result()``, the same accessor the fetch
        methods use, instead of ``async for`` over the execute() return value.
        NOTE(review): that return value does not appear to be async-iterable
        in psqlpy - confirm against the driver version in use.
        """
        outcome = await self._run(query, self._as_params(values))
        for row in outcome.result():
            yield dict(row)


class PSQLPyBackend(PostgresBackend):
    """Database backend that serves connections from a psqlpy pool.

    The pool is created by ``connect()`` from the parts of the database URL
    and torn down by ``disconnect()``; ``connection()`` hands out connection
    backends bound to that pool.
    """

    def __init__(self, database_url: DatabaseURL, **options: Any) -> None:
        super().__init__(database_url, **options)
        self._pool: Optional[psqlpy.ConnectionPool] = None
        self._url = database_url
        self._options = options

    async def connect(self) -> None:
        """Create the connection pool; does nothing if one already exists."""
        if self._pool is not None:
            return
        print("Connecting with PSQLPyBackend")
        url = self._url
        # Missing URL components fall back to the defaults spelled out here.
        self._pool = psqlpy.ConnectionPool(
            username=str(url.username or "postgres"),
            password=str(url.password or ""),
            host=str(url.hostname or "localhost"),
            port=int(url.port or 5432),
            db_name=str(url.database or ""),
            max_db_pool_size=self._options.get("max_size", 20),
        )

    async def disconnect(self) -> None:
        """Close the connection pool; does nothing if there is no pool."""
        import inspect

        pool = self._pool
        if pool is None:
            return
        print("Disconnecting PSQLPyBackend")
        # Awaiting unconditionally raises TypeError when close() is an
        # ordinary method returning None, so only await an awaitable.
        # NOTE(review): psqlpy's ConnectionPool.close() looks synchronous in
        # current releases - confirm against the installed version.
        pending = pool.close()
        if inspect.isawaitable(pending):
            await pending
        self._pool = None

    def connection(self) -> PSQLPyConnectionBackend:
        """Return a new connection backend bound to the pool.

        Raises:
            RuntimeError: if ``connect()`` has not created the pool yet.
        """
        if self._pool is None:
            raise RuntimeError("Database not connected. Call connect() first.")
        return PSQLPyConnectionBackend(self._pool)

How to use it:

.....

# Register the adapter under the "psqlpy" URL scheme so that Database can
# resolve it from the connection string below.
Database.SUPPORTED_BACKENDS["psqlpy"] = "velejar.utils.psqlpy_adapter:PSQLPyBackend"

# The password is percent-encoded with quote(..., safe="") rather than
# quote_plus(): quote_plus() turns a space into "+", which is only reversed by
# unquote_plus(), whereas "%20" is decoded correctly by both unquote variants.
# NOTE(review): databases appears to decode URL credentials with plain
# unquote() - confirm.
SQLALCHEMY_DATABASE_URI = f"psqlpy://{POSTGRES_USER}:{urllib.parse.quote(POSTGRES_PASSWORD, safe='')}@{POSTGRES_SERVER}/{POSTGRES_DB}"

# NOTE(review): the adapter's connect() only reads "max_size" from these
# options; "min_size" is accepted but not forwarded to the psqlpy pool.
database = Database(
    SQLALCHEMY_DATABASE_URI,
    min_size=1,
    max_size=POSTGRES_POOL_SIZE,
)

It would be great if you could add built-in support for this new driver: https://github.com/psqlpy-python/psqlpy

sharkguto avatar May 09 '25 14:05 sharkguto