
CancelledError not correctly handled in bind_execute_many() (was: unusual asyncio-related deadlock seen with gather() where one or more tasks are failing)

Open zzzeek opened this issue 5 months ago • 5 comments

hey all, SQLAlchemy guy here. We have a user who submitted a complex deadlock example using asyncpg, and after much effort I've managed to eliminate SQLAlchemy entirely as a factor: I can reproduce the problem with a very simple script using asyncpg alone. ~~I'm not really able to figure out what's actually happening here, as this gets into the usual realm of asyncio tasks and cancellations~~ (see the patch provided below, where the CancelledError should likely be handled). To help isolate this as an asyncpg issue, I also reproduced it with the psycopg asyncio driver, which behaves differently and slightly better (though it still has an odd delay).

The script runs a long-running INSERT statement on three separate connections, each in an individual task passed to asyncio.gather(). A fourth task does nothing but raise an exception immediately. Once the fourth task fails, gather() raises, and the remaining tasks are cancelled as asyncio.run() tears down the loop. However, asyncpg does not respond to the cancellation and hangs instead, whereas the psycopg asyncio driver pauses for about 12 seconds while the tasks are being cancelled, and then the exception propagates.
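The sequence is easy to see with plain asyncio, no database involved. Below is a minimal sketch (the names `worker`, `failing`, and `events` are mine, not from the repro script): `gather()` propagates the crashing task's exception without cancelling its siblings by itself; it is the teardown step, which `asyncio.run()` normally performs, that then cancels the still-pending tasks, and that is the point where a driver blocked on a protocol future has to honor the `CancelledError`:

```python
import asyncio

events = []


async def worker(i):
    # stands in for a long executemany() call awaiting the driver
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        events.append(f"worker {i} cancelled")
        raise


async def failing():
    raise ValueError("Crashing task failed as expected")


async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    tasks.append(asyncio.create_task(failing()))
    try:
        # gather() propagates the first exception; the sibling tasks
        # are NOT cancelled by gather() itself and remain pending
        await asyncio.gather(*tasks)
    except ValueError:
        events.append("gather raised ValueError")
    # asyncio.run()'s teardown is what cancels leftover tasks;
    # we do the same explicitly here
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


asyncio.run(main())
print(events)
```

A well-behaved driver sees the `CancelledError` at its innermost `await` and unwinds, which is what psycopg eventually does in the output below and asyncpg does not.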

I'm running this against PostgreSQL 16.9 on localhost; the user is running PostgreSQL 17.x. The output with psycopg looks like:

Running with psycopg
Starting task 0
Starting task 1
Starting task 2
Failing - with psycopg, we expect the script to pause for about 12 seconds before raising ValueError
the long executemany() call was interrupted with: CancelledError()
the long executemany() call was interrupted with: CancelledError()
the long executemany() call was interrupted with: CancelledError()
Traceback (most recent call last):
  File "/home/classic/dev/sqlalchemy/test4.py", line 124, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/home/classic/dev/sqlalchemy/test4.py", line 120, in main
    await asyncio.gather(*tasks)
  File "/home/classic/dev/sqlalchemy/test4.py", line 95, in crashing_task
    raise ValueError("Crashing task failed as expected")
ValueError: Crashing task failed as expected

Whereas with asyncpg, it hangs until ctrl-C is pressed. The trace after ctrl-C is shown below:

Running with asyncpg
Starting task 0
Starting task 1
Starting task 2
Failing - with asyncpg, we expect the script to hang indefinitely until ctrl-C is used
^CTraceback (most recent call last):
  File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/home/classic/dev/sqlalchemy/test4.py", line 120, in main
    await asyncio.gather(*tasks)
  File "/home/classic/dev/sqlalchemy/test4.py", line 95, in crashing_task
    raise ValueError("Crashing task failed as expected")
ValueError: Crashing task failed as expected

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/classic/dev/sqlalchemy/test4.py", line 124, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 194, in run
    with Runner(debug=debug, loop_factory=loop_factory) as runner:
         ~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 62, in __exit__
    self.close()
    ~~~~~~~~~~^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 70, in close
    _cancel_all_tasks(loop)
    ~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 206, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 706, in run_until_complete
    self.run_forever()
    ~~~~~~~~~~~~~~~~^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 677, in run_forever
    self._run_once()
    ~~~~~~~~~~~~~~^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 1996, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib64/python3.13/selectors.py", line 452, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Task was destroyed but it is pending!
task: <Task cancelling name='Task-2' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
Task was destroyed but it is pending!
task: <Task cancelling name='Task-4' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
Task was destroyed but it is pending!
task: <Task cancelling name='Task-3' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
the long executemany() call was interrupted with: GeneratorExit()
the long executemany() call was interrupted with: GeneratorExit()
the long executemany() call was interrupted with: GeneratorExit()

For both drivers, pg_stat_activity doesn't show any deadlock; instead all three backends report the ClientRead wait event, meaning the server is idle and waiting for the client to send more data:

test=> SELECT pid, wait_event, query, query_start, state
FROM pg_stat_activity
WHERE state IS NOT NULL AND pid != PG_BACKEND_PID();
   pid   | wait_event |                             query                             |          query_start          | state  
---------+------------+---------------------------------------------------------------+-------------------------------+--------
 2072934 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526354-04 | active
 2072936 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526412-04 | active
 2072938 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526307-04 | active
(3 rows)

I've made this script as straightforward and obvious as possible, and I can confirm that the patch provided in the next comment resolves the issue.

import asyncio
import contextlib
import uuid

# this script illustrates identical sequences with asyncpg and the
# psycopg async driver.   The steps here are heavily refined from a
# user-submitted script using SQLAlchemy, however here I've managed to
# reduce the issue down to just having a few simultaneous connections
# within tasks in a gather() call, while also having an exception raised
# from one task.   The issue does not seem to be related to database deadlocks.

# set to False to use asyncio psycopg instead
USE_ASYNCPG = True

# adjust URL for desired connection
URL = "postgresql://scott:tiger@localhost/test"

# when asyncpg is used, the script reaches the failing task and hangs
# indefinitely until ctrl-C is pressed.
#
# when using psycopg, the script reaches the failing task, there's a delay
# of about 12 seconds, then the executemany() calls are cancelled with
# CancelledError() and the ValueError is thrown.


async def make_a_connection():
    """make a new async DB connection from either asyncpg or psycopg"""

    if USE_ASYNCPG:
        import asyncpg

        return await asyncpg.connect(URL)
    else:
        import psycopg

        connection = await psycopg.AsyncConnection.connect(URL)
        # using autocommit to help indicate this does not seem to be a DB
        # deadlock
        await connection.set_autocommit(True)
        return connection


@contextlib.asynccontextmanager
async def execute_on(connection):
    """produce a connection or cursor with an execute()/executemany() method.

    The asyncpg and psycopg APIs differ in this regard; this function abstracts
    away that detail.
    away that detail.

    """

    if USE_ASYNCPG:
        yield connection
    else:
        cursor = connection.cursor()
        yield cursor
        await cursor.close()


async def long_commit_task(task_index, connection):
    """runs a large executemany() INSERT statement on the given connection."""

    print(f"Starting task {task_index}")
    try:
        async with execute_on(connection) as conn:
            await conn.executemany(
                "INSERT INTO lotsa_rows_table (id, task_index) "
                + (
                    "VALUES ($1::UUID, $2::INTEGER)"
                    if USE_ASYNCPG
                    else "VALUES (%s, %s)"
                ),
                [(uuid.uuid4(), task_index) for i in range(100000)],
            )
    except BaseException as be:
        print(f"the long executemany() call was interrupted with: {be!r}")
        raise
    print(f"Task {task_index} Done")


async def crashing_task():
    """raises an error.."""

    if USE_ASYNCPG:
        print(
            "Failing - with asyncpg, we expect the "
            "script to hang indefinitely until ctrl-C is used"
        )
    else:
        print(
            "Failing - with psycopg, we expect the "
            "script to pause for about 12 seconds before raising ValueError"
        )

    raise ValueError("Crashing task failed as expected")


async def main():
    print(f"Running with {'asyncpg' if USE_ASYNCPG else 'psycopg'}")

    connection = await make_a_connection()
    async with execute_on(connection) as conn:
        await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table")
        await conn.execute(
            "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)"
        )
    await connection.close()

    # create three tasks that each insert a large number of rows
    tasks = [
        asyncio.create_task(
            long_commit_task(i, connection=(await make_a_connection())),
        )
        for i in range(3)
    ]

    # create one more task that fails immediately
    tasks.append(asyncio.create_task(crashing_task()))

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

zzzeek avatar Jul 11 '25 04:07 zzzeek

I've isolated where it goes wrong: in bind_execute_many(), adding an except clause for CancelledError allows it to be handled (CancelledError derives from BaseException rather than Exception, so the existing except Exception clause never sees it). I'm not sure whether this should instead just catch BaseException outright. In any case, with this patch, the above script exits immediately on the ValueError, and the original user-provided SQLAlchemy example also exits immediately upon error.
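For context, here's a standalone sketch of my own (not asyncpg code) showing why the existing clause misses the cancellation: since Python 3.8, `asyncio.CancelledError` subclasses `BaseException` rather than `Exception`, so cancellation flies straight past an `except Exception` handler:

```python
import asyncio

# since Python 3.8, CancelledError is a BaseException, not an Exception,
# so "except Exception" never catches a task cancellation
assert not issubclass(asyncio.CancelledError, Exception)
assert issubclass(asyncio.CancelledError, BaseException)


async def demo():
    task = asyncio.create_task(asyncio.sleep(60))
    await asyncio.sleep(0)  # let the task start waiting
    task.cancel()
    try:
        await task
    except Exception:
        # never reached: CancelledError is not an Exception
        caught = "Exception"
    except asyncio.CancelledError:
        caught = "CancelledError"
    return caught


print(asyncio.run(demo()))  # prints "CancelledError"
```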

diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx
index bd2ad05..ce2f304 100644
--- a/asyncpg/protocol/protocol.pyx
+++ b/asyncpg/protocol/protocol.pyx
@@ -264,6 +264,9 @@ cdef class BaseProtocol(CoreProtocol):
         except Exception as ex:
             waiter.set_exception(ex)
             self._coreproto_error()
+        except asyncio.CancelledError as ce:
+            waiter.set_exception(ce)
+            self._coreproto_error()
         finally:
             return await waiter

zzzeek avatar Jul 11 '25 14:07 zzzeek

Looking at the file, there are a lot of other places with similar code, so maybe this calls for a more general refactoring. @zzzeek did you also try whether doing what's done for a timeout also fixes the error? https://github.com/MagicStack/asyncpg/blob/5b14653e0b447d956aa01ec658562138e19f0293/asyncpg/protocol/protocol.pyx#L261-L263

CaselIT avatar Jul 12 '25 10:07 CaselIT

Looking at the file, there are a lot of other places with similar code, so maybe this calls for a more general refactoring. @zzzeek did you also try whether doing what's done for a timeout also fixes the error?

I don't think so, no

zzzeek avatar Jul 12 '25 13:07 zzzeek

We're also seeing an issue with asyncpg via SQLAlchemy where we're not deadlocking, but we are holding a lot of idle connections open in a pooled setting. @zzzeek could this issue result in idle connections that remain occupied?

cliandy avatar Jul 15 '25 18:07 cliandy

We're also seeing an issue with asyncpg via SQLAlchemy where we're not deadlocking, but we are holding a lot of idle connections open in a pooled setting. @zzzeek could this issue result in idle connections that remain occupied?

That would not likely be this issue, no.

It's something we see continuously with people using FastAPI, though the ultimate causes vary greatly.

zzzeek avatar Jul 15 '25 18:07 zzzeek