Asyncio task fails to cancel with asyncpg, gets stuck in a permanent loop

Open • PookieBuns opened this issue 3 years ago • 0 comments

  • asyncpg version: 0.25.0, combined with SQLAlchemy 1.4.22; can also be reproduced with 0.24.0
  • PostgreSQL version: 13.4
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Reproduced locally
  • Python version: Python 3.9.7, also Python 3.8.8
  • Platform: Windows 10 x64, also CentOS 7
  • Do you use pgbouncer?: No
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?:
  • Can the issue be reproduced under both asyncio and uvloop?: Yes

When I create an asyncio task that performs database updates with SQLAlchemy and asyncpg, cancelling it fails under certain circumstances: the task stays in a pending state and never finishes. While it is stuck in this state, even raising an exception does not break out of the loop. The only way out is to cancel the task again, either through a KeyboardInterrupt or by calling task.cancel() a second time; however, sometimes the second task.cancel() fails as well.
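
Until the root cause is found, one defensive pattern that matches the "cancel it again" observation is to cancel, wait with a timeout, and re-cancel while the task is still pending. The sketch below is a hypothetical helper (`cancel_and_wait` is not part of asyncio, asyncpg, or SQLAlchemy); it only relies on `Task.cancel()` and `asyncio.wait(..., timeout=...)`, and it works around the symptom rather than fixing whatever swallows the first cancellation.

```python
import asyncio


async def cancel_and_wait(task: asyncio.Task, timeout: float = 1.0, attempts: int = 3) -> bool:
    """Cancel `task`, re-cancelling up to `attempts` times while it stays pending.

    Hypothetical helper, not an asyncio/asyncpg/SQLAlchemy API.
    Returns True once the task is done, False if it is still pending
    after the last attempt.
    """
    for _ in range(attempts):
        task.cancel()  # returns False and does nothing if the task is already done
        # asyncio.wait() does not raise on timeout; it returns (done, pending) sets.
        done, _pending = await asyncio.wait({task}, timeout=timeout)
        if done:
            return True
    return False
```

Returning False instead of raising leaves the decision (log it, keep retrying, or abort the process) to the caller, since even a second cancel is not guaranteed to succeed.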

This is quite hard to reproduce, as I am unsure exactly at which point in asyncpg a call to task.cancel() triggers the issue. However, by tuning a few magic numbers (the timing constants in the script below) and retrying a large number of times, I can reproduce it consistently.
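
Retrying with tuned magic numbers amounts to a timing sweep: cancel a fresh task after each candidate delay and record the delays that leave it pending. A minimal, generic sketch, assuming the work is supplied as a coroutine factory (`find_stuck_delays` and `make_work` are hypothetical names, and the grace period is an arbitrary choice):

```python
import asyncio
from typing import Awaitable, Callable, List


async def find_stuck_delays(
    make_work: Callable[[], Awaitable[None]],
    delays: List[float],
    grace: float = 0.1,
) -> List[float]:
    """Cancel a fresh task after each delay; return the delays that left it pending."""
    stuck = []
    for delay in delays:
        task = asyncio.ensure_future(make_work())
        await asyncio.sleep(delay)
        task.cancel()
        # Give the task `grace` seconds to process the cancellation.
        await asyncio.wait({task}, timeout=grace)
        if not task.done():
            stuck.append(delay)
            # Best-effort cleanup: cancel once more before the next delay.
            task.cancel()
            await asyncio.wait({task}, timeout=grace)
    return stuck
```

Applied to this issue, `make_work` would be `insert_to_db` from the script below and `delays` would bracket its 0.069 s `error_time_diff`. The cleanup step is only best effort; if the second cancel also fails, the task is left pending.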

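Attached below is the code I used to reproduce this issue: a Docker Compose file for the database, the table model (imported by the script as dummy_table), and the test script.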

```yaml
# Use postgres/example user/password credentials
version: '3.1'

services:

  db:
    image: postgres:13.4
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - 5432:5432
```

dummy_table.py:

```python
# coding: utf-8
from sqlalchemy import String, text, Column, BigInteger
from sqlalchemy.dialects.postgresql import TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
metadata = Base.metadata


class DummyTable(Base):
    __tablename__ = 'dummy_table'

    id = Column(BigInteger, primary_key=True)
    col_1 = Column(String(100), nullable=False, server_default=text("''::character varying"))
    col_2 = Column(String(100), nullable=False, server_default=text("''::character varying"))
    col_3 = Column(String(100), nullable=False, server_default=text("''::character varying"))
```

```python
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).parents[2]))

import asyncio
from loguru import logger
from dummy_table import metadata, DummyTable
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from sqlalchemy import insert, update, bindparam

db_info = {
    "user":"postgres",
    "password":"example",
    "host":"localhost",
    "port":5432,
    "db":"postgres"
}

error_time_offset = 0.0
error_time_diff = 0.069

async def insert_to_db():
    engine = create_async_engine(
        "postgresql+asyncpg://{user}:{password}@{host}:{port}/{db}".format(
            **db_info
        ),
        poolclass=NullPool,
        # echo=True
    )
    async with engine.begin() as session:
        await asyncio.sleep(error_time_offset)
        update_values = [
            {
                "col_1":str(i+1),
                "bind_col_2":str(i)
            } for i in range(10000)
        ]
        stmt = DummyTable.__table__.update().where(
            DummyTable.col_2 == bindparam("bind_col_2")
        )
        logger.info("start update")
        await session.execute(stmt, update_values)
        logger.info("done update")

async def test_cancel():
    while True:
        logger.info("start_task")
        task = asyncio.create_task(insert_to_db())
        logger.info(task)
        await asyncio.sleep(error_time_offset + error_time_diff)
        task.cancel()
        logger.info(task)
        await asyncio.sleep(0.1)
        logger.info(task)
        print('hi')
        while not task.done():
            logger.error("cancel failed")
            # logger.warning(task)
            # print(task.get_stack)
            # break
            logger.info("throw exception now")
            raise Exception()
            # logger.info(task)
            # task.cancel()
            # logger.info(task)
            await asyncio.sleep(0.1)
        logger.success("task cancelled")

async def create_dummy_table():
    engine = create_async_engine(
        "postgresql+asyncpg://{user}:{password}@{host}:{port}/{db}".format(
            **db_info
        ),
        poolclass=NullPool,
        echo=False
    )
    async with engine.connect() as conn:
        await conn.run_sync(metadata.drop_all)
        await conn.run_sync(metadata.create_all)
        insert_values = [
            {
                "col_1":str(i),
                "col_2":str(i),
                "col_3":str(i),
            } for i in range(10000)
        ]
        stmt = insert(
            DummyTable
        ).values(
            insert_values
        )
        await conn.execute(stmt)
        await conn.commit()

async def main():
    await create_dummy_table()
    await test_cancel()


if __name__ == "__main__":
    asyncio.run(main())
```

You might have to tune error_time_diff to reproduce this issue. I think the issue is caused by the task being cancelled right at the moment asyncpg starts communicating with the database, as shown in the log below. In the script this happens at around error_time_diff, i.e. 0.069 s (roughly 70 ms) after the task is created, since asyncio.sleep() takes seconds.

[Screenshot: log output around the moment of cancellation]
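
To see where a stuck task is suspended, `asyncio.Task.print_stack()` (or `get_stack()`) can dump its frames. The commented-out `print(task.get_stack)` in the script prints the bound method rather than the stack, because the method is never called. A small sketch (`describe_stuck_task` is a hypothetical helper name); note that for a suspended coroutine asyncio reports only one frame, the task's outermost coroutine and the line it is paused on, not the full await chain into asyncpg.

```python
import asyncio
import io


def describe_stuck_task(task: asyncio.Task, limit: int = 10) -> str:
    """Return the text Task.print_stack() would write for `task` (hypothetical helper)."""
    buf = io.StringIO()
    # print_stack() writes the task's frames (or its traceback, if it failed) to `file`.
    task.print_stack(limit=limit, file=buf)
    return buf.getvalue()
```

Inside the `while not task.done():` loop, logging `describe_stuck_task(task)` before raising would record which line `insert_to_db` was paused on when the cancellation was lost.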

PookieBuns, Jan 07 '22 13:01