asyncpg + aiogram is not working

Open rive-n opened this issue 3 years ago • 1 comment

  • asyncpg version: 0.25.0
  • PostgreSQL version: latest
  • Do you use a PostgreSQL SaaS? No the issue with a local PostgreSQL install?: No
  • Python version: 3.8
  • Platform: macOS
  • Do you use pgbouncer?: No
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: -
  • Can the issue be reproduced under both asyncio and uvloop?: I don't know

Here is some code built on aiogram:

async def check_rights(message: types.Message):
	return await bot['db_pool'].fetch_query(
		'SELECT is_admin FROM users WHERE id = $1::varchar(30) and username = $2::varchar(30)',
		str(message.chat.id), message.from_user.username)


@bot_dispatcher.message_handler(check_rights, commands=['add_admin'])
async def add_admin(message: types.Message):
	print('is_admin!')

That's how bot['db_pool'] is created:

async def on_startup():
	db = Database()
	bot = Bot(token=token)
	bot_dispatcher = Dispatcher(bot)

	try:
		await db.setup_pool()
	except asyncpg.exceptions.InvalidPasswordError:
		logging.error("Username or password is incorrect")
		exit(-1)
	else:
		bot['db_pool'] = db
		bot['getMeInfo'] = await bot.get_me()
		print(await bot['db_pool'].test())
		return bot, bot_dispatcher

And the Database class:

class Database(object):
	def __init__(self):
		if environ.get('DEBUG'):
			self.host, self.port, self.database = '127.0.0.1', 5432, 'test'
		else:
			self.host, self.port, self.database = environ.get('PG_HOST', 'postgres'), environ.get('PG_PORT', 5432), environ.get("PG_DATABASE")

	@property
	def get_pool(self):
		return self.pool

	async def setup_pool(self):
		self.pool = await create_pool(
			user=environ.get('PG_USERNAME'),
			password=environ.get('PG_PASSWORD'),
			host=self.host, port=self.port,
			database=self.database
		)

	async def fetch_query(self, query: str, *args):
		async with self.pool.acquire() as connection:
			async with connection.transaction():
				return await connection.fetchval(query, *args)

	async def test(self):
		return await self.fetch_query("select 2 ^ $1", 1)

If I create the event loop like this:

loop = new_event_loop()
bot, bot_dispatcher = loop.run_until_complete(on_startup())

And start the bot like this:

if __name__ == "__main__":
	print(f"[+] Bot started with username: {bot['getMeInfo'].username}")
	executor.start_polling(bot_dispatcher, skip_updates=True)

I can't execute pool methods, for example:

async def check_rights(message: types.Message):
	return await bot['db_pool'].fetch_query(
		'SELECT is_admin FROM users WHERE id = $1::varchar(30) and username = $2::varchar(30)',
		str(message.chat.id), message.from_user.username)

Error trace:

Task exception was never retrieved
future: <Task finished name='Task-32' coro=<Dispatcher._process_polling_updates() done, defined at /Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py:407> exception=InterfaceError('cannot perform operation: another operation is in progress')>
Traceback (most recent call last):
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py", line 415, in _process_polling_updates
    for responses in itertools.chain.from_iterable(await self.process_updates(updates, fast)):
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py", line 235, in process_updates
    return await asyncio.gather(*tasks)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/handler.py", line 116, in notify
    response = await handler_obj.handler(*args, **partial_data)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py", line 256, in process_update
    return await self.message_handlers.notify(update.message)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/handler.py", line 107, in notify
    data.update(await check_filters(handler_obj.filters, args))
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/filters/filters.py", line 72, in check_filters
    f = await execute_filter(filter_, args)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/filters/filters.py", line 56, in execute_filter
    return await filter_.filter(*args, **filter_.kwargs)
  File "__main__.py", line 65, in check_rights
    return await bot['db_pool'].fetch_query('SELECT is_admin FROM users WHERE id = $1::varchar(30) and username = $2::varchar(30)',
  File "/Users/riven/Desktop/Projects/botname/src/database.py", line 33, in fetch_query
    return await connection.fetchval(query, *args)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 993, in __aexit__
    await self.pool.release(con)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 867, in release
    return await asyncio.shield(ch.release(timeout))
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 224, in release
    raise ex
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 214, in release
    await self._con.reset(timeout=budget)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1367, in reset
    await self.execute(reset_query, timeout=timeout)
  File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/connection.py", line 318, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 323, in query
  File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

The error says "cannot perform operation: another operation is in progress". Which one? I am making only one request to the database.

rive-n • Jun 19 '22 18:06
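
A possible explanation, not confirmed anywhere in this thread: the pool is created on the loop returned by new_event_loop(), while executor.start_polling most likely runs the handlers on a different loop (the one asyncio.get_event_loop() returns). asyncio objects stay bound to the loop that created them, and asyncpg may surface such a mismatch with the less obvious "another operation is in progress" message. The standard-library sketch below shows only the underlying asyncio behaviour; the names wait_for and demo are made up for illustration.

```python
import asyncio


async def wait_for(fut):
    # Awaiting the future from a task makes the task's loop check the future's loop.
    return await fut


def demo():
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        fut = loop_a.create_future()  # bound to loop_a
        try:
            loop_b.run_until_complete(wait_for(fut))  # task runs on loop_b
        except RuntimeError as exc:
            # asyncio rejects the cross-loop await
            return str(exc)
        return None
    finally:
        loop_a.close()
        loop_b.close()


if __name__ == "__main__":
    print(demo())
```

Here demo() returns the text of the RuntimeError that asyncio raises when a task on one loop awaits a future owned by another loop.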

I found a solution, but it's a really strange one...

Before starting polling with executor.start_polling(bot_dispatcher, skip_updates=True), we need to set up the Database object like this:

loop = get_event_loop()
db = Database(loop)
bot['db_pool'] = db

And the Database object:

class Database(object):
	def __init__(self, loop):
		if environ.get('DEBUG'):
			self.host, self.port, self.database = '127.0.0.1', 5432, 'test'
		else:
			self.host, self.port, self.database = environ.get('PG_HOST', 'postgres'), environ.get('PG_PORT', 5432), environ.get("PG_DATABASE")

		self.pool = loop.run_until_complete(create_pool(
			user=environ.get('PG_USERNAME'),
			password=environ.get('PG_PASSWORD'),
			host=self.host, port=self.port,
			database=self.database
		))

This part seems pretty strange to me:

loop.run_until_complete(create_pool(
	user=environ.get('PG_USERNAME'),
	password=environ.get('PG_PASSWORD'),
	host=self.host, port=self.port,
	database=self.database
))

Why should we create the pool via the loop object?

rive-n • Jun 19 '22 19:06
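
On why the workaround helps, one reading (an assumption, since no maintainer replied here) is that get_event_loop() hands back the same loop that start_polling later uses, so the pool ends up created and used on a single loop. The sketch below illustrates that rule with a hypothetical LoopBoundPool stand-in, which is not asyncpg's real pool. The stand-in fails loudly on a loop mismatch, whereas a real driver may fail in subtler ways.

```python
import asyncio


class LoopBoundPool:
    """Hypothetical stand-in for a connection pool; remembers its creating loop."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()

    async def fetchval(self, value):
        # Refuse to work when called from a loop other than the creating one.
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("pool used from a different event loop")
        await asyncio.sleep(0)  # pretend to do I/O
        return value


async def create_pool():
    # Must be awaited inside a running loop, like a real async pool factory.
    return LoopBoundPool()


async def main():
    pool = await create_pool()      # created on the running loop
    return await pool.fetchval(2)   # used on that same loop


def run_split():
    """Mirror the report: set up on one loop, then use the pool on another."""
    setup_loop = asyncio.new_event_loop()
    work_loop = asyncio.new_event_loop()
    try:
        pool = setup_loop.run_until_complete(create_pool())
        try:
            work_loop.run_until_complete(pool.fetchval(2))
        except RuntimeError as exc:
            return str(exc)
        return "ok"
    finally:
        setup_loop.close()
        work_loop.close()


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(run_split())
```

asyncio.run(main()) returns 2 because setup and use share one loop, while run_split() returns the error text. In aiogram 2.x, start_polling is believed to accept an on_startup callback, which would be another place to create the pool on the polling loop; check the documentation for the installed version before relying on it.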