databases
Errors with multiple tasks and transactions in PostgreSQL and SQLite
Minimal reproduction script with comments:
```python
import asyncio

import databases
import sqlalchemy as sa

# import logging
# logging.basicConfig(level=logging.DEBUG)

DB_URL = "sqlite:////tmp/test.db"

metadata = sa.MetaData()
table = sa.Table(
    "t1", metadata,
    sa.Column("id", sa.Integer()),
    sa.Column("data", sa.String()),
)

database = databases.Database(DB_URL)


async def init_db():
    engine = sa.create_engine(DB_URL)
    metadata.create_all(engine)
    await database.connect()


async def work(start: int, n=5, data="a"):
    async with database.transaction():
        query = table.insert()
        for i in range(start, n):
            await database.execute(query, {"id": i, "data": data})
    print(f"Ok {start}")


async def main_async():
    await init_db()
    futures = []
    some_query = table.select()
    # The error is raised if a query is executed before creating the futures.
    # It doesn't matter whether the transaction is explicit or not.
    async with database.transaction():
        # !!! If I remove this query (from the parent task), it works fine
        await database.fetch_all(some_query)
        step = 5
        for i in range(0, 4 * step, step):
            fut = asyncio.ensure_future(work(i, i + step))
            futures.append(fut)
        await asyncio.wait(futures)
    print("Done")


loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())
```
Error with the SQLite database:

```
databases/core.py", line 305, in commit
    assert self._connection._transaction_stack[-1] is self
```
Error with the PostgreSQL database:

```
  File "asyncpg/protocol/protocol.pyx", line 301, in query
  File "asyncpg/protocol/protocol.pyx", line 659, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
```
It looks like the root cause of this issue is how contextvars works. I've tried both the aiocontextvars backport on 3.6 and the standard library version on 3.7.
If a context variable is set in the outer scope, then all inner tasks see the same value, so a new connection is not created in the new tasks.
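This inheritance behavior can be seen in a minimal, library-free sketch (the variable name is illustrative, standing in for the per-context connection variable in `databases`): every asyncio task is created with a copy of the current context, so a `ContextVar` set before the tasks are spawned is visible inside all of them.

```python
import asyncio
import contextvars

# Illustrative stand-in for a per-context connection variable
conn_var = contextvars.ContextVar("conn", default=None)

async def child():
    # The task received a *copy* of the parent's context at creation
    # time, so it sees the value set in the outer scope
    return conn_var.get()

async def main():
    conn_var.set("parent-connection")
    # Both tasks inherit the same value, so neither would end up
    # creating its own connection
    return await asyncio.gather(child(), child())

print(asyncio.run(main()))  # ['parent-connection', 'parent-connection']
```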
If you change the code in the example to

```python
new_context = contextvars.Context()
for i in range(0, 4 * step, step):
    fut = new_context.run(asyncio.ensure_future, work(i, i + 5))
    futures.append(fut)
await asyncio.wait(futures)
```

then it works as expected. But I don't think that's a good solution.
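Why this workaround behaves differently can be sketched without the library at all: a freshly constructed `contextvars.Context()` is empty, so a task created inside its `run()` does not see the value set in the outer scope and falls back to the default (names below are illustrative).

```python
import asyncio
import contextvars

conn_var = contextvars.ContextVar("conn", default="no-connection")

async def child():
    return conn_var.get()

async def main():
    conn_var.set("parent-connection")
    # Task created normally: inherits a copy of the current context
    inherited = asyncio.ensure_future(child())
    # Task created inside an empty Context: sees only the default,
    # which is why a fresh connection would be opened there
    fresh = contextvars.Context()
    isolated = fresh.run(asyncio.ensure_future, child())
    return await asyncio.gather(inherited, isolated)

print(asyncio.run(main()))  # ['parent-connection', 'no-connection']
```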
Perhaps my problem is related, but I don't get any errors: in my case the workers just get stuck.
```python
import asyncio

from databases import Database


def database():
    return Database(
        "postgresql://",
        host="db",
        port=5432,
        min_size=1,
        max_size=3,
        user="tiger",
        password="hunter2",
        database="test",
    )


async def insert_something(db: Database):
    async with db.connection() as conn:
        async with db.transaction():
            await db.execute("insert into person (name) values (:name)", {"name": "testing..."})


async def query_something(db: Database, n):
    async with db.connection() as conn:
        async with db.transaction():
            row = await db.fetch_one("select :foo as foo", {"foo": "bar"})
            print(f"{n} done")


async def run():
    async with database() as db:
        # If this is commented out then the app runs successfully:
        await insert_something(db)
        tasks = []
        for n in range(10):
            tasks.append(asyncio.create_task(query_something(db, n)))
        print("Waiting...")
        await asyncio.gather(*tasks)
        print("Done")


if __name__ == "__main__":
    asyncio.run(run())
```
When run, I get 4 or 5 workers printing "done", and then the app is stuck. If I comment out the insert_something call, all the workers run and the application terminates.
The size of the pool does not matter. I also tried using db.transaction() directly, but that does not seem to matter either.
I have the same issue. Running two Database.iterate calls under asyncio.gather(...) causes asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress.
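The asyncpg error can be mimicked with a toy model (all names here are hypothetical, not the real databases or asyncpg API): a connection allows only one operation at a time, and because every task's context carries the parent's connection, the second concurrent operation collides with the first.

```python
import asyncio
import contextvars

conn_var = contextvars.ContextVar("conn", default=None)

class ToyConnection:
    """Hypothetical connection that, like asyncpg's, rejects overlapping queries."""
    def __init__(self):
        self.busy = False

    async def execute(self, query):
        if self.busy:
            raise RuntimeError("cannot perform operation: another operation is in progress")
        self.busy = True
        try:
            await asyncio.sleep(0.01)  # pretend the query takes a while
        finally:
            self.busy = False

def get_connection():
    # Mimics per-context connection reuse: only create a connection
    # if the current context doesn't already carry one
    conn = conn_var.get()
    if conn is None:
        conn = ToyConnection()
        conn_var.set(conn)
    return conn

async def worker():
    await get_connection().execute("select 1")

async def main():
    # Touch the connection in the parent first, as in the reports above;
    # every child task now inherits this same connection object
    await get_connection().execute("select 1")
    return await asyncio.gather(*(worker() for _ in range(3)),
                                return_exceptions=True)

results = asyncio.run(main())
print(results)  # the first worker succeeds, the other two collide
```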
I'm quite certain this is the same underlying issue as #125, #424, #452, and the discussion in #456. It all comes back to how ContextVars have been used in this library, and @reclosedev was totally on the right track all the way back in 2019. I believe I have fixed this in #546 - the gist is that only one parent transaction can run concurrently per connection (but nested transactions are okay). In #546, I'm proposing that databases acquires a new connection from a connection pool for each asyncio.Task. If concurrent work needs to happen on the same connection or within the same transaction, users should pass the connection instance to those child tasks.
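That proposed behavior can be sketched in plain asyncio (helper names below are hypothetical stand-ins, not the databases API): by default each task acquires its own connection, and work that must share a connection or transaction receives the connection instance explicitly from its parent.

```python
import asyncio
import itertools

_ids = itertools.count(1)

async def acquire():
    # Hypothetical pool stand-in: every call hands out a distinct connection
    return f"conn-{next(_ids)}"

async def independent_work():
    # Under the proposed model, each asyncio.Task acquires its own
    # connection, so concurrent tasks can't collide on one socket
    return await acquire()

async def shared_work(conn):
    # Sharing a connection (or transaction) across tasks is explicit:
    # the parent passes the connection instance to the child
    return conn

async def main():
    a, b = await asyncio.gather(independent_work(), independent_work())
    shared = await acquire()
    c, d = await asyncio.gather(shared_work(shared), shared_work(shared))
    return a, b, c, d

a, b, c, d = asyncio.run(main())
assert a != b   # independent tasks got distinct connections
assert c == d   # shared work used the one explicitly passed connection
```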