query_lock() in iterate() prohibits any other database operations within `async for` loop
#108 introduced query locking to prevent the situation where multiple queries are executed at the same time; however, the logic within `iterate()` is also wrapped with this lock, making code like the following impossible due to a deadlock:
```python
async for row in database.iterate("SELECT * FROM table"):
    await database.execute("UPDATE table SET ... WHERE ...")
```
The workaround I've found for the time being is to use a custom batching select utility instead:
```python
async for row in batching_fetch_all("SELECT * FROM table", batch=50):
    ...
```
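For reference, a minimal sketch of what such a batching helper could look like. This is not from the library; it assumes the query has a deterministic ordering and no `LIMIT`/`OFFSET` of its own, and uses plain LIMIT/OFFSET pagination, so treat it as illustrative only:

```python
# Hypothetical helper: fetch rows in fixed-size batches via fetch_all(),
# so the connection's query lock is released between batches and other
# queries can run inside the `async for` body.
async def batching_fetch_all(query: str, batch: int = 50):
    offset = 0
    while True:
        # NOTE: assumes `query` has a stable ORDER BY; otherwise rows
        # may repeat or be skipped between batches.
        rows = await database.fetch_all(f"{query} LIMIT {batch} OFFSET {offset}")
        for row in rows:
            yield row
        if len(rows) < batch:
            return
        offset += batch
```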
Interesting @rafalp! What DB back-end is it? What do you use under the hood, is it still the transactional cursor?
It's PostgreSQL, and I am using two setups for the DB connection:
- In the test runner it's `database = Database(database_url, force_rollback=True)`. I have a pytest fixture that wraps each test in `async with database:` (sketched below).
- Elsewhere it's `database = Database(database_url)`, but in CLI tasks I am using `database` as a context manager, while in the Starlette app I am using lifecycle events.
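A sketch of such a fixture, assuming pytest-asyncio and a `database_url` defined elsewhere; this is illustrative, not the actual test code:

```python
import pytest
from databases import Database

@pytest.fixture
async def database():
    # force_rollback=True pins everything to one connection and rolls
    # back all changes when the context manager exits
    db = Database(database_url, force_rollback=True)
    async with db:
        yield db
```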
Just to capture the thoughts from the Gitter discussion at https://gitter.im/encode/community.
`query_lock()` was introduced to prevent concurrent operations within a single connection (which is generally not allowed by DB engines), e.g. queries run with `asyncio.gather`.
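To make the scenario concrete, this is the kind of code the lock protects against; without it, both queries could race on the same underlying driver connection (an illustrative example, not from the issue):

```python
# Both calls resolve to the same context-bound connection; without the
# query lock they would hit the underlying driver concurrently, which
# most drivers reject.
await asyncio.gather(
    database.execute("UPDATE a SET ..."),
    database.execute("UPDATE b SET ..."),
)
```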
Possible ways to overcome the issue:
- allow relaxing the lock, e.g.
```python
async with db.connection(lock=False) as con:
    ...
```
though this removes the guarantees, and the developer would have to be responsible and careful in their actions
- run the queries on idle (different) connections, or wait for one to become available
Hey!
This issue is also affecting me, and it's rather annoying and limiting since I can't really use `fetch_all` in my case. Any ETA, or at least any consensus on what to do about this?
The locks exist per connection, so unless you have `force_rollback=True`, every call to `database` creates a separate `Connection` in each coroutine. Your example apparently runs both calls in the same coroutine, and so you deadlock. Here is how you can avoid this:
```python
import asyncio

queue = asyncio.Queue()

async def producer():
    async with database.connection() as conn:
        print("Producer's connection:", conn)
        async for row in conn.iterate("SELECT * FROM table"):
            await queue.put(row)
    await queue.put(None)  # sentinel: tell the consumer to stop

async def consumer():
    async with database.connection() as conn:
        print("Consumer's connection:", conn)
        while True:
            row = await queue.get()
            if row is None:
                break
            await conn.execute("UPDATE table SET ... WHERE ...")

for r in await asyncio.gather(producer(), consumer(), return_exceptions=True):
    if isinstance(r, Exception):
        raise r from None
```
Watch out! If the printed connections are the same, you need to ensure that you have not worked with that database in the parent coroutine.
Additional docs: https://asyncio.readthedocs.io/en/latest/producer_consumer.html
There is another way with subclassing:
```python
import databases
import databases.core

class ParallelDatabase(databases.Database):
    """Override connection() to ignore the task context and spawn a new Connection every time."""

    def connection(self) -> "databases.core.Connection":
        """Bypass self._connection_context."""
        return databases.core.Connection(self._backend)
```
However, you will then have to forget about executing queries on the database directly, as that would open a new connection every time. Use `async with db.connection()` everywhere.
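Usage would look something like this (a sketch building on the subclass above, not part of the library):

```python
db = ParallelDatabase(database_url)
await db.connect()

async def worker():
    # each `async with db.connection()` now yields a fresh Connection,
    # so the two workers no longer contend on one query lock
    async with db.connection() as conn:
        await conn.execute("UPDATE table SET ... WHERE ...")

await asyncio.gather(worker(), worker())
```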
> The locks exist per connection, so unless you have `force_rollback=True` every call to `database` creates a separate `Connection` in each coroutine.
Couldn't I just launch a task then? Like `asyncio.create_task(database.execute("..."))`. It will be in another coroutine.
Depending on the surrounding code, yes or no. For example, this will not work:
```python
async for row in database.iterate("SELECT * FROM table"):
    await asyncio.create_task(database.execute("UPDATE table SET ... WHERE ..."))
```
because the spawned task will inherit a copy of the parent's context.
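A minimal standalone illustration of that behavior, using plain `contextvars` with nothing library-specific:

```python
import asyncio
import contextvars

var = contextvars.ContextVar("var")

async def child():
    # The task runs in a *copy* of the parent's context taken at
    # create_task() time, so it sees the value the parent set.
    print(var.get())  # -> "set by parent"

async def main():
    var.set("set by parent")
    await asyncio.create_task(child())

asyncio.run(main())
```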
Oh I see, that makes sense. Never worked with `contextvars`.
Another thing I don't quite get is why `Database.connection()` doesn't get a new connection from the pool each time. It returns a `Connection`, which actually acts more like a pool, and which has a `_connection` field that corresponds directly to the backend driver connection. Why is that?
That teased my brain, too. This is how it's currently implemented: https://github.com/encode/databases/blob/master/databases/core.py#L176
```python
def connection(self) -> "Connection":
    if self._global_connection is not None:
        return self._global_connection

    try:
        return self._connection_context.get()
    except LookupError:
        connection = Connection(self._backend)
        self._connection_context.set(connection)
        return connection
```
Let's ignore `_global_connection`; it is only used when `force_rollback=True`.
It tries to load the `Connection` bound to the executing coroutine's context. If there is none, it creates a new `Connection` and assigns it to the context. The next time we invoke `Database.connection()`, the lookup will succeed and we will not create a new `Connection`.
I guess they wrote the code this way to avoid the overhead of creating a new `Connection` each time people execute queries directly on a `Database`. While it does make the client code simpler, it leads to hidden landmines such as this issue.
I hit this issue in production many times. I had to fight with the library to make my queries run in parallel. I gave up and applied https://github.com/encode/databases/issues/176#issuecomment-635245308
@vmarkovtsev AFAIR `iterate()` sets a transaction on top of the lock, forcing connection reuse within the current async context.
So does any other query method of `Database`. It just happens that `iterate()` makes it easier to call another method in parallel. The problem is deeper. For example, here is a real production case of poor performance due to serialized DB queries:
```python
db = Database(...)
await db.connect()
await db.execute("SELECT 1")  # any preceding query to initialize the context var

async def serialized_query():
    # these will run serialized because the context (and thus the
    # locked connection) is inherited
    await db.execute(...)

await asyncio.gather(serialized_query(), serialized_query(), return_exceptions=True)
```
Hence https://github.com/encode/databases/issues/176#issuecomment-635245308
I guess some kind of alternative solution might be using a cursor from `raw_connection` (at least with asyncpg)?
```python
async with database.connection() as connection:
    async with connection.transaction():
        async for row in connection.raw_connection.cursor(str_query):
            await database.fetch_all("SELECT based on row")  # or anything else with database, really
```
Are there any flaws with this that I'm not seeing?
I'm running into this problem. An additional complication for me is that I need everything to be done within a single "repeatable_read" transaction. So what I desire is something like:
```python
from sqlalchemy import select

async def generate_results(db):
    async with db.transaction(isolation="repeatable_read"):
        async for some_row in db.iterate(select(...)):
            some_other_rows = await db.fetch_all(select(...))
            yield "some result based on both some_row and some_other_rows"
```
So: a `fetch_all` within an `async for ... db.iterate(...)`, all within a single "repeatable_read" transaction.
My use case is entirely read-only.
Is there any hope for me?
I looked at the suggestion from @vmarkovtsev in https://github.com/encode/databases/issues/176#issuecomment-635152442, with two coroutines communicating via a queue, each opening a separate connection. But I think with that approach I would not be able to put everything in a single "repeatable_read" transaction?
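One untested possibility, combining the raw-cursor idea from earlier in this thread with an explicit isolation level. This assumes the asyncpg backend (where `raw_connection` is an `asyncpg.Connection`) and that `transaction()` forwards the `isolation` option to the driver; `str_query` and `other_query` are placeholders:

```python
async def generate_results(database, str_query, other_query):
    async with database.connection() as connection:
        # one connection, one REPEATABLE READ transaction for everything
        async with connection.transaction(isolation="repeatable_read"):
            # the raw asyncpg cursor bypasses the library's query lock...
            async for some_row in connection.raw_connection.cursor(str_query):
                # ...so further queries on the same connection are possible
                some_other_rows = await connection.fetch_all(other_query)
                yield "some result based on both some_row and some_other_rows"
```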
same here :(