Parallel execution of multiple `get_or_create` calls wrapped with transaction raises `TransactionManagementError` with postgresql backend

Open droserasprout opened this issue 3 years ago • 13 comments

Describe the bug

This bug appears when multiple conditions are met:

  1. DB backend is postgresql
  2. Function containing get_or_create call is wrapped with in_transaction
  3. This function is invoked multiple times in parallel using asyncio.gather

Seems like some sort of race condition.
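The suspected interleaving can be illustrated without a database. The sketch below is only a toy model: `FakeTable`, `DuplicateKey` and `naive_get_or_create` are hypothetical names, not part of tortoise-orm, and `asyncio.sleep(0)` merely stands in for a network round trip. It shows how a check-then-insert sequence can race when several coroutines run under `asyncio.gather`.

```python
import asyncio


class DuplicateKey(Exception):
    """Stands in for a unique-constraint violation."""


class FakeTable:
    """Hypothetical in-memory table with a unique primary key."""

    def __init__(self):
        self.rows = set()

    async def select(self, pk):
        await asyncio.sleep(0)  # yield to the event loop, like a round trip
        return pk in self.rows

    async def insert(self, pk):
        await asyncio.sleep(0)
        if pk in self.rows:
            raise DuplicateKey(pk)
        self.rows.add(pk)


async def naive_get_or_create(table, pk):
    # Check-then-insert: other tasks may run between the two awaits.
    if await table.select(pk):
        return False
    await table.insert(pk)
    return True


async def run_race():
    table = FakeTable()
    return await asyncio.gather(
        *(naive_get_or_create(table, 1) for _ in range(3)),
        return_exceptions=True,
    )


# All three tasks see "no row" before any of them inserts,
# so only one insert succeeds and the others hit the unique constraint.
results = asyncio.run(run_race())
```

In this model, every task finishes its lookup before the first insert lands, which matches the order of SELECT and INSERT statements visible in the log further down.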

To Reproduce

test_models.py

from tortoise.models import Model
import tortoise.fields as fields

class TestModel(Model):
    id = fields.IntField(pk=True)

test.py

import asyncio
import logging
from test_models import TestModel
from tortoise import Tortoise
from tortoise.transactions import in_transaction

logging.basicConfig()
logging.getLogger().setLevel(0)


async def test():
    async with in_transaction():
        await TestModel.get_or_create(id=1)
        await TestModel.get_or_create(id=2)


async def main():
    await Tortoise.init(
        db_url='postgres://postgres@127.0.0.1:13337',
        # db_url='sqlite://:memory:',
        modules={'test': ['test_models']},
    )
    async with in_transaction() as conn:
        await conn.execute_query('DROP TABLE IF EXISTS testmodel')
    await Tortoise.generate_schemas()

    await asyncio.gather(
        test(),
        test(),
        test(),
    )

asyncio.run(main())

log

(.venv) ➜  dipdup-dex git:(feat/initial) ✗ python test.py
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:tortoise:Tortoise-ORM startup
    connections: {'default': {'engine': 'tortoise.backends.asyncpg', 'credentials': {'port': 13337, 'database': None, 'host': '127.0.0.1', 'user': 'postgres', 'password': None}}}
    apps: {'test': {'models': ['test_models'], 'default_connection': 'default'}}
DEBUG:db_client:Created connection pool <asyncpg.pool.Pool object at 0x7fd6be274110> with params: {'host': '127.0.0.1', 'port': 13337, 'user': 'postgres', 'database': None, 'min_size': 1, 'max_size': 5, 'connection_class': <class 'asyncpg.connection.Connection'>, 'loop': None}
DEBUG:db_client:DROP TABLE IF EXISTS testmodel: None
DEBUG:tortoise:Creating schema: CREATE TABLE IF NOT EXISTS "testmodel" (
    "id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:CREATE TABLE IF NOT EXISTS "testmodel" (
    "id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=2 LIMIT 2: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [2]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fd6be11e9e0>
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fd6be11eac0>
Traceback (most recent call last):
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1041, in get_or_create
    return await cls.filter(**kwargs).using_db(connection).get(), False
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/queryset.py", line 891, in _execute
    raise DoesNotExist("Object does not exist")
tortoise.exceptions.DoesNotExist: Object does not exist

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1044, in get_or_create
    return await cls.create(using_db=connection, **defaults, **kwargs), True
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 40, in translate_exceptions_
    raise IntegrityError(exc)
tortoise.exceptions.IntegrityError: duplicate key value violates unique constraint "testmodel_pkey"
DETAIL:  Key (id)=(1) already exists.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/droserasprout/git/dipdup-dex/test.py", line 33, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/droserasprout/git/dipdup-dex/test.py", line 27, in main
    await asyncio.gather(
  File "/home/droserasprout/git/dipdup-dex/test.py", line 13, in test
    await TestModel.get_or_create(id=1)
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1046, in get_or_create
    return await cls.filter(**kwargs).using_db(connection).get(), False
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 42, in translate_exceptions_
    raise TransactionManagementError(exc)
tortoise.exceptions.TransactionManagementError: current transaction is aborted, commands ignored until end of transaction block

Expected behavior

All six get_or_create calls execute correctly.

Additional context

Reproducible from 0.17.1 through 0.17.4, using the latest postgres Docker image. Possibly related: https://github.com/tortoise/tortoise-orm/issues/744

droserasprout avatar Jun 14 '21 08:06 droserasprout

use using_db

long2ice avatar Jun 15 '21 01:06 long2ice

@long2ice, thanks, I should have noticed there are no BEGIN/COMMIT statements in the logs at all. But is there any way to enforce using the current transaction for all model calls inside the block? In our case there's always only one database connection and model calls are always wrapped with in_transaction, so calls that bypass it must be forbidden. I guess the best way to achieve this is to set the yielded connection in a user-defined router, right?

UPD: Hacky solution for a single DB:

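from contextlib import asynccontextmanager
from tortoise import Tortoise
from tortoise.transactions import in_transaction

@asynccontextmanager
async def in_global_transaction():
    original_conn = Tortoise._connections['default']
    try:
        async with in_transaction() as conn:
            Tortoise._connections['default'] = conn
            yield
    finally:
        # Restore the original connection even if the block raises.
        Tortoise._connections['default'] = original_conn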

droserasprout avatar Jun 15 '21 07:06 droserasprout

Also, explicit using_db didn't help; it fails with a probability of around 50%:

test.py

import asyncio
import logging
from test_models import TestModel
from tortoise import Tortoise
from tortoise.transactions import in_transaction

logging.basicConfig()
logging.getLogger().setLevel(0)


async def test():
    async with in_transaction() as conn:
        await TestModel.get_or_create(id=1, using_db=conn)
        await TestModel.get_or_create(id=2, using_db=conn)


async def main():
    await Tortoise.init(
        db_url='postgres://postgres@127.0.0.1:13337',
        # db_url='sqlite://:memory:',
        modules={'test': ['test_models']},
    )
    async with in_transaction() as conn:
        await conn.execute_query('DROP TABLE IF EXISTS testmodel')
    await Tortoise.generate_schemas()

    await asyncio.gather(
        test(),
        test(),
        test(),
    )

asyncio.run(main())

log

(.venv) ➜  dipdup-dex git:(feat/initial) ✗ python test.py
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:tortoise:Tortoise-ORM startup
    connections: {'default': {'engine': 'tortoise.backends.asyncpg', 'credentials': {'port': 13337, 'database': None, 'host': '127.0.0.1', 'user': 'postgres', 'password': None}}}
    apps: {'test': {'models': ['test_models'], 'default_connection': 'default'}}
DEBUG:db_client:Created connection pool <asyncpg.pool.Pool object at 0x7fdb5f946110> with params: {'host': '127.0.0.1', 'port': 13337, 'user': 'postgres', 'database': None, 'min_size': 1, 'max_size': 5, 'connection_class': <class 'asyncpg.connection.Connection'>, 'loop': None}
DEBUG:db_client:DROP TABLE IF EXISTS testmodel: None
DEBUG:tortoise:Creating schema: CREATE TABLE IF NOT EXISTS "testmodel" (
    "id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:CREATE TABLE IF NOT EXISTS "testmodel" (
    "id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 1: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=2 LIMIT 1: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 1: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 1: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [2]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fdb5f09e040>
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fdb5f09e4a0>
Traceback (most recent call last):
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 36, in translate_exceptions_
    return await func(self, *args)
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 176, in execute_query
    rows = await connection.fetch(*params)
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 583, in fetch
    return await self._execute(
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 1625, in _execute
    result, _ = await self.__execute(
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 1650, in __execute
    return await self._do_execute(
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 1677, in _do_execute
    stmt = await self._get_statement(
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 375, in _get_statement
    statement = await self._protocol.prepare(
  File "asyncpg/protocol/protocol.pyx", line 168, in prepare
asyncpg.exceptions.InFailedSQLTransactionError: current transaction is aborted, commands ignored until end of transaction block

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/droserasprout/git/dipdup-dex/test.py", line 33, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/droserasprout/git/dipdup-dex/test.py", line 27, in main
    await asyncio.gather(
  File "/home/droserasprout/git/dipdup-dex/test.py", line 13, in test
    await TestModel.get_or_create(id=1, using_db=conn)
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1009, in get_or_create
    return await cls.get(**kwargs), False
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/queryset.py", line 802, in _execute
    instance_list = await self._db.executor_class(
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/base/executor.py", line 124, in execute_select
    _, raw_results = await self.db.execute_query(query.get_sql())
  File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 42, in translate_exceptions_
    raise TransactionManagementError(exc)
tortoise.exceptions.TransactionManagementError: current transaction is aborted, commands ignored until end of transaction block

droserasprout avatar Jun 15 '21 08:06 droserasprout

That's because get_or_create uses a transaction internally and an exception is raised inside it; we need to handle that there.

long2ice avatar Jun 15 '21 09:06 long2ice

That's because get_or_create uses a transaction internally and an exception is raised inside it; we need to handle that there.

I figured out the problem, at least in my case. When an IntegrityError is raised, for example on a duplicate value, it runs this block:

https://github.com/tortoise/tortoise-orm/blob/e3420b00a11410be3de47bf381d36027b1278c9d/tortoise/models.py#L1043-L1046

Once a statement fails, Postgres does not let you run more commands in that transaction until it ends. The obvious solution is to exit the transaction and then retry.
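To make the failure mode concrete, here is a toy model of that behaviour in plain Python. It is only a sketch under stated assumptions: `FakeTransaction`, `UniqueViolation` and `InFailedTransaction` are hypothetical stand-ins, not asyncpg or tortoise-orm classes, and unlike a real rollback the model does not undo earlier writes.

```python
class InFailedTransaction(Exception):
    """Stands in for 'current transaction is aborted'."""


class UniqueViolation(Exception):
    """Stands in for a duplicate-key error."""


class FakeTransaction:
    """Hypothetical model of the transaction error state (not a real driver)."""

    def __init__(self, rows):
        self.rows = rows
        self.aborted = False

    def _check(self):
        if self.aborted:
            raise InFailedTransaction(
                "commands ignored until end of transaction block"
            )

    def insert(self, pk):
        self._check()
        if pk in self.rows:
            self.aborted = True  # a failed statement poisons the transaction
            raise UniqueViolation(pk)
        self.rows.add(pk)

    def select(self, pk):
        self._check()
        return pk in self.rows

    def rollback(self):
        # Simplification: only clears the error state, does not undo writes.
        self.aborted = False


rows = {1}  # assume another task has already inserted id=1
tx = FakeTransaction(rows)
outcome = []

try:
    tx.insert(1)
except UniqueViolation:
    outcome.append("insert failed")

try:
    tx.select(1)  # the fallback SELECT inside the same transaction
except InFailedTransaction:
    outcome.append("select rejected")

tx.rollback()
outcome.append(tx.select(1))  # works once the failed transaction has ended
```

The fallback lookup only succeeds after the failed transaction has been ended, which is why retrying inside the same transaction block cannot work.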

My database shows these errors happening:

2021-08-01 22:59:20.912 EDT [1954] ERROR:  duplicate key value violates unique constraint "uid_argument_command_b35c80"
2021-08-01 22:59:20.912 EDT [1954] DETAIL:  Key (command_id, "order")=(135, 2) already exists.
2021-08-01 22:59:20.912 EDT [1954] STATEMENT:  INSERT INTO "argument" ("name","description","order","optional","repeatable","quote_escaped","flag","literal_values","command_id") VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) RETURNING "id"
2021-08-01 22:59:20.913 EDT [1954] ERROR:  current transaction is aborted, commands ignored until end of transaction block
2021-08-01 22:59:20.913 EDT [1954] STATEMENT:  SELECT "name","optional","literal_values","command_id","flag","description","repeatable","id","quote_escaped","order" FROM "argument" WHERE "name"='mode' AND "command_id"=135 LIMIT 2

PythonCoderAS avatar Aug 02 '21 03:08 PythonCoderAS

@PythonCoderAS, there's a race condition somewhere in the code that handles the Postgres connection pool. You can add `maxsize=1` to your connection string to limit asyncpg to a single connection and thus work around this issue.

droserasprout avatar Aug 02 '21 11:08 droserasprout

@PythonCoderAS, there's a race condition somewhere in the code that handles the Postgres connection pool. You can add `maxsize=1` to your connection string to limit asyncpg to a single connection and thus work around this issue.

Your statement is not totally correct. I tried enabling this setting and it did nothing. I'm very sure it has to do with the code being in the same transaction.

PythonCoderAS avatar Aug 02 '21 12:08 PythonCoderAS

I had this problem too when I was building an asynchronous queue on top of tortoise, similar to asyncio.Queue. I had to do this:

    async def _get_or_create_guild(self) -> tuple[models.Guild, bool]:
        async with self._lock:
            return await models.Guild.get_or_create(
                id=self.guild.id
            )

LEv145 avatar Oct 30 '21 18:10 LEv145

You can try adding an asyncio.Lock to tortoise so that the function cannot be called multiple times at the same time.
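As a rough illustration of the lock idea, the sketch below serializes get-or-create calls per key inside a single process. `KeyedLocks` and the in-memory `get_or_create` are hypothetical helpers written for this example, not tortoise-orm APIs, and a lock like this offers no protection against other processes writing to the same table.

```python
import asyncio
from collections import defaultdict


class KeyedLocks:
    """Hypothetical helper: one asyncio.Lock per lookup key."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    async def run(self, key, coro_factory):
        # Only one coroutine per key may be inside the critical section.
        async with self._locks[key]:
            return await coro_factory()


async def demo():
    rows = set()
    locks = KeyedLocks()

    async def get_or_create():
        # In-memory stand-in for Model.get_or_create(id=1).
        if 1 in rows:
            return False
        await asyncio.sleep(0)  # simulated gap between SELECT and INSERT
        rows.add(1)
        return True

    return await asyncio.gather(
        *(locks.run(("guild", 1), get_or_create) for _ in range(3))
    )


# With the lock, only the first call creates the row.
created_flags = asyncio.run(demo())
```

Without the lock, all three calls in this model would pass the existence check before any of them inserted, so each would report that it had created the row.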

LEv145 avatar Oct 30 '21 19:10 LEv145

@long2ice Any news on this issue?

I am facing exactly the same* issue even though nothing is wrapped with in_transaction.

*:

Traceback (most recent call last):
  File "/opt/venv/lib/python3.10/site-packages/tortoise/models.py", line 1056, in get_or_create
    await cls.select_for_update().filter(**kwargs).using_db(connection).get(),
  File "/opt/venv/lib/python3.10/site-packages/tortoise/queryset.py", line 1018, in _execute
    raise DoesNotExist("Object does not exist")
tortoise.exceptions.DoesNotExist: Object does not exist

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/venv/lib/python3.10/site-packages/tortoise/models.py", line 1061, in get_or_create
    return await cls.create(using_db=connection, **defaults, **kwargs), True
  File "/opt/venv/lib/python3.10/site-packages/tortoise/backends/asyncpg/client.py", line 83, in _translate_exceptions
    raise IntegrityError(exc)
tortoise.exceptions.IntegrityError: duplicate key value violates unique constraint "uid_sub_user_id_029239"
DETAIL:  Key (user_id, feed_id)=(<REDACTED>, <REDACTED>) already exists.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/src/command/inner/sub.py", line 93, in sub
    _sub, created_new_sub = await db.Sub.get_or_create(user_id=user_id, feed=feed,
  File "/opt/venv/lib/python3.10/site-packages/tortoise/models.py", line 1063, in get_or_create
    return await cls.filter(**kwargs).using_db(connection).get(), False
  File "/opt/venv/lib/python3.10/site-packages/tortoise/backends/asyncpg/client.py", line 85, in _translate_exceptions
    raise TransactionManagementError(exc)
tortoise.exceptions.TransactionManagementError: current transaction is aborted, commands ignored until end of transaction block

Rongronggg9 avatar Apr 09 '22 22:04 Rongronggg9

Without in_transaction, the issue can still be reproduced easily:

import asyncio
import logging
from test_models import TestModel
from tortoise import Tortoise
from tortoise.transactions import in_transaction

logging.basicConfig()
logging.getLogger().setLevel(0)


async def test():
-    async with in_transaction():
-        await TestModel.get_or_create(id=1)
-        await TestModel.get_or_create(id=2)
+    await TestModel.get_or_create(id=1)
+    await TestModel.get_or_create(id=2)


async def main():
    await Tortoise.init(
        db_url='postgres://postgres@127.0.0.1:13337',
        # db_url='sqlite://:memory:',
        modules={'test': ['test_models']},
    )
    async with in_transaction() as conn:
        await conn.execute_query('DROP TABLE IF EXISTS testmodel')
    await Tortoise.generate_schemas()

-    await asyncio.gather(
-        test(),
-        test(),
-        test(),
-    ),
+    await asyncio.gather(*(test() for _ in range(16)))

asyncio.run(main())

Rongronggg9 avatar Apr 09 '22 22:04 Rongronggg9

Still reproducible on 0.19.1, both with and without in_transaction.

droserasprout avatar May 25 '22 06:05 droserasprout

Related: https://github.com/tortoise/tortoise-orm/pull/1404

droserasprout avatar Nov 22 '23 16:11 droserasprout