sqlalchemy

Using statement_cache_size asyncpg setting / prepared statement name for asyncpg w pgbouncer

Open dyens opened this issue 3 years ago • 48 comments

Hi!

I use SQLAlchemy 1.4 with the asyncpg driver behind pgbouncer.


    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.asyncio import AsyncSession

    engine = create_async_engine(
        f'postgresql+asyncpg://{username}:{password}@{host}:{port}/{dbname}',
        echo=False,
    )
    session_maker = sessionmaker(
        engine,
        class_=AsyncSession,
    )


I get this error:

asyncpg.exceptions.DuplicatePreparedStatementError: prepared statement "__asyncpg_stmt_a__" already exists
HINT:
NOTE: pgbouncer with pool_mode set to "transaction" or
"statement" does not support prepared statements properly.
You have two options:

* if you are using pgbouncer for connection pooling to a
  single server, switch to the connection pool functionality
  provided by asyncpg, it is a much better option for this
  purpose;

* if you have no option of avoiding the use of pgbouncer,
  then you can set statement_cache_size to 0 when creating
  the asyncpg connection object.

How can I pass this setting (statement_cache_size=0) to the asyncpg connection object?

dyens avatar May 11 '21 12:05 dyens

SQLAlchemy doesn't make use of "statement_cache_size" because it necessarily uses the asyncpg.connection.prepare() method directly, so there is no way to disable prepared statements across the board. However, you can turn off prepared statement caching itself, so that a new prepared statement is made each time, using prepared_statement_cache_size=0. If you can try that and see whether it works, we can mention this in the docs:

https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#prepared-statement-cache

zzzeek avatar May 11 '21 13:05 zzzeek

Thank you, @zzzeek

In my case it does not work.

I found this in:

https://github.com/sqlalchemy/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/asyncpg.py#L747

                await_only(self.asyncpg.connect(*arg, **kw))

In **kw we do not pass statement_cache_size.

If I add:


if 'prepared_statement_cache_size' in kw:
    kw['statement_cache_size'] = kw['prepared_statement_cache_size']

It helps sometimes, but sometimes it does not (I don't know why...)

dyens avatar May 11 '21 13:05 dyens

hi -

did you try just direct usage of the prepared_statement_cache_size variable as given:

engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")

? if that doesn't work, we are out of luck. SQLAlchemy is forced to use connection.prepare() which means we have to use prepared statements.

zzzeek avatar May 11 '21 14:05 zzzeek

Yes, I use this:

engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")

dyens avatar May 11 '21 14:05 dyens

all of our SELECT statements have to use connection.prepare() because we need to be able to call get_attributes(). if asyncpg could be convinced to give us this API without necessitating the use of connection.prepare() we could begin to think about how to support that.

zzzeek avatar May 11 '21 14:05 zzzeek

the doc at https://magicstack.github.io/asyncpg/current/api/index.html?highlight=statement_cache_size doesn't claim this disables prepared statements, just that it doesn't cache them:

statement_cache_size (int) – The size of prepared statement LRU cache. Pass 0 to disable the cache.

oh perhaps this is needed for when we do INSERT/UPDATE/DELETE, OK.... the patch you tried at kw['statement_cache_size'] = kw['prepared_statement_cache_size'] should do that.

zzzeek avatar May 11 '21 14:05 zzzeek

that is, if you use prepared_statement_cache_size=0 and add code like this:

diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 4a191cd286..415862a17b 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -735,6 +735,7 @@ class AsyncAdapt_asyncpg_dbapi:
         prepared_statement_cache_size = kw.pop(
             "prepared_statement_cache_size", 100
         )
+        kw["statement_cache_size"] = prepared_statement_cache_size
         if util.asbool(async_fallback):
             return AsyncAdaptFallback_asyncpg_connection(
                 self,

that's the best we can do. if there are still problems then we need support from asyncpg

zzzeek avatar May 11 '21 14:05 zzzeek

I tried this, but for some reason it does not help (sometimes the error appears, sometimes it does not...)

dyens avatar May 11 '21 14:05 dyens

this might be related to the fact that we still use connection.prepare(). I don't know the internals of asyncpg well enough to advise further on what might be going on.

zzzeek avatar May 11 '21 14:05 zzzeek

Yes, that is one reason.

Also, in AsyncAdapt_asyncpg_cursor._prepare_and_execute we actively use prepared statements to execute queries.

In asyncpg, as far as I can see, queries that are not explicitly prepared do not use named prepared statements:


# Pseudo code:

# In asyncpg for statement_cache_size = 0
stmt = Connection._get_statement(
        self,
        query,
        named=False  # named=True is used for connection.prepare call
)

# Deep inside this function, if named=False and statement_cache_size=0,
# asyncpg uses an empty name for the prepared statement.

result = Connection._protocol.bind_execute(stmt, args, ...)  # not exact, but this is the essence

dyens avatar May 11 '21 16:05 dyens

We had a similar problem due to multiple web workers. They generated prepared statements with the same names - original function to generate IDs looks like this:

    def _get_unique_id(self, prefix):
        global _uid
        _uid += 1
        return '__asyncpg_{}_{:x}__'.format(prefix, _uid)

So we just changed the Connection class a bit

from uuid import uuid4

from asyncpg import Connection


class CConnection(Connection):
    def _get_unique_id(self, prefix: str) -> str:
        return f'__asyncpg_{prefix}_{uuid4()}__'

You need to provide it when you create the engine

engine = create_async_engine(
    settings.database_url,
    connect_args={
        'connection_class': CConnection,
    },
)

SlavaSkvortsov avatar Jun 21 '21 11:06 SlavaSkvortsov

@SlavaSkvortsov that seems to be something we cannot change on our end, other than subclassing the connection class to change a private method like in your example.

You may want to open an issue on the asyncpg repo for an out-of-the-box solution. Imho _get_unique_id should at least take the process pid (and/or thread id) into consideration. The best option would be the ability to customize it, maybe through a connect kwarg or by specifying a name when calling prepare, since there is currently no way of manually assigning a name to a statement.

CaselIT avatar Jun 21 '21 12:06 CaselIT

engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")

Should this configuration pass the prepared statement cache size parameter to the asyncpg connection? With the SQLAlchemy version I'm using, it doesn't.

So I managed to set the cache size on the asyncpg side like this:

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0",
    poolclass=NullPool,
    future=True,
    connect_args={'statement_cache_size': 0},
)

Note the connect_args={'statement_cache_size': 0} parameter.

In combination with @SlavaSkvortsov's unique ids suggestion, it seems that I got rid of the prepared statement errors with pgbouncer.

forshev avatar Sep 15 '21 06:09 forshev

When using asyncpg only, you can use statement_cache_size=0 and it won't use prepared statements at all, thus working behind pgbouncer in transaction mode.

My understanding is that sqlalchemy/asyncpg will use prepared statements regardless of the prepared_statement_cache_size value, so prepared statements get created in any case. The unique ID workaround only hides the problem: you will create a lot of prepared statements in your open sessions to the database (each with a unique name), and each transaction will get a random session that holds some subset of those statements. Since you are disabling the cache, this won't be a real issue, because new statements keep being created and they never conflict thanks to the random names, but they are still created and left behind. I'm not sure what the mechanism to delete them is; maybe they just get deleted whenever the backend session is closed, once in a while?

As for why sqlalchemy needs to use prepared statements in any case, I have no idea.
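On the question of when these statements go away: in PostgreSQL a prepared statement lasts until it is deallocated or the server session ends, and the ones currently held by a session are visible in the pg_prepared_statements system view. A minimal sketch for inspecting them, assuming `conn` is an asyncpg connection (the helper name `list_prepared_statements` is made up for illustration):

```python
async def list_prepared_statements(conn):
    """Return (name, statement) pairs held by the current server session.

    `conn` is assumed to be an asyncpg connection (anything with an
    awaitable ``fetch`` returning mapping-like rows works).
    """
    rows = await conn.fetch(
        "SELECT name, statement FROM pg_prepared_statements "
        "ORDER BY prepare_time"
    )
    return [(row["name"], row["statement"]) for row in rows]
```

Behind pgbouncer in transaction mode, the result describes whichever server session the pooler happened to assign to that query, so repeated calls can return different lists.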

rslinckx avatar Sep 15 '21 08:09 rslinckx

As for why sqlalchemy needs to use prepared statements in any case, I have no idea.

asyncpg has no other way of returning the type information of the selected columns in the query when using a normal query, so sqlalchemy needs to use prepared statements: https://github.com/sqlalchemy/sqlalchemy/blob/990eb3d8813369d3b8a7776ae85fb33627443d30/lib/sqlalchemy/dialects/postgresql/asyncpg.py#L613-L616 and https://github.com/sqlalchemy/sqlalchemy/blob/990eb3d8813369d3b8a7776ae85fb33627443d30/lib/sqlalchemy/dialects/postgresql/asyncpg.py#L392-L404

CaselIT avatar Sep 15 '21 10:09 CaselIT

As for why sqlalchemy needs to use prepared statements in any case, I have no idea.

https://github.com/sqlalchemy/sqlalchemy/issues/6467#issuecomment-838549407

zzzeek avatar Sep 15 '21 12:09 zzzeek

assuming we keep using prepared statements, there's a bunch of people on this issue now. can we come up with a way to fix what's wrong here and get this closed? thanks

zzzeek avatar Sep 15 '21 12:09 zzzeek

https://github.com/MagicStack/asyncpg/issues/837 is closed via https://github.com/MagicStack/asyncpg/pull/846 . we will want to expose this and then add a documentation section that this is a means of using our asyncpg driver with pgbouncer along with a recipe based on UUID or similar.
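As a rough illustration of what that asyncpg change allows, the sketch below prepares a statement under a UUID-based name instead of asyncpg's counter-based one. It assumes an asyncpg release that includes the PR above, where Connection.prepare() accepts a keyword-only `name` argument; the helper name `prepare_uniquely_named` is hypothetical.

```python
from uuid import uuid4


async def prepare_uniquely_named(conn, query):
    """Prepare `query` on an asyncpg connection under a random name.

    Assumption: `conn.prepare()` accepts `name=`, as in asyncpg releases
    that include the change referenced above.
    """
    # A UUID avoids collisions between worker processes that would
    # otherwise all start counting from the same value.
    name = f"__asyncpg_{uuid4()}__"
    return await conn.prepare(query, name=name)
```

Random names avoid the "already exists" collision, but they do not make a statement prepared in one pgbouncer transaction available in the next one.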

zzzeek avatar Nov 16 '21 14:11 zzzeek

Is there any update on how to use sqlalchemy+asyncpg+pgbouncer? I'm new to sqlalchemy's async engine and stumbled onto this issue while trying out Prefect's v2 beta. Their package uses create_async_engine under the hood, but even if I modified their implementation, I don't think they would accept the strategy used by @SlavaSkvortsov. The only other suggestion I've seen is setting pgbouncer’s pool_mode to session (according to asyncpg's FAQ), but I didn't have any luck with it.

EDIT: I should add -- session mode didn't work for me because I have many long-lived clients connecting. So I depend on a pool in transaction mode.

jacksund avatar Jul 17 '22 03:07 jacksund

Is there any update on how to use sqlalchemy+asyncpg+pgbouncer?

an update would be right here, so if you don't see one, nope!

I'm new to sqlalchemy's async engine and stumbled onto this issue while trying out Prefect's v2 beta. Their package uses create_async_engine under the hood, but even if I modified their implementation, I don't think they would accept the strategy used by @SlavaSkvortsov. The only other suggestion I've seen is setting pgbouncer’s pool_mode to session (according to asyncpg's FAQ), but I didn't have any luck with it.

I think we want to be able to send a name to the prepared statement using connection.prepare(name=somename) with this new parameter. then we'd need a hook for the asyncpg dialect where you send a callable that returns a name for a prepared statement that can isolate on pid or something like that.
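For readers arriving later: a hook of this shape is documented for the asyncpg dialect in later SQLAlchemy releases as the `prepared_statement_name_func` connect argument. The sketch below assumes a SQLAlchemy version that accepts it (it is not available in the 1.4 release discussed at the top of this thread); `unique_statement_name` and `make_engine` are illustrative names only.

```python
from uuid import uuid4


def unique_statement_name():
    """Return a prepared statement name that should not collide across workers."""
    return f"__asyncpg_{uuid4()}__"


def make_engine(url):
    # Imported here so the naming helper above can be used on its own.
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.pool import NullPool

    return create_async_engine(
        url,
        # Leave pooling to pgbouncer rather than pooling twice.
        poolclass=NullPool,
        connect_args={
            # Assumption: the installed asyncpg dialect supports this hook.
            "prepared_statement_name_func": unique_statement_name,
        },
    )
```

The callable takes no arguments and is invoked each time the dialect prepares a statement, so isolating on pid is unnecessary as long as the generated names are unique.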

EDIT: I should add -- session mode didn't work for me because I have many long-lived clients connecting. So I depend on a pool in transaction mode.

zzzeek avatar Jul 17 '22 14:07 zzzeek

then we'd need a hook for the asyncpg dialect where you send a callable that returns a name for a prepared statement that can isolate on pid or something like that.

Would I be able to sponsor this PR or support it in some way? As much as I'd want to open a PR, I wouldn't know where to start :fearful:.

jacksund avatar Jul 17 '22 14:07 jacksund

not on this end, money doesn't really create time :) @CaselIT any interest?

zzzeek avatar Jul 17 '22 14:07 zzzeek

I haven't looked at this one enough, but would implementing this one https://github.com/sqlalchemy/sqlalchemy/issues/8215 help?

I mean can asyncpg be told how to name prepared statements?

CaselIT avatar Jul 17 '22 17:07 CaselIT

@CaselIT I think it would help implement the fix used above https://github.com/sqlalchemy/sqlalchemy/issues/6467#issuecomment-864943824, but I'm not sure on the distinction between connect_args={'connection_class': ...} above and async_creator in the issue you mentioned.

And yes, I think asyncpg can now be told how to name prepared statements based on https://github.com/sqlalchemy/sqlalchemy/issues/6467#issuecomment-970348808

jacksund avatar Jul 18 '22 12:07 jacksund

@CaselIT we need to support some kind of callable, or alternatively an option to do it ourselves, such that every time we call connection.prepare(), we can send in a name that disambiguates the name from other processes in some way, but this is tricky because the same pid can be re-used among new processes.

basically w/ pgbouncer, what asyncpg sees as "a new connection" is in fact a long lived connection maintained by pgbouncer.

which makes me ask, isn't there some way to just clear out the existing prepared statements on a connection ?

how about this ?

because there is basically no point in saving these prepared statements across "connects" as they are unusable.

@jacksund can you try this out? just add this event handler. does this fix?

    from sqlalchemy import event
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    @event.listens_for(engine.sync_engine, "connect")
    def clear_prepared_statements_on_connect(
        dbapi_connection, connection_record
    ):
        cursor = dbapi_connection.cursor()
        cursor.execute("DEALLOCATE ALL")
        cursor.close()

zzzeek avatar Jul 18 '22 12:07 zzzeek

i mean this could also be configured in pgbouncer itself I would think

zzzeek avatar Jul 18 '22 12:07 zzzeek

I think that duplicated names of prepared statements are just one part of the problem. Another is the situation where pgbouncer is in transaction mode and different transactions on one sqlalchemy session use the same prepared statement. So, even if I use a single transaction like this:

async with engine.connect() as conn:
    async with conn.begin():
        result = await conn.execute(query)

I get an error like: "prepared statement "x" does not exist"
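Both error messages in this thread can be reproduced with a toy model, which may help show why they are two sides of the same problem. The sketch below is a pure-Python simulation, not pgbouncer or asyncpg code: `Backend` stands in for one server session and `TransactionPooler` for transaction-mode pooling with round-robin assignment (all names and the round-robin policy are assumptions made for illustration).

```python
class Backend:
    """One server-side session with its own set of prepared statements."""

    def __init__(self):
        self.prepared = {}

    def prepare(self, name, sql):
        if name in self.prepared:
            raise RuntimeError(f'prepared statement "{name}" already exists')
        self.prepared[name] = sql

    def execute(self, name):
        if name not in self.prepared:
            raise RuntimeError(f'prepared statement "{name}" does not exist')
        return self.prepared[name]


class TransactionPooler:
    """Assigns a backend per transaction, round-robin."""

    def __init__(self, backends):
        self.backends = backends
        self._next = 0

    def begin(self):
        backend = self.backends[self._next % len(self.backends)]
        self._next += 1
        return backend


pooler = TransactionPooler([Backend(), Backend()])
errors = []

# Transaction 1 lands on backend 0 and prepares a statement there.
pooler.begin().prepare("__asyncpg_stmt_1__", "SELECT 1")

# Transaction 2 lands on backend 1, where that name was never prepared.
try:
    pooler.begin().execute("__asyncpg_stmt_1__")
except RuntimeError as exc:
    errors.append(str(exc))

# Transaction 3 lands on backend 0 again; a client whose name counter
# restarted (or another worker) tries to prepare the same name.
try:
    pooler.begin().prepare("__asyncpg_stmt_1__", "SELECT 2")
except RuntimeError as exc:
    errors.append(str(exc))

for message in errors:
    print(message)
```

Unique names remove only the second failure; the first one remains whenever a statement is prepared in one transaction and reused in another.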

romsuhov avatar Jul 18 '22 13:07 romsuhov

thanks @romsuhov. I'm getting a matching result of prepared statement "x" does not exist, and once I try the fix by @zzzeek I'm back to prepared statement X already exists

jacksund avatar Jul 18 '22 13:07 jacksund

OK folks can we please confirm, that these issues are local only to pgbouncer transaction and statement mode and not "the most polite" "session" mode? because per this doc session mode resets everything automatically and there should be no issue with prepared statements.

that is, @romsuhov and @jacksund are both trying to use "transaction" mode, where everything needs to be reset per transaction rather than per connection.

If so, then simply move the above event to DEALLOCATE ALL into the begin event (edit 8/18/2022: this version needs to turn off the prepared statement cache, since we can't rely on them being present in subsequent transactions)

    from sqlalchemy import event
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        connect_args={"prepared_statement_cache_size": 0},
        echo=True,
    )

    @event.listens_for(engine.sync_engine, "begin")
    def clear_prepared_statements_on_begin(conn):
        conn.exec_driver_sql("DEALLOCATE ALL")

however, this seems a bit wasteful and I'd consider benching against regular session mode, transaction mode seems a little weird

zzzeek avatar Jul 18 '22 13:07 zzzeek

or just set server_reset_query_always to true in PGbouncer

zzzeek avatar Jul 18 '22 13:07 zzzeek