databases
Clarification on transaction isolation and management
Consider the following simulation of concurrent access:
```python
# pylint: skip-file
import asyncio
import os

from databases import Database


async def tx1(db):
    async with db.transaction():
        await db.execute("INSERT INTO foo VALUES (1)")
        await asyncio.sleep(1.5)


async def tx2(db):
    async with db.transaction():
        await asyncio.sleep(0.5)
        result = await db.execute("SELECT * FROM foo")
        assert result is None, result
        await asyncio.sleep(1)


async def main():
    db = Database("postgresql://rdbms:rdbms@localhost")
    await db.connect()
    await db.execute("CREATE TABLE IF NOT EXISTS foo (bar int4)")
    await db.execute("TRUNCATE foo CASCADE")
    await asyncio.gather(
        tx1(db.connection()),
        tx2(db.connection())
    )


if __name__ == '__main__':
    asyncio.run(main())
```
This code should exit successfully, but it either fails with `cannot perform operation: another operation is in progress` (which is also weird, because a new connection is requested) or at the `assert` statement. Please provide some clarification regarding the expected transactional behavior and isolation of this module.
I don't have that much knowledge of the connection internals, but as I understand it, the two calls to `db.connection()` return the same connection object.

To get different connections, maybe we can do this instead of passing the connection:

```python
async with db.transaction():
    ...
```

Now each transaction will have a different connection.
I'm very much interested in the same question.
While digging I found two possible solutions to the problem in the script provided by @cochiseruhulessin.

1. Use a new connection with `db._new_connection()`:
```python
# pylint: skip-file
import asyncio
import os

from databases import Database


async def tx1(db):
    async with db.transaction():
        await db.execute("INSERT INTO foo VALUES (1)")
        await asyncio.sleep(1.5)


async def tx2(db):
    async with db.transaction():
        await asyncio.sleep(0.5)
        result = await db.execute("SELECT * FROM foo")
        assert result is None, result
        await asyncio.sleep(1)


async def main():
    db = Database("postgresql://postgres:postgres@localhost/portal")
    await db.connect()
    await db.execute("CREATE TABLE IF NOT EXISTS foo (bar int4)")
    await db.execute("TRUNCATE foo CASCADE")
    await asyncio.gather(
        tx1(db._new_connection()),
        tx2(db._new_connection())
    )


if __name__ == '__main__':
    asyncio.run(main())
```
2. Use the connection on the transaction with `transaction._connection`:
```python
# pylint: skip-file
import asyncio
import os

from databases import Database


async def tx1(db):
    async with db.transaction() as transaction:
        await transaction._connection.execute("INSERT INTO foo VALUES (1)")
        await asyncio.sleep(1.5)


async def tx2(db):
    async with db.transaction() as transaction:
        await asyncio.sleep(0.5)
        result = await transaction._connection.execute("SELECT * FROM foo")
        assert result is None, result
        await asyncio.sleep(1)


async def main():
    db = Database("postgresql://postgres:postgres@localhost/portal")
    await db.connect()
    await db.execute("CREATE TABLE IF NOT EXISTS foo (bar int4)")
    await db.execute("TRUNCATE foo CASCADE")
    await asyncio.gather(
        tx1(db),
        tx2(db)
    )


if __name__ == '__main__':
    asyncio.run(main())
```
Both work, but neither is ideal (both access protected members). Personally I think the second approach (using the connection on the transaction) feels much more natural and clear (similar to how pogi does it), and it would only require exposing the already existing separate connection on the transaction (core.py#L207).
Side note: I need to make queries in FastAPI route handlers. The code in the documentation suggests using one connection for all queries. How do I start and use a transaction on this connection without impacting concurrent requests that also access the DB? Or is it better to acquire a separate connection on each request, as this example suggests?
I agree that this isn't clear and that access to the transaction's connection can be improved. PRs are welcome.
As for your last comment: the reason the docs say you should use a single connection is that the underlying connection is actually a connection pool. I don't think connection-per-request would make sense; you'd soon reach the pool's limits. Transaction-per-request should be allowed; it's been around in Django too. Maybe we can have different isolation levels for creating transactions; I think SQLAlchemy does that too. I need to take a look into it.
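To make the pool-limit point concrete, here is a toy bounded pool built on `asyncio.Queue` (purely illustrative; this is not the library's implementation, and all names here are made up). With connection-per-request, each concurrent request wants its own slot, so a burst of requests exhausts the pool; sharing lets many requests take turns on a few connections:

```python
import asyncio


async def handle_request(pool: asyncio.Queue, request_id: int, served: list):
    # Block until a "connection" is free; with more concurrent requests
    # than pool slots, handlers must wait their turn (or time out).
    conn = await pool.get()
    try:
        served.append((request_id, conn))  # stand-in for running a query
        await asyncio.sleep(0)             # yield to other handlers
    finally:
        pool.put_nowait(conn)              # always return the connection


async def main():
    pool = asyncio.Queue()
    for n in range(2):                     # a pool of only 2 connections
        pool.put_nowait(f"conn-{n}")
    served = []
    # 10 concurrent "requests" are all served, but they share the
    # 2 available connections instead of each holding one.
    await asyncio.gather(*(handle_request(pool, i, served) for i in range(10)))
    return served


served = asyncio.run(main())
assert len(served) == 10
assert {conn for _, conn in served} == {"conn-0", "conn-1"}
```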
asyncpg Transaction docs here.
Update: We already have support for specifying the isolation level here.
So this should work for the isolation levels and needs to be documented:
```python
db = Database("postgresql://...")
await db.connect()

async with db.transaction(isolation="serializable"):
    await db.execute("SELECT 1")
```
But using the isolation level won't solve the issue in the example alone.
Added isolation docs in #434.
I see this as another fallout from the current (databases <= 0.7.0) handling of ContextVar that I am fixing in #546.
```python
await asyncio.gather(
    tx1(db.connection()),
    tx2(db.connection())
)
```

`asyncio.gather` creates two `asyncio.Task`s, which may have been overwriting each other's references to the active connection and transaction each task should have been using, which would cause the `cannot perform operation: another operation is in progress` error you had been seeing.
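The task-vs-context interaction can be shown with a stdlib-only sketch (purely illustrative, no database involved): a plain shared reference gets clobbered by the sibling task, while a `contextvars.ContextVar` is copied into each `asyncio.Task`, which is the kind of per-task isolation the #546 approach relies on:

```python
import asyncio
import contextvars

# A plain shared reference: stand-in for the broken situation where
# sibling Tasks overwrite each other's "active connection".
shared = {"active": None}

# Each asyncio.Task runs in a copy of the parent context, so values set
# through a ContextVar inside one Task are invisible to its siblings.
active_var = contextvars.ContextVar("active", default=None)


async def use(name: str):
    shared["active"] = name
    active_var.set(name)
    await asyncio.sleep(0.01)  # yield so the sibling task runs in between
    # The shared dict now holds whatever the *last* task wrote,
    # while the ContextVar still holds this task's own value.
    return shared["active"], active_var.get()


async def main():
    return await asyncio.gather(use("tx1"), use("tx2"))


results = asyncio.run(main())
print(results)  # [('tx2', 'tx1'), ('tx2', 'tx2')]
```

Note how `tx1` sees `tx2`'s value through the shared dict (the clobbering), but still sees its own `tx1` through the ContextVar.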
As part of #546 I converted your example into a test, and it looked roughly like this:
```python
@pytest.mark.parametrize("database_url", DATABASE_URLS)
async def test_parallel_transaction_isolation(database_url):
    metadata = sqlalchemy.MetaData()
    notes = sqlalchemy.Table(
        "notes",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column("text", sqlalchemy.String(length=100)),
        sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )
    engine = sqlalchemy.create_engine(database_url)
    metadata.create_all(engine)

    setup = asyncio.Event()
    done = asyncio.Event()

    async def tx1(connection):
        async with connection.transaction():
            await db.execute(
                notes.insert(), values={"id": 1, "text": "tx1", "completed": False}
            )
            setup.set()
            await done.wait()

    async def tx2(connection):
        async with connection.transaction():
            await setup.wait()
            result = await db.fetch_all(notes.select())
            assert result == [], result
            done.set()

    async with Database(database_url) as db:
        async with db.connection() as conn:
            await asyncio.gather(tx1(conn), tx2(conn))
```
And I have a few thoughts for you @cochiseruhulessin to ponder, minor things first:
- Using `asyncio.sleep` is less guaranteed than `asyncio.Event()`, so I've switched to that. Same intent applies though:
  - Create a new row in one transaction
  - Before the first transaction closes, enter a new transaction and make sure that initial row isn't observed by the other transaction
- `db.execute` returns the number of rows affected, or `-1`, but never `None`, so your assertion will always fail. I switched to `db.fetch_all`, which returns a list of selected rows.
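The `Event`-based handshake from the first bullet can be sketched without any database at all (illustrative stdlib code, not taken from #546):

```python
import asyncio


async def writer(setup: asyncio.Event, done: asyncio.Event, log: list):
    log.append("writer: inserted row")  # stand-in for the INSERT in tx1
    setup.set()                         # tell the reader the row exists
    await done.wait()                   # hold the "transaction" open


async def reader(setup: asyncio.Event, done: asyncio.Event, log: list):
    await setup.wait()                  # run only after the insert happened
    log.append("reader: ran SELECT")    # stand-in for the SELECT in tx2
    done.set()                          # let the writer commit and finish


async def main():
    setup, done, log = asyncio.Event(), asyncio.Event(), []
    await asyncio.gather(writer(setup, done, log), reader(setup, done, log))
    return log


log = asyncio.run(main())
print(log)  # ['writer: inserted row', 'reader: ran SELECT'] on every run
```

Unlike `sleep`-based timing, this ordering is deterministic regardless of scheduler load.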
Now the main point here:
- `db.connection()` returns the same object. So your use of that prior to starting the new tasks means you end up with code logically equivalent to this:

```python
async with connection.transaction():
    async with connection.transaction():
        # both tx1 and tx2 transactions were opened before any sleep calls
        await connection.execute(
            notes.insert(), values={"id": 1, "text": "tx1", "completed": False}
        )
        setup.set()
        await setup.wait()  # Events help show when control switches to another task
        result = await db.fetch_all(notes.select())
        assert result == [], result
        done.set()
        await done.wait()  # Again just synchronization
```
- In this scenario, because the connection is shared, you actually end up with nested transactions. If you let each task acquire its own connection, the `cannot perform operation: another operation is in progress` bug you're mentioning still comes up in current versions of databases, but #546 will fix that. What this means is that you'll then be in this "virtual nested transactions" scenario because of the shared connection.
- If you change your code to let each task get its own connection, you should see the isolation you're expecting:

```python
async with Database(database_url) as db:
    await asyncio.gather(tx1(db), tx2(db))
```