WIP: investigate adding connection pooling as part of connector package

Open jackwotherspoon opened this issue 2 years ago • 0 comments

This PR investigates adding SQLAlchemy connection pooling inside the connector package, to provide a basic default usage and reduce the code users need to write to connect. It also hides the workaround code currently required to support asyncpg connection pooling with SQLAlchemy.

Synchronous pool usage:

from google.cloud.sql.connector import create_pool
from sqlalchemy import text

pool = create_pool(
    "project:region:instance",
    "pg8000",
    user="my-user",
    password="my-password",
    db="my-database",
)

with pool.connect() as db_conn:
    # wrap raw SQL in text() so it also works on newer SQLAlchemy versions
    results = db_conn.execute(text("SELECT * from ratings")).fetchall()
    for row in results:
        # ... do something with results
        print(row)

Async pool usage:

from google.cloud.sql.connector import create_async_pool
from sqlalchemy import text

pool = await create_async_pool(
    "project:region:instance",
    "asyncpg",
    user="my-user",
    password="my-password",
    db="my-database",
)

async with pool.connect() as db_conn:
    query = await db_conn.execute(text("SELECT * from ratings"))
    results = query.fetchall()
    for row in results:
        # ... do something with results
        print(row)

This reduces the code needed to connect, as a comparison with the current setup below shows.

Current connection pool setup (sync):

from google.cloud.sql.connector import Connector
import pymysql
import sqlalchemy

# function to return the database connection
def getconn() -> pymysql.connections.Connection:
    with Connector() as connector:
        conn: pymysql.connections.Connection = connector.connect(
            "project:region:instance",
            "pymysql",
            user="my-user",
            password="my-password",
            db="my-db-name"
        )
        return conn

# create connection pool
pool = sqlalchemy.create_engine(
    "mysql+pymysql://",
    creator=getconn,
)

with pool.connect() as db_conn:
    # wrap raw SQL in text() so it also works on newer SQLAlchemy versions
    results = db_conn.execute(sqlalchemy.text("SELECT * from ratings")).fetchall()
    for row in results:
        # ... do something with results
        print(row)
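
For illustration, here is a rough sketch of how a helper like the proposed `create_pool` might wrap the sync setup above. It is not the implementation in this PR: the `DIALECT_URLS` mapping and the `dialect_url` function are hypothetical names, and the sketch only assumes the existing `Connector.connect` call and `sqlalchemy.create_engine(..., creator=...)` shown above.

```python
from typing import Any

# Assumed mapping from connector driver name to SQLAlchemy URL prefix
# (hypothetical; covers only the synchronous drivers).
DIALECT_URLS = {
    "pymysql": "mysql+pymysql://",
    "pg8000": "postgresql+pg8000://",
    "pytds": "mssql+pytds://",
}


def dialect_url(driver: str) -> str:
    """Return the SQLAlchemy URL prefix for a connector driver name."""
    try:
        return DIALECT_URLS[driver]
    except KeyError:
        raise ValueError(f"unsupported driver: {driver!r}") from None


def create_pool(instance_connection_name: str, driver: str, **kwargs: Any):
    """Hypothetical helper: build a SQLAlchemy engine backed by the connector."""
    # Imported lazily so the sketch can be loaded without either package installed.
    import sqlalchemy
    from google.cloud.sql.connector import Connector

    url = dialect_url(driver)  # validate the driver before creating any resources
    connector = Connector()  # shared by every connection the pool opens

    def getconn():
        # Same call as in the manual setup; kwargs carry user, password, db, ...
        return connector.connect(instance_connection_name, driver, **kwargs)

    return sqlalchemy.create_engine(url, creator=getconn)
```

The sketch keeps one `Connector` alive for the lifetime of the pool and never closes it; a real implementation would presumably also need to close the connector when the pool is disposed.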

Current connection pool setup (async):

import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.util import await_only
from google.cloud.sql.connector import Connector

async def async_creator():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        conn = await connector.connect_async(
            "project:region:instance",
            "asyncpg",
            user="my-db-user",
            password="my-db-password",
            db="my-db-name",
        )
        return conn

async def create_pool():
    def adapted_creator():
        dbapi = engine.dialect.dbapi
        from sqlalchemy.dialects.postgresql.asyncpg import (
            AsyncAdapt_asyncpg_connection,
        )
        return AsyncAdapt_asyncpg_connection(
            dbapi,
            await_only(async_creator()),
            prepared_statement_cache_size=100,
        )

    # create async connection pool with wrapped creator
    engine = create_async_engine(
        "postgresql+asyncpg://",
        echo=True,
        creator=adapted_creator,
    )
    
    # use connection pooling with Cloud SQL Python Connector
    async with engine.connect() as conn:
        query = await conn.execute(text("SELECT * from ratings"))
        results = query.fetchall()
        for row in results:
            # ... do something with results
            print(row)

jackwotherspoon, Aug 15 '22 19:08