asyncpg icon indicating copy to clipboard operation
asyncpg copied to clipboard

Asyncpg.pool creates more connections than its max_size

Open no-to-mediocrity opened this issue 1 year ago • 0 comments

  • asyncpg version:0.29.0
  • PostgreSQL version: "PostgreSQL 16.0 (Debian 16.0-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit"
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Nope, using Dockerized container
  • Python version: 3.11.5
  • Platform: MacBook-Air Darwin Kernel Version 21.1.0: Wed Oct 13 17:33:24 PDT 2021; root:xnu-8019.41.5~1/RELEASE_ARM64_T8101 arm64
  • Do you use pgbouncer?: Nope
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: n/a
  • Can the issue be reproduced under both asyncio and uvloop?:Nope

I have a singleton for a Database entity, which is then used to write some primitive data. When I run the write function 1000 times concurrently using asyncio.gather(), the database reports that there is more connections than the max_size of the asyncpg.pool. For example, when I was testing there were 857 active db connections, but only 62 active pool connections. No other clients/operations were running during the test. When I use uvloop to do the same thing, it just crashes with ConnectionResetError: [Errno 54] Connection reset by peer if I try to run more tasks than the size of the pool.

Is this a normal pool behavior?

I use code below (the write function is simplified though):

The database code:

  class Database:
      _instance = None
      _pool = None
      db_params = { 
                  'host': os.getenv('DATABASE_HOST'),
                  'port': os.getenv('DATABASE_PORT'),
                  'database': os.getenv('DATABASE_NAME'),
                  'user': os.getenv('DATABASE_USER'),
                  'password': os.getenv('DATABASE_PASSWORD')
              }
  
      def __new__(cls, *args, **kwargs):
          if cls._instance is None:
              cls._instance = super(Database, cls).__new__(cls)
              #print(cls._instance)
          return cls._instance
  
      @classmethod
      async def get_pool(cls):
          if cls._pool is None:
              cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
              #print(cls._pool)
          return cls._pool

    @classmethod
    async def write(cls, result):
            pool = await cls.get_pool()
            try:
                    async with pool.acquire() as connection:
                        result = await connection.execute('''
                            INSERT INTO tables.results(
                                result
                            ) VALUES($1)
                        ''', result)
                        return
            except Exception as e:
                raise e

The demo write code

async def fake_result(i):
    print(f'generating fake result {i}')
    await db.record_result(i)
    return

async def run_functions_concurrently():
   tasks = [fake_result(i) for i in range(1000)]
   await asyncio.gather(*tasks)

def main():
    asyncio.run(run_functions_concurrently())

if __name__ == "__main__":
    main()

no-to-mediocrity avatar Nov 19 '23 12:11 no-to-mediocrity