cpython
asyncio.create_subprocess_exec does not respond properly to asyncio.CancelledError
Bug report
asyncio programs that call proc = await asyncio.create_subprocess_exec but do not reach the call to await proc.communicate are not properly cancelled.
This can be observed in the following script (it may take a few runs to observe):
    import asyncio
    import functools
    import signal

    counter = 0

    async def run_bash_sleep():
        global counter
        counter += 1
        local_counter = counter

        try:
            print(f"Started - {local_counter}")
            proc = await asyncio.create_subprocess_exec(
                'bash', '-c', 'sleep .001',
                stdout = asyncio.subprocess.PIPE,
                stderr = asyncio.subprocess.PIPE,
                start_new_session = True
            )
            print(f"Waiting - {local_counter}")
            stdout, stderr = await proc.communicate()
            print(f"Done - {local_counter}!")
        except asyncio.CancelledError:
            print(f"Canceled - {local_counter}!")

    async def run_loop(loop):
        max_jobs = 8
        active_tasks = []
        while True:
            try:
                # Add jobs to the list of active jobs
                while len(active_tasks) < max_jobs:
                    active_tasks.append(loop.create_task(run_bash_sleep()))

                # All tasks have finished, end the loop
                if len(active_tasks) == 0:
                    break

                # Wait for a test to finish (or a 1 second timeout)
                done, pending = await asyncio.wait(
                    active_tasks,
                    timeout = 1,
                    return_when = asyncio.FIRST_COMPLETED
                )
                print(f"Running jobs: {len(active_tasks)}")

                # Update the active jobs
                active_tasks = list(pending)
            except asyncio.CancelledError:
                max_jobs = 0

    def stop_asyncio_loop(signame, loop):
        for task in asyncio.all_tasks(loop):
            task.cancel()

    def main():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        for signame in {'SIGINT', 'SIGTERM'}:
            loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(stop_asyncio_loop, signame, loop)
            )

        loop.run_until_complete(loop.create_task(run_loop(loop)))

    main()
When the signal handler cancels the tasks, any task that hasn't made it to await proc.communicate() will never complete.
A subsequent SIGTERM to the script can then actually terminate the task; however, I'd expect the first call to cancel() to disrupt the coroutine.
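For contrast, here is a minimal sketch of the behavior one would expect from `cancel()`, using a coroutine that involves no subprocess at all. The names `worker` and `demo` are hypothetical and exist only for this illustration: the first `cancel()` raises `asyncio.CancelledError` at the coroutine's current `await`.

```python
import asyncio


async def worker(events):
    try:
        events.append("waiting")
        await asyncio.sleep(10)  # stands in for any long-running await
        events.append("done")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the task is marked as cancelled


async def demo():
    events = []
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0)  # give the worker a chance to reach its await
    task.cancel()           # a single cancel() is enough here
    try:
        await task
    except asyncio.CancelledError:
        pass
    return events, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The report above is about tasks that never get this far: they stay pending after the first `cancel()`.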
Your environment
- CPython versions tested on: 3.11.2, 3.11.3
- Operating system and architecture: Fedora 38, x86_64
Curiously this doesn't seem to affect Python 3.6.8 (running on 3.6 requires a change to the stop_asyncio_loop function, switching to asyncio.Task.all_tasks instead of asyncio.all_tasks).
Instead of an indefinite hang, a message is printed akin to Unknown child process pid 37535, will report returncode 255
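The change needed for 3.6 can be sketched as a small compatibility helper. The name `all_tasks_compat` is hypothetical; the sketch assumes only that `asyncio.all_tasks` exists on Python 3.7 and later, and that older versions expose `asyncio.Task.all_tasks` instead.

```python
import asyncio


def all_tasks_compat(loop):
    """Return the tasks of `loop` on both old and new Python versions."""
    if hasattr(asyncio, "all_tasks"):
        # Python 3.7+
        return asyncio.all_tasks(loop)
    # Python 3.6 and earlier (this classmethod was removed in 3.9)
    return asyncio.Task.all_tasks(loop)


def stop_asyncio_loop(signame, loop):
    # Same handler as in the script above, using the helper instead.
    for task in all_tasks_compat(loop):
        task.cancel()
```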
That looks like a nasty problem. Do you want to help by checking the logic of asyncio signals and subprocess creation? There might even be a simple fix.
Unfortunately, I don't have the bandwidth for this at the moment, personally or professionally. :(
I took a quick look through the create_subprocess_exec logic before reporting the issue, and didn't see anything that jumped out to me, but that doesn't mean there's nothing there.
I’m in a similar situation, so we’ll have to leave this open for a while.
Copying my analysis from: https://github.com/python/cpython/issues/125502#issuecomment-2413536175
The problem occurs when asyncio.runners._cancel_all_tasks runs at an inopportune instant, while the pipes are being connected.
This task gets cancelled:
https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L56
which means the calls queued in self._pending_calls are never run:
https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L199-L202
So when _try_finish appends self._call_connection_lost to self._pending_calls:
https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L257
self._call_connection_lost is never called, which means self._exit_waiters are never woken:
https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L259-L270
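The chain of events can be modelled with a toy class. This is a simplified sketch, not the real asyncio transport: `ToyTransport`, `process_exited`, `wait` and `demo` are hypothetical names, and `asyncio.sleep` stands in for connecting the pipes. It only shows how a queue that is flushed at the end of a cancellable task can strand a callback.

```python
import asyncio
import collections


class ToyTransport:
    """Toy model of the pending-calls pattern (not the real asyncio class)."""

    def __init__(self, loop):
        self._loop = loop
        self._pending_calls = collections.deque()
        self._exit_waiter = loop.create_future()
        self._task = loop.create_task(self._connect_pipes())

    async def _connect_pipes(self):
        await asyncio.sleep(0.01)  # stands in for connecting the pipes
        # The queue is flushed only if the task gets this far.
        calls, self._pending_calls = self._pending_calls, None
        for callback in calls:
            self._loop.call_soon(callback)

    def _call(self, callback):
        if self._pending_calls is not None:
            self._pending_calls.append(callback)  # deferred until the flush
        else:
            self._loop.call_soon(callback)

    def process_exited(self):
        self._call(self._call_connection_lost)

    def _call_connection_lost(self):
        if not self._exit_waiter.done():
            self._exit_waiter.set_result(0)

    async def wait(self):
        return await self._exit_waiter


async def demo(cancel_connect):
    transport = ToyTransport(asyncio.get_running_loop())
    await asyncio.sleep(0)  # let the connect task start
    if cancel_connect:
        transport._task.cancel()  # the flush will now never happen
    transport.process_exited()
    try:
        return await asyncio.wait_for(transport.wait(), timeout=0.2)
    except asyncio.TimeoutError:
        return "hung"


if __name__ == "__main__":
    print(asyncio.run(demo(cancel_connect=False)))
    print(asyncio.run(demo(cancel_connect=True)))
```

With `cancel_connect=True` the exit waiter is never resolved, which corresponds to the hang described above. The timeout exists only so that the sketch terminates.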
Here's a demo that hangs every time for me:
    import sys
    import inspect
    import asyncio
    from subprocess import PIPE


    async def run_sleep():
        proc = await asyncio.create_subprocess_exec(
            "sleep",
            "0.002",
            stdout=PIPE,
        )
        await proc.communicate()


    async def amain():
        loop = asyncio.get_running_loop()
        task = asyncio.current_task(loop)
        coro = task.get_coro()

        called_cancel = False

        def cancel_eventually():
            my_coro = coro
            while inspect.iscoroutine(my_coro.cr_await):
                my_coro = my_coro.cr_await
            if my_coro.cr_code is loop._make_subprocess_transport.__code__:
                print("_cancel_all_tasks")
                tasks = asyncio.all_tasks()
                for task in tasks:
                    task.cancel()
            else:
                loop.call_soon(cancel_eventually)

        loop.call_soon(cancel_eventually)
        await run_sleep()


    def main():
        asyncio.run(amain())


    if __name__ == "__main__":
        sys.exit(main())
@kumaraditya303 I think the fix is to use:
    self._waiter = waiter
    self._task = task = self._loop.create_task(self._connect_pipes())
    task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)
Then, in def _wake_waiter_and_call_pending_calls_or_close(self, task): ..., check the exception/done/cancelled state of task and do the right thing.
I've spent a little time fiddling around but can't get the tests to pass without spraying a bunch of errors!
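The state check described above can be sketched in isolation. This is not the proposed patch: `describe_outcome`, `record` and `demo` are hypothetical names, and the sketch only shows how a done callback can tell a cancelled task from a failed or a finished one. Note that `task.cancelled()` has to be checked first, because `task.exception()` raises `CancelledError` for a cancelled task.

```python
import asyncio
import functools


def describe_outcome(task):
    """Classify a finished task; safe to call from a done callback."""
    if task.cancelled():
        return "cancelled"
    if task.exception() is not None:
        return "failed"
    return "ok"


async def demo():
    outcomes = {}

    def record(name, task):
        outcomes[name] = describe_outcome(task)

    async def ok():
        return 1

    async def boom():
        raise RuntimeError("connect failed")

    async def slow():
        await asyncio.sleep(10)

    tasks = {}
    for name, func in (("ok", ok), ("boom", boom), ("slow", slow)):
        task = asyncio.create_task(func())
        task.add_done_callback(functools.partial(record, name))
        tasks[name] = task

    await asyncio.sleep(0)  # let every task take its first step
    tasks["slow"].cancel()
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```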
Are these the same issue?
https://github.com/python/cpython/issues/115787 https://github.com/python/cpython/issues/105288
@graingert Happy to take a look at this if no one else has started!
> @kumaraditya303 I think the fix is to use:
I think the same, although I don't think it's that simple. We possibly need to rework and audit the whole cancellation path for subprocesses; I am pretty sure it is broken in more places than this.
@graingert @kumaraditya303
> I think the fix is to use:
>     self._waiter = waiter
>     self._task = task = self._loop.create_task(self._connect_pipes())
>     task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)
I'm not so sure. This might fix the problem now, but IMO it would lead to code that's hard to maintain. Suppose, say, the connect_pipes task later gains some other code that mustn't be cancelled: it would then have to be moved into the callback as well.
In general, I think that separating logic between async/await code and done callbacks is a big antipattern.
Instead I'd think about shielding the connect_pipes/init code from cancellation, or handling it explicitly.
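Handling cancellation explicitly could look roughly like the following sketch. `FakeTransport`, `wait_closed` and `make_transport` are hypothetical stand-ins, not asyncio APIs; the point is only that the setup code and its cleanup live in one coroutine, with the cancellation caught, cleaned up after, and re-raised.

```python
import asyncio


class FakeTransport:
    """Hypothetical stand-in for a transport that needs explicit cleanup."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    async def wait_closed(self):
        await asyncio.sleep(0)


async def make_transport(created):
    transp = FakeTransport()
    created.append(transp)
    # Stands in for waiting on the setup; never resolved in this sketch.
    waiter = asyncio.get_running_loop().create_future()
    try:
        await waiter
    except asyncio.CancelledError:
        transp.close()              # clean up in the same coroutine
        await transp.wait_closed()
        raise                       # then let the cancellation propagate
    return transp


async def demo():
    created = []
    task = asyncio.create_task(make_transport(created))
    await asyncio.sleep(0)  # let the coroutine reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), created[0].closed
```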
The easiest fix I can think of is to shield the whole subprocess creation using asyncio.shield, which seems to fix the issue.
    diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
    index f69c6a64c39..73540a0b7e7 100644
    --- a/Lib/asyncio/unix_events.py
    +++ b/Lib/asyncio/unix_events.py
    @@ -198,24 +198,27 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
         async def _make_subprocess_transport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=None, **kwargs):
    -        watcher = self._watcher
    -        waiter = self.create_future()
    -        transp = _UnixSubprocessTransport(self, protocol, args, shell,
    -                                          stdin, stdout, stderr, bufsize,
    -                                          waiter=waiter, extra=extra,
    -                                          **kwargs)
    -        watcher.add_child_handler(transp.get_pid(),
    -                                  self._child_watcher_callback, transp)
    -        try:
    -            await waiter
    -        except (SystemExit, KeyboardInterrupt):
    -            raise
    -        except BaseException:
    -            transp.close()
    -            await transp._wait()
    -            raise
    +        async def shielded():
    +            watcher = self._watcher
    +            waiter = self.create_future()
    +            transp = _UnixSubprocessTransport(self, protocol, args, shell,
    +                                              stdin, stdout, stderr, bufsize,
    +                                              waiter=waiter, extra=extra,
    +                                              **kwargs)
    +            watcher.add_child_handler(transp.get_pid(),
    +                                      self._child_watcher_callback, transp)
    +            try:
    +                await waiter
    +            except (SystemExit, KeyboardInterrupt):
    +                raise
    +            except BaseException:
    +                transp.close()
    +                await transp._wait()
    +                raise
    +
    +            return transp
     
    -        return transp
    +        return await tasks.shield(self.create_task(shielded()))
     
         def _child_watcher_callback(self, pid, returncode, transp):
             self.call_soon_threadsafe(transp._process_exited, returncode)