sqlalchemy
Using the statement_cache_size asyncpg setting / prepared statement names with asyncpg and pgbouncer
Hi!
I use SQLAlchemy 1.4 with the asyncpg driver behind pgbouncer.
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession

engine = create_async_engine(
    f'postgresql+asyncpg://{username}:{password}@{host}:{port}/{dbname}',
    echo=False,
)
session_maker = sessionmaker(
    engine,
    class_=AsyncSession,
)
I have an error:
asyncpg.exceptions.DuplicatePreparedStatementError: prepared statement "__asyncpg_stmt_a__" already exists
HINT:
NOTE: pgbouncer with pool_mode set to "transaction" or
"statement" does not support prepared statements properly.
You have two options:
* if you are using pgbouncer for connection pooling to a
single server, switch to the connection pool functionality
provided by asyncpg, it is a much better option for this
purpose;
* if you have no option of avoiding the use of pgbouncer,
then you can set statement_cache_size to 0 when creating
the asyncpg connection object.
How can I pass this setting (statement_cache_size=0) to the asyncpg connection object?
SQLAlchemy doesn't make use of statement_cache_size, as it necessarily uses the asyncpg connection.prepare() method directly, so there is no way to disable the use of prepared statements across the board. However, you can disable the prepared statement cache itself, so that a new prepared statement is made each time, using prepared_statement_cache_size=0. If you can try that and see if it works, we can mention this in the docs:
https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#prepared-statement-cache
Thank you, @zzzeek
In my case it does not work.
I found in
https://github.com/sqlalchemy/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/asyncpg.py#L747
await_only(self.asyncpg.connect(*arg, **kw))
that we do not pass statement_cache_size in **kw.
If I add:
    if 'prepared_statement_cache_size' in kw:
        kw['statement_cache_size'] = kw['prepared_statement_cache_size']
it helps sometimes, but sometimes it does not (I don't know why...).
hi -
did you try just direct usage of the prepared_statement_cache_size parameter as given:
engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")
? If that doesn't work, we are out of luck. SQLAlchemy is forced to use connection.prepare(), which means we have to use prepared statements.
Yes, I use this:
engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")
All of our SELECT statements have to use connection.prepare() because we need to be able to call get_attributes(). If asyncpg could be convinced to give us this API without necessitating the use of connection.prepare(), we could begin to think about how to support that.
the doc at https://magicstack.github.io/asyncpg/current/api/index.html?highlight=statement_cache_size doesn't claim this disables prepared statements, just that it doesn't cache them:
statement_cache_size (int) – The size of prepared statement LRU cache. Pass 0 to disable the cache.
oh, perhaps this is needed for when we do INSERT/UPDATE/DELETE, OK... the patch you tried at kw['statement_cache_size'] = kw['prepared_statement_cache_size'] should do that.
that is, if you use prepared_statement_cache_size=0 and add code like this:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 4a191cd286..415862a17b 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -735,6 +735,7 @@ class AsyncAdapt_asyncpg_dbapi:
         prepared_statement_cache_size = kw.pop(
             "prepared_statement_cache_size", 100
         )
+        kw["statement_cache_size"] = prepared_statement_cache_size
         if util.asbool(async_fallback):
             return AsyncAdaptFallback_asyncpg_connection(
                 self,
that's the best we can do. if there are still problems then we need support from asyncpg
I tried this, but for some reason it does not help (sometimes the error appears, sometimes not...).
this might be related to the fact that we still use connection.prepare(). I don't know the internals of asyncpg well enough to advise further on what might be going on.
Yes, this is the reason.
Also, in AsyncAdapt_asyncpg_cursor._prepare_and_execute we actively use prepared statements for executing queries.
In asyncpg, as far as I can see, non-prepared queries do not use named prepared statements:
# Pseudo code: in asyncpg, for statement_cache_size = 0
stmt = Connection._get_statement(
    self,
    query,
    named=False,  # named=True is used for the connection.prepare() call
)
# Deep inside this function, if named=False and statement_cache_size == 0,
# asyncpg uses an empty name for the prepared statement.
result = Connection._protocol.bind_execute(stmt, args, ...)  # slightly simplified, but that's the essence
We had a similar problem due to multiple web workers. They generated prepared statements with the same names, because each worker process starts its own module-level counter at the same value. The original function that generates the IDs looks like this:
    def _get_unique_id(self, prefix):
        global _uid
        _uid += 1
        return '__asyncpg_{}_{:x}__'.format(prefix, _uid)
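To illustrate the collision, here is a standalone sketch (not asyncpg's actual module): two worker "processes", each with its own counter starting from the same value, produce identical statement names, so if pgbouncer routes both onto the same backend connection the second PREPARE fails.

```python
import itertools

def make_uid_generator():
    # Each worker process gets its own counter starting at 1,
    # mimicking asyncpg's module-level _uid global.
    counter = itertools.count(1)
    return lambda prefix: '__asyncpg_{}_{:x}__'.format(prefix, next(counter))

worker_a = make_uid_generator()  # simulated "process" A
worker_b = make_uid_generator()  # simulated "process" B

# Both workers generate the same first name.
print(worker_a('stmt'))  # __asyncpg_stmt_1__
print(worker_b('stmt'))  # __asyncpg_stmt_1__
```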
So we just changed the Connection class a bit:
from uuid import uuid4
from asyncpg import Connection

class CConnection(Connection):
    def _get_unique_id(self, prefix: str) -> str:
        return f'__asyncpg_{prefix}_{uuid4()}__'
You need to provide it when you create the engine:
engine = create_async_engine(
    settings.database_url,
    connect_args={
        'connection_class': CConnection,
    },
)
@SlavaSkvortsov that seems to be something we cannot change on our end, other than subclassing the connection class to override a private method as in your example.
You may want to open an issue on the asyncpg repo for an out-of-the-box solution. Imho _get_unique_id should at least take into consideration the process pid (and/or thread id). The best way would be the ability to customize it, maybe using a connect kwarg, or the ability to specify a name when calling prepare, since there is currently no way of manually assigning a name to a statement.
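A minimal sketch of a pid-aware variant of that private method (the pid_aware_statement_name helper is hypothetical, and overriding asyncpg's private _get_unique_id is the same unsupported workaround as above):

```python
import itertools
import os

_uid = itertools.count(1)

def pid_aware_statement_name(prefix: str) -> str:
    # Embedding the pid disambiguates worker processes; the counter
    # disambiguates statements within one worker. Caveat (raised later
    # in this thread): pids can be re-used by new processes.
    return '__asyncpg_{}_{}_{:x}__'.format(prefix, os.getpid(), next(_uid))

# Hypothetical use inside the CConnection subclass from above:
# class CConnection(Connection):
#     def _get_unique_id(self, prefix: str) -> str:
#         return pid_aware_statement_name(prefix)

print(pid_aware_statement_name('stmt'))
```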
> engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")
Should this configuration pass the prepared statement cache size parameter to asyncpg connection? With [email protected] it doesn't.
So I managed to set the cache size on the asyncpg side like this:
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0",
    poolclass=NullPool,
    future=True,
    connect_args={'statement_cache_size': 0},
)
Note the connect_args={'statement_cache_size': 0} parameter.
And in combination with @SlavaSkvortsov's unique ids suggestion, it seems that I got rid of prepared statement errors with pgbouncer.
When using asyncpg only, you can use statement_cache_size=0 and it won't use prepared statements at all, thus working behind pgbouncer in transaction mode.
My understanding is that sqlalchemy/asyncpg will use prepared statements no matter what the prepared_statement_cache_size value is. This means that it will create prepared statements in any case. Using the unique ID workaround only hides the problem: you will create a lot of prepared statements in your open sessions to the database (each with a unique name), and each transaction will get a random session including some subset of those statements. Since you are disabling the cache, it won't be a real issue, as it will keep creating new ones, and they will never conflict because the name is random, but they are still created. I'm not sure what the mechanism to delete them is; maybe they just get deleted whenever the backend session is closed, once in a while?
As for why sqlalchemy needs to use prepared statements in any case, I have no idea.
> As for why sqlalchemy needs to use prepared statements in any case, I have no idea.
asyncpg has no other way of returning the type information of the selected columns in the query when using a normal query, so sqlalchemy needs to use prepared statements: https://github.com/sqlalchemy/sqlalchemy/blob/990eb3d8813369d3b8a7776ae85fb33627443d30/lib/sqlalchemy/dialects/postgresql/asyncpg.py#L613-L616 and https://github.com/sqlalchemy/sqlalchemy/blob/990eb3d8813369d3b8a7776ae85fb33627443d30/lib/sqlalchemy/dialects/postgresql/asyncpg.py#L392-L404
> As for why sqlalchemy needs to use prepared statements in any case, I have no idea.
https://github.com/sqlalchemy/sqlalchemy/issues/6467#issuecomment-838549407
assuming we keep using prepared statements, there's a bunch of people on this issue now. Can we come up with a way to fix what's wrong here and get this closed? thanks
https://github.com/MagicStack/asyncpg/issues/837 is closed via https://github.com/MagicStack/asyncpg/pull/846. We will want to expose this, then add a documentation section explaining that this is a means of using our asyncpg driver with pgbouncer, along with a recipe based on UUID or similar.
Is there any update on how to use sqlalchemy+asyncpg+pgbouncer? I'm new to sqlalchemy's async engine and stumbled onto this issue while trying out Prefect's v2 beta. Their package uses create_async_engine under the hood, but even if I modified their implementation, I don't think they would accept the strategy used by @SlavaSkvortsov. The only other suggestion I've seen is setting pgbouncer's pool_mode to session (according to asyncpg's FAQ), but I didn't have any luck with it.
EDIT: I should add -- session mode didn't work for me because I have many long-lived clients connecting. So I depend on a pool in transaction mode.
> Is there any update on how to use sqlalchemy+asyncpg+pgbouncer?
an update would be right here, so if you don't see one, nope!
> I'm new to sqlalchemy's async engine and stumbled onto this issue while trying out Prefect's v2 beta. Their package uses create_async_engine under the hood, but even if I modified their implementation, I don't think they would accept the strategy used by @SlavaSkvortsov. The only other suggestion I've seen is setting pgbouncer's pool_mode to session (according to asyncpg's FAQ), but I didn't have any luck with it.
I think we want to be able to send a name to the prepared statement using connection.prepare(name=somename) with this new parameter. Then we'd need a hook for the asyncpg dialect where you send a callable that returns a name for a prepared statement, one that can isolate on pid or something like that.
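A rough sketch of what such a user-supplied name callable could look like. The prepared_statement_name_func wiring in the comment is hypothetical, not an existing SQLAlchemy 1.4 API; only the name parameter to connection.prepare() is the asyncpg feature referenced above.

```python
import itertools
import os
import uuid

_counter = itertools.count(1)

def prepared_statement_name() -> str:
    # Combine the pid with a per-process counter, plus a random
    # component, since pids can be re-used by new processes (the
    # caveat raised later in this thread).
    return '__sa_{}_{}_{:x}__'.format(
        os.getpid(), uuid.uuid4().hex[:8], next(_counter)
    )

# Hypothetical wiring: the dialect would call the callable every time
# it invokes asyncpg's connection.prepare():
# engine = create_async_engine(
#     "postgresql+asyncpg://user:pass@hostname/dbname",
#     connect_args={"prepared_statement_name_func": prepared_statement_name},
# )

print(prepared_statement_name())
```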
> EDIT: I should add -- session mode didn't work for me because I have many long-lived clients connecting. So I depend on a pool in transaction mode.
> then we'd need a hook for the asyncpg dialect where you send a callable that returns a name for a prepared statement that can isolate on pid or something like that.
Would I be able to sponsor this PR or support it some way? As much as I'd want to open a PR, I wouldn't know where to start :fearful:.
not on this end, money doesn't really create time :) @CaselIT any interest?
I haven't looked at this one enough, but would implementing this one https://github.com/sqlalchemy/sqlalchemy/issues/8215 help?
I mean can asyncpg be told how to name prepared statements?
@CaselIT I think it would help implement the fix used above https://github.com/sqlalchemy/sqlalchemy/issues/6467#issuecomment-864943824, but I'm not sure of the distinction between connect_args={'connection_class': ...} above and async_creator in the issue you mentioned.
And yes, I think asyncpg can now be told how to name prepared statements, based on https://github.com/sqlalchemy/sqlalchemy/issues/6467#issuecomment-970348808
@CaselIT we need to support some kind of callable, or alternatively an option to do it ourselves, such that every time we call connection.prepare(), we can send in a name that disambiguates it from other processes in some way. But this is tricky, because the same pid can be re-used among new processes.
basically w/ pgbouncer, what asyncpg sees as "a new connection" is in fact a long lived connection maintained by pgbouncer.
which makes me ask: isn't there some way to just clear out the existing prepared statements on a connection?
how about this? Because there is basically no point in saving these prepared statements across "connects", as they are unusable.
@jacksund can you try this out? Just add this event handler. Does this fix it?
engine = create_async_engine(
    "postgresql+asyncpg://scott:tiger@localhost/test",
    echo=True,
)

@event.listens_for(engine.sync_engine, "connect")
def clear_prepared_statements_on_connect(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("DEALLOCATE ALL")
    cursor.close()
I mean this could also be configured in pgbouncer itself, I would think.
I think duplicated names of prepared statements are just one part of the problem. Another is the situation where pgbouncer is in transaction mode and different transactions on one sqlalchemy session use the same prepared statement. So even when I use a single transaction like this:
async with engine.connect() as conn:
    async with conn.begin():
        result = await conn.execute(query)
I get an error like: prepared statement "x" does not exist
thanks @romsuhov. I'm getting a matching result of prepared statement "x" does not exist, and once I try the fix by @zzzeek I'm back to prepared statement X already exists
OK folks, can we please confirm that these issues are local only to pgbouncer "transaction" and "statement" mode, and not the "most polite" "session" mode? Because per this doc, session mode resets everything automatically and there should be no issue with prepared statements.
that is, @romsuhov and @jacksund are both trying to use "transaction" mode, where everything needs to be reset per transaction rather than per connection.
If so, then simply move the above DEALLOCATE ALL event into the begin event (edit 8/18/2022: this version needs to turn off the prepared statement cache, since we can't rely on statements being present in subsequent transactions):
engine = create_async_engine(
    "postgresql+asyncpg://scott:tiger@localhost/test",
    connect_args={"prepared_statement_cache_size": 0},
    echo=True,
)

@event.listens_for(engine.sync_engine, "begin")
def clear_prepared_statements_on_begin(conn):
    conn.exec_driver_sql("DEALLOCATE ALL")
however, this seems a bit wasteful and I'd consider benchmarking against regular session mode; transaction mode seems a little weird
or just set server_reset_query_always to true in PGbouncer
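For reference, a minimal pgbouncer.ini sketch of that setting (the surrounding values are illustrative, not a recommended config):

```ini
[pgbouncer]
pool_mode = transaction
; Run the reset query (by default DISCARD ALL, which also drops
; prepared statements) after every transaction, not only when a
; client disconnects.
server_reset_query = DISCARD ALL
server_reset_query_always = 1
```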