aiomysql
The latest data cannot be queried when I use the pool
When I use the connection pool, I add or modify data in the database, then acquire a connection from the pool and run a query; the result is still the old data. Like this:
```python
import asyncio

import aiomysql

loop = asyncio.get_event_loop()

async def go():
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='xxx',
                                      db='study', loop=loop)
    while True:
        await asyncio.sleep(2)
        async with pool.acquire() as conn:  # type: aiomysql.Connection
            async with conn.cursor() as cur:
                await cur.execute("SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1")
                # print(cur.description)
                (r,) = await cur.fetchone()
                print(r)
                print(pool.size)

loop.run_until_complete(go())
```
When I change how I write it, the new data is queried correctly. Like this:
```python
import asyncio

import aiomysql

loop = asyncio.get_event_loop()

async def go():
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='xxx',
                                      db='study', loop=loop)
    while True:
        await asyncio.sleep(1)
        async with await pool.acquire() as conn:  # type: aiomysql.Connection
            async with conn.cursor() as cur:
                await cur.execute("SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1")
                # print(cur.description)
                (r,) = await cur.fetchone()
            pool.release(conn)
            print(r)
            print(pool.size)

loop.run_until_complete(go())
```
But then I got a warning:
```
An open stream object is being garbage collected; call "stream.close()" explicitly.
```
I don't know how to correctly get up-to-date data from the database through the connection pool.
@lastshusheng I guess this is a bug. I tested it and found a workaround: whenever you get a connection instance, you can call its `select_db` API, regardless of whether you provided the `db` option at initialization. The code below is an example:
```python
import asyncio
import time

import aiomysql

db_config = {
    'host': 'localhost',
    'port': 32769,
    'user': 'root',
    'password': 'password here',
    'db': 'db name'
}
query_timeout = 10


async def main():
    start_time = time.time()
    my_pool = await aiomysql.create_pool(**db_config)
    while True:
        if time.time() - start_time > query_timeout:
            break
        my_conn = await my_pool.acquire()
        # use select_db here; important so we can get the latest data from the db
        await my_conn.select_db('tdb')
        my_cursor = await my_conn.cursor()
        await my_cursor.execute("select * from tb_users")
        fetch_result = await my_cursor.fetchall()
        print(fetch_result)
        await my_cursor.close()
        my_pool.release(my_conn)
        await asyncio.sleep(2)
    my_pool.close()
    await my_pool.wait_closed()


async def main2():
    start_time = time.time()
    my_pool = await aiomysql.create_pool(**db_config)
    while True:
        if time.time() - start_time > query_timeout:
            break
        async with my_pool.acquire() as my_conn:
            # use select_db here; important so we can get the latest data from the db
            await my_conn.select_db('tdb')
            async with my_conn.cursor() as my_cursor:
                await my_cursor.execute("select * from tb_users")
                fetch_result = await my_cursor.fetchall()
                print(fetch_result)
        await asyncio.sleep(2)
    my_pool.close()
    await my_pool.wait_closed()


if __name__ == '__main__':
    asyncio.run(main2())
```
You can copy, paste, and test it. I hope this helps.
@suiyuex Thanks for your answer. I tried it, but I still get a warning. The difference is that the warning does not appear every time; it is printed on every second iteration of the loop. My output looks like this:
```
Co用户-cQ6m
1
An open stream object is being garbage collected; call "stream.close()" explicitly.
Co用户-cQ6m
0
Co用户-cQ6m
1
An open stream object is being garbage collected; call "stream.close()" explicitly.
Co用户-cQ6m
0
```
And my script looks like this:
```python
pool = await aiomysql.create_pool(host=host, port=port, user=user, password=password, loop=loop)
while True:
    await asyncio.sleep(1)
    async with pool.acquire() as conn:  # type: aiomysql.Connection
        await conn.select_db('study')
        async with conn.cursor() as cur:
            await cur.execute("SELECT nickname FROM tb_user ORDER BY id DESC LIMIT 1")
            (r,) = await cur.fetchone()
            print(r)
            print(pool.size, '\n')
```
This warning appears when I use Python 3.8; with Python 3.7 it does not appear.
@lastshusheng I ran the code you provided, but there is no warning message. My code looks like this:
```python
import asyncio
import time

import aiomysql

db_config = {
    'host': 'localhost',
    'port': 32769,
    'user': 'root',
    'password': '*',
}
query_timeout = 10


async def main2():
    start_time = time.time()
    my_pool = await aiomysql.create_pool(**db_config)
    while True:
        if time.time() - start_time > query_timeout:
            break
        async with my_pool.acquire() as my_conn:
            # use select_db here; important so we can get the latest data from the db
            await my_conn.select_db('tdb')
            async with my_conn.cursor() as my_cursor:
                await my_cursor.execute("select name from tb_users order by age desc limit 10")
                r = await my_cursor.fetchone()
                print(r)
                print(my_pool.size, '\n')
        await asyncio.sleep(2)
    my_pool.close()
    await my_pool.wait_closed()


if __name__ == '__main__':
    asyncio.run(main2())
```
This is the output I get. My Python version is 3.8.5 (tags/v3.8.5:580fbb0, Jul 20 2020, 15:57:54) [MSC v.1924 64 bit (AMD64)]. This puzzles me, and I'm not sure what causes the difference; maybe the MySQL version. My MySQL version is:
```
+-----------+
| version() |
+-----------+
| 8.0.21    |
+-----------+
```
😵😵😵
But can you get the latest data using the method provided above?
@suiyuex I can get the latest data when I use the `select_db` API. The warning is probably just a bug in Python 3.8.0. I use aiomysql in my Django project instead of the synchronous ORM, but when I saw this warning I worried about a memory leak in production; in your opinion, could that happen? Finally, are you Chinese?
@lastshusheng Don't worry, this will not cause a memory leak. According to the warning message, the stream has already been reclaimed by the garbage collector, so it is destroyed. Yes, I'm Chinese.
It looks like you aren't setting `"autocommit": True` in your `db_config`. I think this means your SELECT statements are opening implicit transactions. When a connection is reused, it still has an open transaction from the previous query, so it doesn't see the new data.
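Concretely, the fix is one extra key in the connection config; the other values below are placeholders modeled on the snippets earlier in this thread, not anything specific to your setup:

```python
# Connection settings for aiomysql.create_pool(**db_config).
# host/port/user/password/db are placeholder values; substitute your own.
db_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'password here',
    'db': 'study',
    'autocommit': True,  # end each statement's implicit transaction immediately
}
```

With MySQL's default REPEATABLE READ isolation level, the first SELECT in a transaction establishes a consistent snapshot, and a pooled connection that never commits keeps re-reading that snapshot. Enabling autocommit (or calling `await conn.commit()` after each read) closes the transaction, so the next query sees fresh data.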
@matthewswain Thanks, I tried it and it worked. I didn't know a transaction would affect SELECT statements. I was so stupid.
Setting the proper transaction isolation level on every brand-new connection helped me. Actually, I wrapped the connection in my own class with `__aenter__` and `__aexit__`, and perform the query inside my `__aenter__`. Something like this:
```python
async def __aenter__(self):
    self.context = self.conn_pool.acquire()
    self.conn = await self.context.__aenter__()
    async with self.conn.cursor() as cur:
        await cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;")
    return self

async def __aexit__(self, *args):
    return await self.context.__aexit__(*args)
```
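For anyone who wants to try this pattern without a database handy, here is a self-contained sketch of the same wrapper. The `Fake*` classes and the `PooledSession` name are stand-ins invented for this example (with aiomysql you would pass the real pool returned by `create_pool` instead), but they mimic the `pool.acquire()` / `conn.cursor()` context-manager protocol closely enough to show the flow:

```python
import asyncio

# Stand-in pool/connection/cursor so the wrapper can run without a MySQL
# server; each executed SQL string is recorded in a shared log list.
class FakeCursor:
    def __init__(self, log):
        self.log = log
    async def execute(self, sql):
        self.log.append(sql)
    async def __aenter__(self):
        return self
    async def __aexit__(self, *args):
        return False

class FakeConnection:
    def __init__(self, log):
        self.log = log
    def cursor(self):
        return FakeCursor(self.log)

class FakeAcquireContext:
    def __init__(self, log):
        self.log = log
    async def __aenter__(self):
        return FakeConnection(self.log)
    async def __aexit__(self, *args):
        return False

class FakePool:
    def __init__(self):
        self.log = []
    def acquire(self):
        return FakeAcquireContext(self.log)

class PooledSession:
    """Acquire a pooled connection and pin its isolation level on entry."""
    def __init__(self, conn_pool):
        self.conn_pool = conn_pool
    async def __aenter__(self):
        self.context = self.conn_pool.acquire()
        self.conn = await self.context.__aenter__()
        async with self.conn.cursor() as cur:
            await cur.execute(
                "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;")
        return self
    async def __aexit__(self, *args):
        # Releasing the connection is delegated to the pool's own context.
        return await self.context.__aexit__(*args)

async def demo():
    pool = FakePool()
    async with PooledSession(pool) as session:
        async with session.conn.cursor() as cur:
            await cur.execute("SELECT 1")
    return pool.log

log = asyncio.run(demo())
print(log)  # the SET SESSION statement runs before the query
```

The point of the wrapper is that the isolation-level statement is guaranteed to run before any query on that connection, and releasing the connection is delegated to the pool's own `__aexit__`.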