cpython icon indicating copy to clipboard operation
cpython copied to clipboard

asyncio.create_subprocess_exec does not respond properly to asyncio.CancelledError

Open DarkArc opened this issue 2 years ago • 9 comments

Bug report

asyncio programs that call proc = await asyncio.create_subprocess_exec but do not reach the call to await proc.communicate are not properly cancelled.

This can be observed in the following script (it may take a few runs to observe):

import asyncio
import functools
import signal

# Running count of run_bash_sleep() invocations; each call increments it and
# uses the new value as the job number shown in its printed messages.
counter = 0

async def run_bash_sleep():
  """Spawn a very short ``bash`` sleep and wait for it, printing each stage.

  Increments the module-level ``counter`` and uses the new value to tag the
  "Started", "Waiting", "Done" and "Canceled" lines printed for this job.
  An ``asyncio.CancelledError`` raised anywhere in the body is caught and
  reported with a print; it is not re-raised.
  """
  global counter
  counter += 1
  # Snapshot the job number: ``counter`` moves on as further jobs are started.
  local_counter = counter
  try:
    print(f"Started - {local_counter}")
    # Per the accompanying report, a job cancelled while still inside this
    # await (i.e. before reaching communicate() below) never completes.
    proc = await asyncio.create_subprocess_exec(
      'bash', '-c', 'sleep .001',
      stdout = asyncio.subprocess.PIPE,
      stderr = asyncio.subprocess.PIPE,
      start_new_session = True
    )

    print(f"Waiting - {local_counter}")
    # The captured output is not used afterwards; only completion matters.
    stdout, stderr = await proc.communicate()
    print(f"Done - {local_counter}!")
  except asyncio.CancelledError:
    print(f"Canceled - {local_counter}!")

async def run_loop(loop):
  """Keep a pool of ``run_bash_sleep`` jobs running on *loop* until drained.

  The pool is refilled up to a limit of 8 jobs on every pass. When this
  coroutine itself is cancelled, the limit drops to 0 so no new jobs are
  started; the loop then keeps waiting on the jobs already in flight and
  returns once none remain.
  """
  job_limit = 8
  in_flight = []
  while True:
    try:
      # Top the pool back up to the current limit (no-op once the limit is 0).
      shortfall = job_limit - len(in_flight)
      for _ in range(shortfall):
        in_flight.append(loop.create_task(run_bash_sleep()))

      # Nothing running and nothing started: we are done.
      if not in_flight:
        break

      # Block until at least one job finishes, or one second passes.
      _finished, unfinished = await asyncio.wait(
        in_flight,
        timeout=1,
        return_when=asyncio.FIRST_COMPLETED,
      )

      # Reported before pruning, so this counts the jobs that were waited on.
      print(f"Running jobs: {len(in_flight)}")

      # Carry only the still-running jobs into the next pass.
      in_flight = list(unfinished)
    except asyncio.CancelledError:
      # Stop scheduling new work, but keep looping to drain what is left.
      job_limit = 0

def stop_asyncio_loop(signame, loop):
  """Request cancellation of every not-yet-finished task on *loop*.

  *signame* is accepted so the function can be bound as a signal handler
  with the signal's name, but it is not used.
  """
  outstanding = asyncio.all_tasks(loop)
  for job in outstanding:
    job.cancel()

def main():
  """Create an event loop, hook SIGINT/SIGTERM to task cancellation, and run.

  Both signals are routed to ``stop_asyncio_loop``; the loop then runs
  ``run_loop`` until that task completes.
  """
  loop = asyncio.new_event_loop()
  asyncio.set_event_loop(loop)

  for signame in {'SIGINT', 'SIGTERM'}:
    signum = getattr(signal, signame)
    on_signal = functools.partial(stop_asyncio_loop, signame, loop)
    loop.add_signal_handler(signum, on_signal)

  root_task = loop.create_task(run_loop(loop))
  loop.run_until_complete(root_task)

# Start the reproducer immediately when this file is executed.
main()

When the signal handler cancels the tasks, any task that hasn't made it to await proc.communicate() will never complete.

A subsequent SIGTERM to the script can then actually terminate the task; however, I'd expect the first call to cancel() to disrupt the coroutine.

Your environment

  • CPython versions tested on: 3.11.2, 3.11.3
  • Operating system and architecture: Fedora 38, x86_64

DarkArc avatar Apr 25 '23 17:04 DarkArc

Curiously this doesn't seem to affect Python 3.6.8 (running on 3.6 requires a change to the stop_asyncio_loop function, switching to asyncio.Task.all_tasks instead of asyncio.all_tasks).

Instead of an indefinite hang, a message is printed akin to Unknown child process pid 37535, will report returncode 255

DarkArc avatar Apr 25 '23 18:04 DarkArc

That looks like a nasty problem. Do you want to help by checking the logic of asyncio signals and subprocess creation? There might even be a simple fix.

gvanrossum avatar Apr 26 '23 15:04 gvanrossum

Unfortunately, I don't have the bandwidth for this at the moment, either personally or professionally. :(

I took a quick look through the create_subprocess_exec logic before reporting the issue, and didn't see anything that jumped out to me, but that doesn't mean there's nothing there.

DarkArc avatar May 01 '23 20:05 DarkArc

I’m in a similar situation, so we’ll have to leave this open for a while.

gvanrossum avatar May 02 '23 00:05 gvanrossum

Copying my analysis from: https://github.com/python/cpython/issues/125502#issuecomment-2413536175

The problem occurs when asyncio.runners._cancel_all_tasks runs at an inopportune instant, while the subprocess pipes are still being connected.

This task gets cancelled:

https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L56

which means self._pending_calls is never run:

https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L199-L202

so when _try_finish appends self._call_connection_lost to self._pending_calls: https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L257

_call_connection_lost is never called, which means the futures in self._exit_waiters are never woken: https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L259-L270

Here's a demo that hangs every time for me:

import sys
import inspect
import asyncio
from subprocess import PIPE


async def run_sleep():
    """Launch ``sleep 0.002`` with a piped stdout and wait for it to finish."""
    child = await asyncio.create_subprocess_exec("sleep", "0.002", stdout=PIPE)
    await child.communicate()


async def amain():
    """Run ``run_sleep()`` and cancel all tasks while the transport is being made.

    A callback rescheduled with ``loop.call_soon`` inspects this task's chain
    of awaited coroutines. Once the innermost coroutine is the loop's
    ``_make_subprocess_transport``, it cancels every task returned by
    ``asyncio.all_tasks()``; until then it keeps rescheduling itself.
    """
    loop = asyncio.get_running_loop()
    task = asyncio.current_task(loop)
    # The coroutine object driving this very task; root of the await chain.
    coro = task.get_coro()

    # NOTE(review): assigned but never read anywhere in this function.
    called_cancel = False

    def cancel_eventually():
        # Follow cr_await links down to the innermost coroutine currently
        # being awaited (stops at the first non-coroutine awaitable or None).
        my_coro = coro
        while inspect.iscoroutine(my_coro.cr_await):
            my_coro = my_coro.cr_await
        # Compare code objects to detect that execution is suspended inside
        # the loop's private _make_subprocess_transport coroutine.
        if my_coro.cr_code is loop._make_subprocess_transport.__code__:
            print("_cancel_all_tasks")
            tasks = asyncio.all_tasks()
            for task in tasks:
                task.cancel()
        else:
            # Not there yet: check again on a later loop iteration.
            loop.call_soon(cancel_eventually)

    loop.call_soon(cancel_eventually)
    await run_sleep()


def main():
    """Run ``amain()`` to completion with ``asyncio.run``."""
    entry_point = amain()
    asyncio.run(entry_point)


# Only run the reproducer when executed as a script, not when imported.
if __name__ == "__main__":
    sys.exit(main())

graingert avatar Oct 19 '24 18:10 graingert

@kumaraditya303 I think the fix is to use:

self._waiter = waiter
self._task = task = self._loop.create_task(self._connect_pipes())
task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

Then in def _wake_waiter_and_call_pending_calls_or_close(self, task): ... check the exception/done/cancelled state of task and do the right thing

I've spent a little time fiddling around but can't get the tests to pass and not spray a bunch of errors!

graingert avatar Oct 19 '24 18:10 graingert

Are these the same issue?

https://github.com/python/cpython/issues/115787 https://github.com/python/cpython/issues/105288

graingert avatar Oct 21 '24 21:10 graingert

@graingert Happy to take a look at this if no one else has started!

savannahostrowski avatar Oct 21 '24 23:10 savannahostrowski

@kumaraditya303 I think the fix is to use:

I think the same, although I don't think it's that simple: we possibly need to rework and audit the whole cancellation of subprocesses. I am pretty sure it is broken in more places than this.

kumaraditya303 avatar Oct 24 '24 14:10 kumaraditya303

@graingert @kumaraditya303

I think the fix is to use: self._waiter = waiter self._task = task = self._loop.create_task(self._connect_pipes()) task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

I'm not so sure. This might fix the problem now, but IMO it would lead to code that's hard to maintain: if, say, the connect_pipes task over time gains some other code that mustn't be cancelled, that code would instead have to be put into the callback.

In general, I think that separating logic between async/await code and done callbacks is a big antipattern.

Instead I'd think about shielding the connect_pipes/init code from cancellation, or handling it explicitly.

1st1 avatar Oct 29 '24 23:10 1st1

The easiest fix I can think of for this is to just shield the whole subprocess creation using asyncio.shield, and that seems to fix the issue.

diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index f69c6a64c39..73540a0b7e7 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -198,24 +198,27 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
     async def _make_subprocess_transport(self, protocol, args, shell,
                                          stdin, stdout, stderr, bufsize,
                                          extra=None, **kwargs):
-        watcher = self._watcher
-        waiter = self.create_future()
-        transp = _UnixSubprocessTransport(self, protocol, args, shell,
-                                        stdin, stdout, stderr, bufsize,
-                                        waiter=waiter, extra=extra,
-                                        **kwargs)
-        watcher.add_child_handler(transp.get_pid(),
-                                self._child_watcher_callback, transp)
-        try:
-            await waiter
-        except (SystemExit, KeyboardInterrupt):
-            raise
-        except BaseException:
-            transp.close()
-            await transp._wait()
-            raise
+        async def shielded():
+            watcher = self._watcher
+            waiter = self.create_future()
+            transp = _UnixSubprocessTransport(self, protocol, args, shell,
+                                            stdin, stdout, stderr, bufsize,
+                                            waiter=waiter, extra=extra,
+                                            **kwargs)
+            watcher.add_child_handler(transp.get_pid(),
+                                    self._child_watcher_callback, transp)
+            try:
+                await waiter
+            except (SystemExit, KeyboardInterrupt):
+                raise
+            except BaseException:
+                transp.close()
+                await transp._wait()
+                raise
+
+            return transp
 
-        return transp
+        return await tasks.shield(self.create_task(shielded()))
 
     def _child_watcher_callback(self, pid, returncode, transp):
         self.call_soon_threadsafe(transp._process_exited, returncode)

kumaraditya303 avatar May 27 '25 09:05 kumaraditya303