databases
databases copied to clipboard
Error when querying data before asyncio.gather()
code
# Create a database instance, and connect to it.
import asyncio
import contextvars
from urllib.parse import quote

from databases import Database

database = Database(f'mysql://root:{quote("123456")}@192.168.1.80/learn_db')


async def main():
    """Reproduce the shared-connection error.

    The initial ``fetch_all`` below binds a connection in a ContextVar
    inside main()'s context; both tasks spawned by ``asyncio.gather``
    inherit a copy of that context and therefore share the SAME aiomysql
    connection, so their concurrent reads collide with
    ``RuntimeError: readexactly() called while another coroutine is
    already waiting for incoming data``.
    """
    await database.connect()
    query = "SELECT * FROM HighScores limit 1"
    query2 = 'SELECT * FROM HighScores limit 3'
    # This call sets the per-context connection BEFORE the tasks start.
    await database.fetch_all(query)

    async def task1():
        print('task1 start---')
        await asyncio.sleep(4)
        res = await database.fetch_all(query)
        print('task1', res)

    async def task2():
        print('task2 start---')
        async for i in database.iterate(query2):
            await asyncio.sleep(2)
            print('task2->', i)

    # Both tasks run concurrently on the single inherited connection.
    await asyncio.gather(task1(), task2())
    await database.disconnect()


asyncio.run(main())
error
Traceback (most recent call last):
File "/home/baloneo/gitee/learning-code/Python/databases/02-learn.py", line 37, in <module>
asyncio.run(main())
File "/home/baloneo/.pyenv/versions/3.8.12/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/baloneo/.pyenv/versions/3.8.12/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/home/baloneo/gitee/learning-code/Python/databases/02-learn.py", line 32, in main
await asyncio.gather(task1(), task2())
File "/home/baloneo/gitee/learning-code/Python/databases/02-learn.py", line 23, in task1
res = await database.fetch_all(query)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/databases/core.py", line 149, in fetch_all
return await connection.fetch_all(query, values)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/databases/core.py", line 273, in fetch_all
return await self._connection.fetch_all(built_query)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/databases/backends/mysql.py", line 108, in fetch_all
await cursor.execute(query_str, args)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/cursors.py", line 239, in execute
await self._query(query)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/cursors.py", line 457, in _query
await conn.query(q)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/connection.py", line 428, in query
await self._read_query_result(unbuffered=unbuffered)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/connection.py", line 620, in _read_query_result
await result.read()
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/connection.py", line 1103, in read
first_packet = await self.connection._read_packet()
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/connection.py", line 559, in _read_packet
packet_header = await self._read_bytes(4)
File "/home/baloneo/.local/share/virtualenvs/learning-code-uVa5HjK0/lib/python3.8/site-packages/aiomysql/connection.py", line 596, in _read_bytes
data = await self._reader.readexactly(num_bytes)
File "/home/baloneo/.pyenv/versions/3.8.12/lib/python3.8/asyncio/streams.py", line 723, in readexactly
await self._wait_for_data('readexactly')
File "/home/baloneo/.pyenv/versions/3.8.12/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
raise RuntimeError(
RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
The ContextVar is set by the first `await database.fetch_all(query)`, causing both tasks to share the same ContextVar (and therefore the same underlying connection).
Is there any solution (resetting the ContextVar value?)?
Any solution for this one? I am facing something similar in a project with FastAPI + databases 0.6.0. It is interesting because sometimes it works and sometimes it doesn't.
@router.post("/path")
async def generic_method(mysql_driver: Database = Depends(get_mysql_driver)):
    """Save one record, then fan out ten concurrent saves inside one transaction.

    NOTE(review): gathering concurrent saves on the one Database the
    ContextVar hands out is exactly the shared-connection scenario from
    the traceback above — sometimes it works, sometimes it doesn't.
    """
    async with mysql_driver.transaction():
        some_data = {"a": "b"}
        await generic_save_method(mysql_driver, some_data)
        tasks = []
        for i in range(0, 10):
            # both methods are declared with `async def another_generic_save_method1/2`
            if i % 2 == 0:
                # fixed: original said `mysql_drive`, a NameError for the
                # parameter declared as `mysql_driver`
                tasks.append(another_generic_save_method1(mysql_driver, i))
            else:
                tasks.append(another_generic_save_method2(mysql_driver, i))
        result = await asyncio.gather(*tasks)
Even though I have an option to create a workaround with a batch save, calling something like the code below, I would like to know how to fix the above.
batch_for_one = []
batch_for_two = []
for i in range(0,10):
if i % 2 == 0:
batch_for_one.append(i)
else:
batch_for_two.append(i)
await save_batch_for_generic_method1(mysql_drive, batch_for_one)
await save_batch_for_generic_method2(mysql_drive, batch_for_two)
# I think would be greate to be abe to do sth like:
# await asyncio.gather(save_batch_for_generic_method1(mysql_drive, batch_for_one),save_batch_for_generic_method2(mysql_drive, batch_for_two))
# but I think we'll run again in the same issue. I didn't tested this but it sounds like the same, I proffered the above version.
# The query in batch is the one with INSERT ... VALUES (generic_value), (another_generic_value), (so_on_value)
Edit: I found some time and dug into the repo issues. This one is related to https://github.com/encode/databases/issues/327 (thanks goteguru for the workaround), and everything is linked here: https://github.com/encode/databases/issues/456 . At least in 0.6.0 the rollback is working correctly, so we might get this fixed soon.
ContextVars aren't handled all that carefully in current releases. I might have solved this for you in #546