cloud-sql-python-connector
cloud-sql-python-connector copied to clipboard
WIP: investigate adding connection pooling as part of connector package
This PR investigates adding connection pooling with SQLAlchemy internally, to provide a basic usage pattern and reduce the code users need to write in order to connect. It also hides the workaround code required to support asyncpg connection pooling with SQLAlchemy.
Synchronous pool usage:
from google.cloud.sql.connector import create_pool
from sqlalchemy import text

# create_pool is the helper proposed in this PR: it takes the instance
# connection name, the database driver and the credentials.
pool = create_pool(
    "project:region:instance",
    "pg8000",
    user="my-user",
    password="my-password",
    db="my-database",
)

with pool.connect() as db_conn:
    # Wrap raw SQL in text(): SQLAlchemy 2.0 no longer executes plain strings,
    # and this matches the async example.
    results = db_conn.execute(text("SELECT * from ratings")).fetchall()
    for row in results:
        # ... do something with results
        pass
Async pool usage:
from google.cloud.sql.connector import create_async_pool
from sqlalchemy import text
pool = await create_async_pool(
"project:region:instance",
"asyncpg",
user="my-user",
password="my-password",
db="my-database",
)
async with pool.connect() as db_conn:
query = await db_conn.execute(text("SELECT * from ratings"))
results = query.fetchall()
for row in results:
# ... do something with results
This reduces the code needed to connect, as can be seen by comparing it with the current approaches below.
Current connection pool setup (sync):
from google.cloud.sql.connector import Connector
import sqlalchemy
# function to return the database connection
def getconn() -> pymysql.connections.Connection:
    """Open and return a new PyMySQL connection via the Cloud SQL connector.

    Takes no arguments so it can be handed to SQLAlchemy as the ``creator``
    callable of ``create_engine``.

    Returns:
        The connection object produced by ``Connector.connect``.
    """
    # NOTE(review): the return annotation is evaluated when the function is
    # defined, so ``pymysql`` must be imported for this snippet to run.
    # NOTE(review): the connector's context manager exits as the connection
    # is returned -- confirm the connection stays usable after that.
    with Connector() as connector:
        conn: pymysql.connections.Connection = connector.connect(
            "project:region:instance",
            "pymysql",
            user="my-user",
            password="my-password",
            db="my-db-name",
        )
        return conn
# create connection pool; the URL only selects the dialect/driver because
# the actual connections are produced by the ``creator`` callable
pool = sqlalchemy.create_engine(
    "mysql+pymysql://",
    creator=getconn,
)

with pool.connect() as db_conn:
    # Wrap raw SQL in sqlalchemy.text(): SQLAlchemy 2.0 no longer executes
    # plain strings.
    results = db_conn.execute(
        sqlalchemy.text("SELECT * from ratings")
    ).fetchall()
    for row in results:
        # ... do something with results
        pass
Current connection pool setup (async):
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.util import await_only
from google.cloud.sql.connector import Connector
async def async_creator():
    """Return a new asyncpg connection opened through the Cloud SQL connector.

    The connector is constructed with the currently running event loop and
    is used as an async context manager around the connect call.
    """
    async with Connector(loop=asyncio.get_running_loop()) as cloud_sql:
        return await cloud_sql.connect_async(
            "project:region:instance",
            "asyncpg",
            user="my-db-user",
            password="my-db-password",
            db="my-db-name",
        )
async def create_pool():
    """Build an async SQLAlchemy engine on top of the connector and query it.

    The engine is given a synchronous ``creator`` that wraps the asyncpg
    connection from ``async_creator`` in SQLAlchemy's asyncpg adapter, then
    a sample query is run on a pooled connection.
    """

    def adapted_creator():
        # ``engine`` is looked up when the creator is called, which happens
        # only after the engine below has been assigned.
        dbapi = engine.dialect.dbapi
        from sqlalchemy.dialects.postgresql.asyncpg import (
            AsyncAdapt_asyncpg_connection,
        )

        # await_only lets this synchronous creator wait on the coroutine.
        return AsyncAdapt_asyncpg_connection(
            dbapi,
            await_only(async_creator()),
            prepared_statement_cache_size=100,
        )

    # create async connection pool with wrapped creator
    engine = create_async_engine(
        "postgresql+asyncpg://",
        echo=True,
        creator=adapted_creator,
    )

    # use connection pooling with Cloud SQL Python Connector
    async with engine.connect() as conn:
        query = await conn.execute(text("SELECT * from ratings"))
        results = query.fetchall()
        for row in results:
            # ... do something with results
            pass