
Errors with multiple tasks and transactions in postgresql and sqlite

Open · reclosedev opened this issue 6 years ago · 3 comments

Minimal reproduction script, with comments:

import asyncio

import databases
import sqlalchemy as sa

# import logging
# logging.basicConfig(level=logging.DEBUG)


DB_URL = "sqlite:////tmp/test.db"

metadata = sa.MetaData()
table = sa.Table(
    "t1", metadata,
    sa.Column("id", sa.Integer()),
    sa.Column("data", sa.String()),
)
database = databases.Database(DB_URL)


async def init_db():
    engine = sa.create_engine(DB_URL)
    metadata.create_all(engine)
    await database.connect()


async def work(start: int, n=5, data="a"):
    async with database.transaction():
        query = table.insert()
        for i in range(start, n):
            await database.execute(query, {"id": i, "data": data})
    print(f"Ok {start}")


async def main_async():
    await init_db()
    futures = []
    some_query = table.select()
    # The error is raised if a query is executed before the futures are created.
    # It doesn't matter whether the transaction is explicit or not.
    async with database.transaction():
        # !!! If I remove this query (from parent task), it works fine
        await database.fetch_all(some_query)
    step = 5
    for i in range(0, 4 * step, step):
        fut = asyncio.ensure_future(work(i, i + step))
        futures.append(fut)
    await asyncio.wait(futures)
    print("Done")


asyncio.run(main_async())

Error with the SQLite database:

databases/core.py", line 305, in commit
assert self._connection._transaction_stack[-1] is self

Error with the PostgreSQL database:

  File "asyncpg/protocol/protocol.pyx", line 301, in query
  File "asyncpg/protocol/protocol.pyx", line 659, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

reclosedev, Aug 09 '19 21:08

Looks like the root cause of this issue is how contextvars work. I've tried both aiocontextvars on Python 3.6 and the standard library's contextvars on Python 3.7.

If the context var is set in the outer scope, all inner tasks see the same value (each asyncio task starts with a copy of the context that was current when it was created), so a new connection is not created in the new tasks.

If you change the code in the example to:

    # requires `import contextvars` at the top of the script
    new_context = contextvars.Context()
    for i in range(0, 4 * step, step):
        fut = new_context.run(asyncio.ensure_future, work(i, i + step))
        futures.append(fut)
    await asyncio.wait(futures)

Then it works as expected, but I don't think it's a good solution.

reclosedev, Aug 10 '19 14:08

Perhaps my problem is related, but I don't get any errors; in my case the workers just get stuck.

import asyncio
from databases import Database


def database():
    return Database("postgresql://",
                    host="db",
                    port=5432,
                    min_size=1,
                    max_size=3,
                    user="tiger",
                    password="hunter2",
                    database="test")


async def insert_something(db: Database):
    async with db.connection() as conn:
        async with db.transaction():
            await db.execute("insert into person (name) values (:name)", {"name": "testing..."})


async def query_something(db: Database, n):
    async with db.connection() as conn:
        async with db.transaction():
            row = await db.fetch_one("select :foo as foo", {"foo": "bar"})
            print(f"{n} done")


async def run():
    async with database() as db:

        # If this is commented out then the app runs successfully:
        await insert_something(db)

        tasks = []
        for n in range(10):
            tasks.append(asyncio.create_task(query_something(db, n)))

        print("Waiting...")
        await asyncio.gather(*tasks)
        print("Done")


if __name__ == "__main__":
    asyncio.run(run())

When I run it, 4 or 5 workers print "done" and then the app hangs. If I comment out the insert_something call, all the workers run and the application terminates.

The size of the pool does not matter. I also tried using db.transaction(), but that does not seem to matter either.

jarppe, Oct 27 '19 21:10

I have the same issue. Running two Database.iterate calls concurrently with asyncio.gather(...) raises asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress.

ojii, Apr 16 '20 07:04

I'm quite certain this is the same underlying issue as #125, #424, #452, and the discussion in #456. It all comes back to how ContextVars have been used in this library, and @reclosedev was totally on the right track all the way back in 2019. I believe I have fixed this in #546. The gist is that a connection can only run one parent (top-level) transaction at a time, although nested transactions are fine. In #546, I'm proposing that databases acquire a new connection from the connection pool for each asyncio.Task. If concurrent work needs to happen on the same connection or within the same transaction, users should pass the connection instance to those child tasks.

zevisert, May 26 '23 18:05
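A toy model can illustrate the one-parent-transaction-per-connection constraint. This is a hedged sketch under stated assumptions, not the code from #546 or the internals of `databases`:

- The hypothetical `ToyConnection` keeps a stack of open transactions. This is similar in spirit to the `_transaction_stack` checked by the SQLite assertion in the original report.
- The hypothetical `ToyPool` assumes a connection-per-task policy keyed on `asyncio.current_task()`.

Nested transactions within one task push and pop in LIFO order, so they always succeed. Two tasks sharing a connection interleave their pushes and pops, so transactions finish out of order.

```python
import asyncio
import weakref
from contextlib import asynccontextmanager


class OutOfOrderCommit(Exception):
    """Raised when a transaction finishes while it is not the innermost one."""


class ToyConnection:
    """Hypothetical connection that tracks open transactions on a stack."""

    def __init__(self) -> None:
        self.transaction_stack: list = []

    @asynccontextmanager
    async def transaction(self):
        marker = object()
        self.transaction_stack.append(marker)
        await asyncio.sleep(0)  # yield to other tasks, as a real BEGIN would
        try:
            yield
        finally:
            # Same idea as the sqlite assertion: only the innermost
            # transaction on this connection may be committed or rolled back.
            if self.transaction_stack[-1] is not marker:
                raise OutOfOrderCommit("transaction stack out of order")
            self.transaction_stack.pop()


class ToyPool:
    """Hypothetical pool that hands out one connection per asyncio.Task."""

    def __init__(self) -> None:
        # Weak keys, so finished tasks do not keep their entries alive.
        self._by_task = weakref.WeakKeyDictionary()

    def connection(self) -> ToyConnection:
        task = asyncio.current_task()
        if task not in self._by_task:
            self._by_task[task] = ToyConnection()
        return self._by_task[task]


async def work(conn: ToyConnection) -> None:
    async with conn.transaction():  # parent (top-level) transaction
        async with conn.transaction():  # nested: pushed and popped in LIFO order
            await asyncio.sleep(0)


async def shared_connection() -> list:
    conn = ToyConnection()
    # Two tasks on one connection: their stack pushes and pops interleave.
    return await asyncio.gather(work(conn), work(conn), return_exceptions=True)


async def connection_per_task(pool: ToyPool) -> list:
    async def run() -> None:
        await work(pool.connection())  # each task gets its own connection

    return await asyncio.gather(run(), run(), return_exceptions=True)


if __name__ == "__main__":
    print([type(r).__name__ for r in asyncio.run(shared_connection())])
    print(asyncio.run(connection_per_task(ToyPool())))  # [None, None]
```

With `return_exceptions=True`, `shared_connection()` returns the out-of-order errors instead of raising the first one, while `connection_per_task()` completes cleanly. Keying on the task rather than on an inherited context variable is what keeps a parent's connection from leaking into its children. A child that should share the parent's connection would instead receive it as an explicit argument, as proposed above.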