MySQL Connection Pool Doesn't Seem to Work

Open Vastxiao opened this issue 2 years ago • 5 comments

I am experiencing an issue with the MySQL connection pool. I have been testing the following code:

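import asyncio
import logging
import time

from databases import Database
from pymysql.err import MySQLError

logger = logging.getLogger(__name__)

# URL format: dialect+driver://user:password@host:port/database
DATABASE_URL = "mysql+aiomysql://root:root@192.168.62.195:3306/test?charset=utf8mb4"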

# Additional database URL parameters can also be passed in the Database constructor
database = Database(DATABASE_URL, min_size=3, max_size=10, charset="utf8mb4")

async def exec_sql(exec_func, sql: str, tid):
    while True:
        try:
            start_time = time.time()
            result = await exec_func(sql)
            logger.info(f'task({tid}|{start_time:.0f}|{time.time() - start_time:.3f}) {exec_func} {sql} {result}')
        except MySQLError as err:
            logger.error(f'task({tid}){exec_func} {sql} {err}')

async def main():
    await database.connect()

    async with asyncio.TaskGroup() as tg:
        for tid in range(1, 100000):
            tg.create_task(exec_sql(database.fetch_all, 'show tables', tid))
            tg.create_task(exec_sql(database.fetch_one, 'select * from tbl_sms_record', tid))
            tg.create_task(exec_sql(database.fetch_val, 'select * from tbl_sms_record', tid))

    await database.disconnect()

asyncio.run(main())

During concurrency testing, I noticed that only one TCP connection is being utilized for executing queries, while the other two initial connections remain idle.

MySQL process list output:

mysql> show processlist;
+-----+-----------------+---------------------+--------+---------+----------+----------------------------+------------------------------+
| Id  | User            | Host                | db     | Command | Time     | State                      | Info                         |
+-----+-----------------+---------------------+--------+---------+----------+----------------------------+------------------------------+
|  99 | root            | 192.168.51.70:59810 | db_sms | Sleep   |       46 |                            | NULL                         |
| 100 | root            | 192.168.51.70:59826 | db_sms | Query   |        0 | waiting for handler commit | select * from tbl_sms_record |
| 101 | root            | 192.168.51.70:59832 | db_sms | Sleep   |       46 |                            | NULL                         |
+-----+-----------------+---------------------+--------+---------+----------+----------------------------+------------------------------+

TCP Connections:

State        Recv-Q   Send-Q           Local Address:Port                  Peer Address:Port    Process
ESTAB 0      0      [::ffff:10.0.0.161]:3306  [::ffff:192.168.51.70]:37426 users:(("mysqld",pid=542638,fd=39))
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.51.70]:37412 users:(("mysqld",pid=542638,fd=38))
ESTAB 0      0      [::ffff:10.0.0.161]:3306  [::ffff:192.168.51.70]:37408 users:(("mysqld",pid=542638,fd=37))

I expected that with 100,000 concurrent tasks, more connections would be utilized. Is this an issue with my code or the databases library?
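To make the expectation concrete, here is a minimal sketch of how peak pool usage could be measured. It is a pure-asyncio simulation, not the databases or aiomysql implementation: `CountingPool`, `run_query`, and `measure_peak` are hypothetical names, and the "query" is just a short sleep.

```python
import asyncio


class CountingPool:
    """Toy stand-in for a connection pool (hypothetical, for illustration only).

    It records how many simulated connections are in use at the same time.
    """

    def __init__(self, max_size: int) -> None:
        self._slots = asyncio.Semaphore(max_size)
        self.in_use = 0
        self.peak = 0

    async def run_query(self, delay: float) -> None:
        async with self._slots:  # wait for a free simulated connection
            self.in_use += 1
            self.peak = max(self.peak, self.in_use)
            try:
                await asyncio.sleep(delay)  # simulated query round trip
            finally:
                self.in_use -= 1


async def measure_peak(max_size: int, num_tasks: int) -> int:
    """Run num_tasks simulated queries and return the peak concurrent usage."""
    pool = CountingPool(max_size)
    await asyncio.gather(*(pool.run_query(0.01) for _ in range(num_tasks)))
    return pool.peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak(max_size=10, num_tasks=200)))
```

In this simulation, a pool that hands out connections to concurrent tasks reaches a peak equal to `max_size`, which is roughly what I expected from max_size=10. The behavior I observed instead looks like the `max_size=1` case, where every query is serialized onto a single connection.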

Vastxiao • Jul 14 '23 08:07

Yes, it is an issue with the databases library.

For comparison, the following code uses aiomysql directly and works as expected:

import aiomysql
import asyncio


async def create_pool():
    pool = await aiomysql.create_pool(
        host='192.168.62.195',
        port=3306,
        user='root',
        password='xiao',
        db='db_sms',
        minsize=3,
        maxsize=10
    )
    return pool


async def execute_query(pool, query):
    while True:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query)
                result = await cur.fetchall()
                # return result
                print(result)


async def run_concurrent_queries(pool, num_queries):
    query = "select * from tbl_sms_record"

    # Start one endlessly looping query task per requested query.
    async with asyncio.TaskGroup() as tg:
        for _ in range(num_queries):
            tg.create_task(execute_query(pool, query))


async def main():
    pool = await create_pool()
    num_queries = 100000
    await run_concurrent_queries(pool, num_queries)
    pool.close()
    await pool.wait_closed()


asyncio.run(main())

TCP Connections:

ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53520 users:(("mysqld",pid=542638,fd=37))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53560 users:(("mysqld",pid=542638,fd=42))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53574 users:(("mysqld",pid=542638,fd=45))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53530 users:(("mysqld",pid=542638,fd=38))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53564 users:(("mysqld",pid=542638,fd=44))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53548 users:(("mysqld",pid=542638,fd=41))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53510 users:(("mysqld",pid=542638,fd=23))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53538 users:(("mysqld",pid=542638,fd=39))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53562 users:(("mysqld",pid=542638,fd=43))                        
ESTAB 0      1400   [::ffff:10.0.0.161]:3306  [::ffff:192.168.50.139]:53590 users:(("mysqld",pid=542638,fd=46))

With aiomysql directly, throughput reaches about 1k QPS, while databases only reaches about 600 QPS.
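For anyone who wants to reproduce the throughput comparison, a rough sketch of a QPS measurement helper follows. It is not the exact script behind the numbers above: `measure_qps` and `fake_query` are hypothetical names, and `fake_query` is a sleep-based placeholder so the sketch runs without a database.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def measure_qps(
    run_query: Callable[[], Awaitable[object]],
    workers: int,
    duration: float,
) -> float:
    """Run `workers` concurrent loops for about `duration` seconds.

    Returns the number of completed queries per second.
    """
    completed = 0
    deadline = time.monotonic() + duration

    async def worker() -> None:
        nonlocal completed
        while time.monotonic() < deadline:
            await run_query()
            completed += 1

    start = time.monotonic()
    await asyncio.gather(*(worker() for _ in range(workers)))
    elapsed = time.monotonic() - start
    return completed / elapsed


async def fake_query() -> None:
    """Placeholder query (assumption): sleeps instead of hitting MySQL."""
    await asyncio.sleep(0.001)


if __name__ == "__main__":
    qps = asyncio.run(measure_qps(fake_query, workers=50, duration=1.0))
    print(f"{qps:.0f} queries/s")
```

To compare the two libraries, `fake_query` could be swapped for something like `lambda: database.fetch_all("select * from tbl_sms_record")` on one run and a wrapper around the aiomysql pool on the other, with the same number of workers and the same duration.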

Vastxiao • Jul 15 '23 02:07

Thanks for the report! Let me know if you determine the cause. There isn't much activity here from people who would know the reason off the top of their head.

zanieb • Jul 15 '23 03:07

@Vastxiao Which library works for making async database connections and supports both connection pooling and raw SQL execution?

Priyansh2 • Jul 15 '23 07:07