Document `AsyncAdaptedQueue` in relation to `create_async_engine`

Open dybi opened this issue 3 years ago • 3 comments

Describe the use case

AsyncAdaptedQueue is not documented - neither in Available Pool Implementations nor in Async Engine API.

What is more, even in the source code docstrings there is no information about it.

The lack thereof led to a reproduction of the "Main thread is blocked on concurrent async operations" bug in our code.

Databases / Backends / Drivers targeted

All.

Example Use

SIMPLIFIED

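import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

def get_engine_kwargs():
    # Reproduces the bug: AsyncAdaptedQueuePool should be used here,
    # but the docs don't mention it
    return {'poolclass': QueuePool}

# DATABASE_URL is defined elsewhere (a URL for an async driver)
engine = create_async_engine(DATABASE_URL, echo=True, **get_engine_kwargs())
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False, future=True,
)

async def query_in_session(param):
    session = async_session()
    try:
        results = await session.scalars(select(param))
        return results.unique().one()
    finally:
        await session.close()

async def main():
    # Two concurrent sessions compete for pooled connections
    return await asyncio.gather(query_in_session(1), query_in_session(2))

asyncio.run(main())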

Additional context

In our project we are introducing async SQLAlchemy usage. We had some helper classes that were used to build the underlying engine and sessions, and we were reusing the existing code, in particular a method that supplied engine kwargs to create_engine(). The same helper method was used to supply kwargs for create_async_engine(), so QueuePool was passed as poolclass for the async engine, which reproduced the bug described above.
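A minimal sketch of how such a shared helper could avoid the mix-up, assuming it is acceptable to tell the helper which kind of engine it is building. The `is_async` parameter is hypothetical and not part of the original code; both pool classes are imported from `sqlalchemy.pool`.

```python
from sqlalchemy.pool import AsyncAdaptedQueuePool, QueuePool


def get_engine_kwargs(is_async: bool = False) -> dict:
    """Hypothetical shared helper: pick the pool class that matches the engine type."""
    # QueuePool waits for a free connection by blocking the calling thread,
    # so the asyncio engine needs the asyncio-compatible variant instead.
    poolclass = AsyncAdaptedQueuePool if is_async else QueuePool
    return {"poolclass": poolclass}


sync_kwargs = get_engine_kwargs()                # intended for create_engine()
async_kwargs = get_engine_kwargs(is_async=True)  # intended for create_async_engine()
```

Another option is to leave `poolclass` out of the async kwargs entirely, since `create_async_engine()` uses `AsyncAdaptedQueuePool` by default.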

dybi avatar Nov 08 '22 07:11 dybi

Hi,

Thanks for reporting. I was convinced that there was a check to prevent this, but apparently there isn't.

Also, looking at the queue pool, it seems to use a threading lock even under asyncio. That should not cause large issues, since the lock is only used to increment / decrement an integer, but it should probably use an async lock instead.

@zzzeek why don't we just deprecate the AsyncAdaptedQueue and make the normal queue pool use the appropriate implementation based on the dialect? (Or we just make one an alias of the normal QueuePool.) IIRC at the beginning there was no indication of whether a dialect was async or not, but create_async_engine now requires it, so the queue pool should be able to rely on that.

CaselIT avatar Nov 08 '22 08:11 CaselIT

I'm not sure whether QueuePool would make that easy. I'm not around much today, so while I would initially lean towards just documenting AsyncAdaptedQueuePool, if QueuePool can do this as a one-liner, then maybe. I haven't looked lately.

zzzeek avatar Nov 08 '22 13:11 zzzeek

I'm not sure if dialect is always required for the pool. I'll check

CaselIT avatar Nov 08 '22 13:11 CaselIT

I'm grateful that this issue is still open because it provides the solution for the issue we had.

In SQLAlchemy 2.0.23, we were using QueuePool inside a FastAPI application. Under high traffic, QueuePool occasionally locked the event loop. After a few tests with Locust, I noticed that the lock was reproducible and happened each time the number of connections reached the limit of pool size + max_overflow. A lock meant that somewhere in our configuration the code was not entirely async: namely self._pool.get(wait, self._timeout), which blocks the event loop.

After digging, I found this thread, replaced the QueuePool with AsyncAdaptedQueuePool and... ok, no further problems!

At least for now, if we are using asyncio, it's probably better to use the AsyncAdaptedQueuePool. Could this be added to the documentation? Thanks in advance.

lricardo avatar Dec 05 '23 12:12 lricardo

Strange that nothing raises an error in this case. I thought there was a check that did that.

CaselIT avatar Dec 05 '23 12:12 CaselIT

I got the typical "sqlalchemy.exc.TimeoutError: QueuePool limit of size 20 overflow 10 reached" error, but only after the application had appeared deadlocked for a while (due to the event loop being blocked). This means, for instance, that other tasks running concurrently stall entirely (for example, our Prometheus exporter stops responding on the /metrics endpoint).

AsyncAdaptedQueuePool handles this without blocking, and for a reason: AsyncAdaptedQueue internally uses asyncio.Queue, which provides non-blocking methods for async / await code.
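The difference can be reproduced with the standard library alone. This is only an illustration of the mechanism, not SQLAlchemy's pool code: an empty `queue.Queue` stands in for a thread-based pool at its limit, an empty `asyncio.Queue` stands in for the asyncio-aware one, and the hypothetical `heartbeat` task stands in for other work on the loop, such as a /metrics handler.

```python
import asyncio
import queue


async def heartbeat(ticks: list) -> None:
    # Stand-in for unrelated work that shares the event loop.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def blocking_wait(timeout: float) -> None:
    # queue.Queue.get() blocks the whole thread, and with it the event loop.
    try:
        queue.Queue().get(True, timeout)
    except queue.Empty:
        pass


async def cooperative_wait(timeout: float) -> None:
    # asyncio.Queue.get() yields to the event loop while it waits.
    try:
        await asyncio.wait_for(asyncio.Queue().get(), timeout)
    except asyncio.TimeoutError:
        pass


async def count_ticks(waiter, timeout: float = 0.2) -> int:
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat run once before measuring
    before = len(ticks)
    await waiter(timeout)
    seen = len(ticks) - before  # heartbeats that ran during the wait
    task.cancel()
    return seen


blocked = asyncio.run(count_ticks(blocking_wait))
cooperative = asyncio.run(count_ticks(cooperative_wait))
# blocked is 0: the heartbeat never got a turn during the blocking wait.
print(blocked, cooperative)
```

During the blocking wait the heartbeat does not run at all, which matches the symptom of the exporter going silent until the pool timeout fires.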

If you need some sort of example project I can provide one.

lricardo avatar Dec 05 '23 13:12 lricardo

The same issue reported by @lricardo took us several days to resolve due to the lack of docs. I'm sure others would benefit from this being shown in a couple of examples.

rmorshea avatar Feb 14 '24 21:02 rmorshea

Is the issue that you saw the docs at https://docs.sqlalchemy.org/en/14/core/pooling.html#switching-pool-implementations and applied AsyncAdaptedQueuePool to asyncengine? We should document that, but also make create_async_engine raise an error if QueuePool is passed.

zzzeek avatar Feb 14 '24 21:02 zzzeek

We were explicitly passing poolclass=QueuePool to create_async_engine so an error there would have saved us a lot of time.
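Until such a check exists in the library, a user-side guard along these lines could catch the mistake early. This is a sketch only: `check_async_poolclass` is a hypothetical helper, not part of SQLAlchemy, and it relies on `AsyncAdaptedQueuePool` being a subclass of `QueuePool`.

```python
from sqlalchemy.pool import AsyncAdaptedQueuePool, QueuePool


def check_async_poolclass(engine_kwargs: dict) -> dict:
    """Hypothetical guard, not part of SQLAlchemy: reject thread-based QueuePool."""
    poolclass = engine_kwargs.get("poolclass")
    if (
        isinstance(poolclass, type)
        and issubclass(poolclass, QueuePool)
        and not issubclass(poolclass, AsyncAdaptedQueuePool)
    ):
        raise ValueError(
            f"{poolclass.__name__} blocks the event loop; "
            "use AsyncAdaptedQueuePool with create_async_engine()"
        )
    # Kwargs without a poolclass, or with an asyncio-aware one, pass through.
    return engine_kwargs
```

It would wrap the kwargs before they reach the engine, for example `create_async_engine(DATABASE_URL, **check_async_poolclass(get_engine_kwargs()))`.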

rmorshea avatar Feb 15 '24 00:02 rmorshea

Mike Bayer has proposed a fix for this issue in the main branch:

raise for asyncio-incompatible pool classes https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/5168

sqla-tester avatar Feb 15 '24 02:02 sqla-tester

Mike Bayer has proposed a fix for this issue in the rel_2_0 branch:

raise for asyncio-incompatible pool classes https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/5169

sqla-tester avatar Feb 15 '24 02:02 sqla-tester