tortoise-orm
Parallel execution of multiple `get_or_create` calls wrapped with transaction raises `TransactionManagementError` with postgresql backend
Describe the bug
This bug appears when multiple conditions are met:
- DB backend is postgresql
- Function containing a `get_or_create` call is wrapped with `in_transaction`
- This function is invoked multiple times in parallel using `asyncio.gather`
Seems like some sort of race condition.
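The suspected race can be illustrated without a database. The following is a minimal sketch, assuming an in-memory stand-in for the table: `FakeTable`, `DuplicateKey` and `naive_get_or_create` are hypothetical names for illustration, not tortoise-orm or asyncpg code. It only shows how a check-then-insert sequence interleaves when several coroutines run under `asyncio.gather`.

```python
import asyncio


class DuplicateKey(Exception):
    """Stand-in for a unique-constraint violation (hypothetical name)."""


class FakeTable:
    """In-memory stand-in for a table with a unique primary key."""

    def __init__(self):
        self.rows = set()

    async def exists(self, pk):
        await asyncio.sleep(0)  # yield to the event loop, like a network round trip
        return pk in self.rows

    async def insert(self, pk):
        await asyncio.sleep(0)  # another task may run before the row is written
        if pk in self.rows:
            raise DuplicateKey(pk)
        self.rows.add(pk)


async def naive_get_or_create(table, pk):
    # Check-then-insert: other tasks can pass the same check
    # before any of them has inserted the row.
    if await table.exists(pk):
        return False
    await table.insert(pk)
    return True


async def run_race(workers=3):
    table = FakeTable()
    return await asyncio.gather(
        *(naive_get_or_create(table, 1) for _ in range(workers)),
        return_exceptions=True,
    )


if __name__ == "__main__":
    print(asyncio.run(run_race()))
```

In this toy model all three tasks pass the existence check before any of them inserts, so one task creates the row and the other two hit the duplicate-key error, which mirrors the repeated `INSERT ... [1]` lines in the log.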
To Reproduce
test_models.py
from tortoise.models import Model
import tortoise.fields as fields


class TestModel(Model):
    id = fields.IntField(pk=True)
test.py
import asyncio
import logging
from test_models import TestModel
from tortoise import Tortoise
from tortoise.transactions import in_transaction

logging.basicConfig()
logging.getLogger().setLevel(0)


async def test():
    async with in_transaction():
        await TestModel.get_or_create(id=1)
        await TestModel.get_or_create(id=2)


async def main():
    await Tortoise.init(
        db_url='postgres://postgres@127.0.0.1:13337',
        # db_url='sqlite://:memory:',
        modules={'test': ['test_models']},
    )
    async with in_transaction() as conn:
        await conn.execute_query('DROP TABLE IF EXISTS testmodel')

    await Tortoise.generate_schemas()
    await asyncio.gather(
        test(),
        test(),
        test(),
    )

asyncio.run(main())
log
(.venv) ➜ dipdup-dex git:(feat/initial) ✗ python test.py
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:tortoise:Tortoise-ORM startup
connections: {'default': {'engine': 'tortoise.backends.asyncpg', 'credentials': {'port': 13337, 'database': None, 'host': '127.0.0.1', 'user': 'postgres', 'password': None}}}
apps: {'test': {'models': ['test_models'], 'default_connection': 'default'}}
DEBUG:db_client:Created connection pool <asyncpg.pool.Pool object at 0x7fd6be274110> with params: {'host': '127.0.0.1', 'port': 13337, 'user': 'postgres', 'database': None, 'min_size': 1, 'max_size': 5, 'connection_class': <class 'asyncpg.connection.Connection'>, 'loop': None}
DEBUG:db_client:DROP TABLE IF EXISTS testmodel: None
DEBUG:tortoise:Creating schema: CREATE TABLE IF NOT EXISTS "testmodel" (
"id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:CREATE TABLE IF NOT EXISTS "testmodel" (
"id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=2 LIMIT 2: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [2]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fd6be11e9e0>
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fd6be11eac0>
Traceback (most recent call last):
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1041, in get_or_create
return await cls.filter(**kwargs).using_db(connection).get(), False
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/queryset.py", line 891, in _execute
raise DoesNotExist("Object does not exist")
tortoise.exceptions.DoesNotExist: Object does not exist
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1044, in get_or_create
return await cls.create(using_db=connection, **defaults, **kwargs), True
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 40, in translate_exceptions_
raise IntegrityError(exc)
tortoise.exceptions.IntegrityError: duplicate key value violates unique constraint "testmodel_pkey"
DETAIL: Key (id)=(1) already exists.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/droserasprout/git/dipdup-dex/test.py", line 33, in <module>
asyncio.run(main())
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/droserasprout/git/dipdup-dex/test.py", line 27, in main
await asyncio.gather(
File "/home/droserasprout/git/dipdup-dex/test.py", line 13, in test
await TestModel.get_or_create(id=1)
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1046, in get_or_create
return await cls.filter(**kwargs).using_db(connection).get(), False
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 42, in translate_exceptions_
raise TransactionManagementError(exc)
tortoise.exceptions.TransactionManagementError: current transaction is aborted, commands ignored until end of transaction block
Expected behavior
All 6 `get_or_create` calls execute correctly.
Additional context
Reproducible from 0.17.1 through 0.17.4. Using the latest postgres Docker image. Possibly related: https://github.com/tortoise/tortoise-orm/issues/744
Use `using_db`.
@long2ice, thanks, I should have noticed that there are no BEGIN/COMMIT statements in the logs at all. But is there any way to enforce using the current transaction for all model calls inside the block? In our case there is always only one database connection and model calls are always wrapped with `in_transaction`, so calls that bypass it must be forbidden. I guess the best way to achieve this is to set the yielded connection on a user-defined router, right?
UPD: Hacky solution for a single DB:
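from contextlib import asynccontextmanager
from tortoise import Tortoise
from tortoise.transactions import in_transaction

@asynccontextmanager
async def in_global_transaction():
    async with in_transaction() as conn:
        original_conn = Tortoise._connections['default']
        Tortoise._connections['default'] = conn
        try:
            yield
        finally:
            # Restore the default connection even if the block raises.
            Tortoise._connections['default'] = original_conn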
Also, explicit `using_db` didn't help; it fails with a probability of around 50%:
test.py
import asyncio
import logging
from test_models import TestModel
from tortoise import Tortoise
from tortoise.transactions import in_transaction

logging.basicConfig()
logging.getLogger().setLevel(0)


async def test():
    async with in_transaction() as conn:
        await TestModel.get_or_create(id=1, using_db=conn)
        await TestModel.get_or_create(id=2, using_db=conn)


async def main():
    await Tortoise.init(
        db_url='postgres://postgres@127.0.0.1:13337',
        # db_url='sqlite://:memory:',
        modules={'test': ['test_models']},
    )
    async with in_transaction() as conn:
        await conn.execute_query('DROP TABLE IF EXISTS testmodel')

    await Tortoise.generate_schemas()
    await asyncio.gather(
        test(),
        test(),
        test(),
    )

asyncio.run(main())
log
(.venv) ➜ dipdup-dex git:(feat/initial) ✗ python test.py
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:tortoise:Tortoise-ORM startup
connections: {'default': {'engine': 'tortoise.backends.asyncpg', 'credentials': {'port': 13337, 'database': None, 'host': '127.0.0.1', 'user': 'postgres', 'password': None}}}
apps: {'test': {'models': ['test_models'], 'default_connection': 'default'}}
DEBUG:db_client:Created connection pool <asyncpg.pool.Pool object at 0x7fdb5f946110> with params: {'host': '127.0.0.1', 'port': 13337, 'user': 'postgres', 'database': None, 'min_size': 1, 'max_size': 5, 'connection_class': <class 'asyncpg.connection.Connection'>, 'loop': None}
DEBUG:db_client:DROP TABLE IF EXISTS testmodel: None
DEBUG:tortoise:Creating schema: CREATE TABLE IF NOT EXISTS "testmodel" (
"id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:CREATE TABLE IF NOT EXISTS "testmodel" (
"id" SERIAL NOT NULL PRIMARY KEY
);
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 1: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=2 LIMIT 1: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 1: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 1: None
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [2]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:INSERT INTO "testmodel" ("id") VALUES ($1): [1]
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
DEBUG:db_client:SELECT "id" FROM "testmodel" WHERE "id"=1 LIMIT 2: None
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fdb5f09e040>
ERROR:asyncio:Resetting connection with an active transaction <asyncpg.connection.Connection object at 0x7fdb5f09e4a0>
Traceback (most recent call last):
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 36, in translate_exceptions_
return await func(self, *args)
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 176, in execute_query
rows = await connection.fetch(*params)
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 583, in fetch
return await self._execute(
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 1625, in _execute
result, _ = await self.__execute(
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 1650, in __execute
return await self._do_execute(
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 1677, in _do_execute
stmt = await self._get_statement(
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/asyncpg/connection.py", line 375, in _get_statement
statement = await self._protocol.prepare(
File "asyncpg/protocol/protocol.pyx", line 168, in prepare
asyncpg.exceptions.InFailedSQLTransactionError: current transaction is aborted, commands ignored until end of transaction block
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/droserasprout/git/dipdup-dex/test.py", line 33, in <module>
asyncio.run(main())
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/droserasprout/git/dipdup-dex/test.py", line 27, in main
await asyncio.gather(
File "/home/droserasprout/git/dipdup-dex/test.py", line 13, in test
await TestModel.get_or_create(id=1, using_db=conn)
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/models.py", line 1009, in get_or_create
return await cls.get(**kwargs), False
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/queryset.py", line 802, in _execute
instance_list = await self._db.executor_class(
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/base/executor.py", line 124, in execute_select
_, raw_results = await self.db.execute_query(query.get_sql())
File "/home/droserasprout/git/dipdup-dex/.venv/lib/python3.9/site-packages/tortoise/backends/asyncpg/client.py", line 42, in translate_exceptions_
raise TransactionManagementError(exc)
tortoise.exceptions.TransactionManagementError: current transaction is aborted, commands ignored until end of transaction block
This happens because `get_or_create` uses a transaction internally and an exception is raised inside it; some handling is needed there.
I figured out the problem, at least in my case. When an IntegrityError is thrown, for example because of a duplicate value, it runs this block:
https://github.com/tortoise/tortoise-orm/blob/e3420b00a11410be3de47bf381d36027b1278c9d/tortoise/models.py#L1043-L1046
After a statement fails, Postgres rejects any further commands in the same transaction until it ends. The obvious solution is to exit the transaction and then retry.
My database shows these errors happening:
2021-08-01 22:59:20.912 EDT [1954] ERROR: duplicate key value violates unique constraint "uid_argument_command_b35c80"
2021-08-01 22:59:20.912 EDT [1954] DETAIL: Key (command_id, "order")=(135, 2) already exists.
2021-08-01 22:59:20.912 EDT [1954] STATEMENT: INSERT INTO "argument" ("name","description","order","optional","repeatable","quote_escaped","flag","literal_values","command_id") VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) RETURNING "id"
2021-08-01 22:59:20.913 EDT [1954] ERROR: current transaction is aborted, commands ignored until end of transaction block
2021-08-01 22:59:20.913 EDT [1954] STATEMENT: SELECT "name","optional","literal_values","command_id","flag","description","repeatable","id","quote_escaped","order" FROM "argument" WHERE "name"='mode' AND "command_id"=135 LIMIT 2
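The difference between retrying inside the failed transaction and retrying after leaving it can be sketched with a toy model. Everything below is an assumption made for illustration: `ToyTransaction` and both helper functions are hypothetical and only mimic one Postgres rule (a failed statement aborts the transaction). The sketch does not model rollback of earlier writes and is not the tortoise-orm implementation.

```python
class IntegrityError(Exception):
    """Stand-in for a unique-constraint violation."""


class TransactionAborted(Exception):
    """Stand-in for 'current transaction is aborted'."""


class ToyTransaction:
    """Mimics one rule: after a failed statement, every later
    statement in the same transaction is rejected."""

    def __init__(self, rows):
        self.rows = rows
        self.aborted = False

    def _check(self):
        if self.aborted:
            raise TransactionAborted("commands ignored until end of transaction block")

    def select(self, pk):
        self._check()
        return pk in self.rows

    def insert(self, pk):
        self._check()
        if pk in self.rows:
            self.aborted = True  # the failed INSERT poisons the transaction
            raise IntegrityError(pk)
        self.rows.add(pk)


def create_or_get_same_tx(rows, pk):
    """Retry the SELECT in the transaction that just failed."""
    tx = ToyTransaction(rows)
    try:
        tx.insert(pk)
        return "created"
    except IntegrityError:
        tx.select(pk)  # raises TransactionAborted
        return "fetched"


def create_or_get_new_tx(rows, pk):
    """Leave the failed transaction, then retry the SELECT in a fresh one."""
    tx = ToyTransaction(rows)
    try:
        tx.insert(pk)
        return "created"
    except IntegrityError:
        pass  # the failed transaction ends here
    fresh = ToyTransaction(rows)
    fresh.select(pk)
    return "fetched"
```

With a row already present, `create_or_get_same_tx` ends in `TransactionAborted`, matching the second error in the database log above, while `create_or_get_new_tx` returns `"fetched"`.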
@PythonCoderAS, there's a race condition somewhere in the code which handles the Postgres connection pool. You can add `maxsize=1` to your connection string to limit asyncpg to a single connection and thus work around this issue.
Your statement is not totally correct. I tried enabling this setting and it did nothing. I'm very sure it has to do with the code being in the same transaction.
I had this problem too when I was building an asynchronous queue on top of tortoise, similar to `asyncio.Queue`. I had to do this:
async def _get_or_create_guild(self) -> tuple[models.Guild, bool]:
    async with self._lock:
        return await models.Guild.get_or_create(
            id=self.guild.id
        )
You can try adding an `asyncio.Lock` around the tortoise call so that the function cannot be called multiple times at the same time.
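The lock workaround can be sketched in a self-contained form. This is a minimal sketch under stated assumptions: `KeyedLocks` and `locked_get_or_create` are hypothetical helpers, and a plain dict stands in for the database table. Using one lock per key, rather than a single global lock, is a design choice of this sketch so that unrelated keys do not block each other. An `asyncio.Lock` only serializes coroutines within one event loop, so it does not protect against other processes writing to the same table.

```python
import asyncio
from collections import defaultdict


class KeyedLocks:
    """One asyncio.Lock per key, created on first use (hypothetical helper)."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    def for_key(self, key):
        return self._locks[key]


async def locked_get_or_create(locks, store, key):
    # `store` is a plain dict standing in for the database table.
    async with locks.for_key(key):
        if key in store:
            return store[key], False
        await asyncio.sleep(0)  # simulated round trip; other tasks may run here
        store[key] = object()
        return store[key], True


async def demo(workers=5):
    locks, store = KeyedLocks(), {}
    return await asyncio.gather(
        *(locked_get_or_create(locks, store, 1) for _ in range(workers))
    )


if __name__ == "__main__":
    results = asyncio.run(demo())
    print([created for _, created in results])
```

Only the first task reports `created=True`; the remaining tasks wait for the lock and then find the existing object.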
@long2ice Any news on this issue?
I am facing exactly the same* issue even though nothing is wrapped with `in_transaction`.
*:
Traceback (most recent call last):
File "/opt/venv/lib/python3.10/site-packages/tortoise/models.py", line 1056, in get_or_create
await cls.select_for_update().filter(**kwargs).using_db(connection).get(),
File "/opt/venv/lib/python3.10/site-packages/tortoise/queryset.py", line 1018, in _execute
raise DoesNotExist("Object does not exist")
tortoise.exceptions.DoesNotExist: Object does not exist
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/venv/lib/python3.10/site-packages/tortoise/models.py", line 1061, in get_or_create
return await cls.create(using_db=connection, **defaults, **kwargs), True
File "/opt/venv/lib/python3.10/site-packages/tortoise/backends/asyncpg/client.py", line 83, in _translate_exceptions
raise IntegrityError(exc)
tortoise.exceptions.IntegrityError: duplicate key value violates unique constraint "uid_sub_user_id_029239"
DETAIL: Key (user_id, feed_id)=(<REDACTED>, <REDACTED>) already exists.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/app/src/command/inner/sub.py", line 93, in sub
_sub, created_new_sub = await db.Sub.get_or_create(user_id=user_id, feed=feed,
File "/opt/venv/lib/python3.10/site-packages/tortoise/models.py", line 1063, in get_or_create
return await cls.filter(**kwargs).using_db(connection).get(), False
File "/opt/venv/lib/python3.10/site-packages/tortoise/backends/asyncpg/client.py", line 85, in _translate_exceptions
raise TransactionManagementError(exc)
tortoise.exceptions.TransactionManagementError: current transaction is aborted, commands ignored until end of transaction block
Without `in_transaction`, the issue can still be reproduced easily:
 import asyncio
 import logging
 from test_models import TestModel
 from tortoise import Tortoise
 from tortoise.transactions import in_transaction

 logging.basicConfig()
 logging.getLogger().setLevel(0)


 async def test():
-    async with in_transaction():
-        await TestModel.get_or_create(id=1)
-        await TestModel.get_or_create(id=2)
+    await TestModel.get_or_create(id=1)
+    await TestModel.get_or_create(id=2)


 async def main():
     await Tortoise.init(
         db_url='postgres://postgres@127.0.0.1:13337',
         # db_url='sqlite://:memory:',
         modules={'test': ['test_models']},
     )
     async with in_transaction() as conn:
         await conn.execute_query('DROP TABLE IF EXISTS testmodel')

     await Tortoise.generate_schemas()
-    await asyncio.gather(
-        test(),
-        test(),
-        test(),
-    )
+    await asyncio.gather(*(test() for _ in range(16)))

 asyncio.run(main())
Still reproducible on 0.19.1, both with and without `in_transaction`.
Related: https://github.com/tortoise/tortoise-orm/pull/1404