asyncssh
Task hangs when another exception is raised while options.waiter is cancelled.
AsyncSSH 2.14.1, Linux Mint 21.3 (Ubuntu Jammy), Python 3.12.0
The issue is reproducible using the following code:
import asyncio
from traceback import print_exc
from typing import TypedDict

import asyncssh

ops_config = {
    # config dict
}


class SshCred(TypedDict):
    host: str
    username: str
    password: str


class Config(TypedDict):
    ssh: SshCred


QUERY = "Some Query"


class ErrorCase:
    def __init__(
        self, config: Config
    ) -> None:
        cfg: Config = config
        self._ssh_cred: SshCred = cfg["ssh"]
        self.task_q: asyncio.Queue[tuple[str, asyncio.Queue]] = asyncio.Queue()
        self.terminate = asyncio.Event()

    async def query_task(self):
        listener: None | asyncssh.SSHListener
        try:
            async with asyncssh.connect(**self._ssh_cred) as conn:
                listener = await conn.forward_local_port("", 3306, "localhost", 3306)
                raise ZeroDivisionError
        finally:
            if listener is not None:
                listener.close()
                await listener.wait_closed()

    async def query(self, query: str):
        loop = asyncio.get_running_loop()
        resp_q = asyncio.Queue()
        await self.task_q.put((query, resp_q))
        try:
            async with asyncio.timeout(5) as cm:
                await self.task_q.join()
                cm.reschedule(loop.time() + 1)
                return await resp_q.get()
        except TimeoutError:
            return

    async def main_task(self):
        while not self.terminate.is_set():
            try:
                await self.query_task()
            except ZeroDivisionError:
                pass
            except Exception:
                print_exc()
            finally:
                await asyncio.sleep(3)


async def main():
    db = ErrorCase(ops_config)
    task = asyncio.create_task(db.main_task())
    for _ in range(3):
        print(await db.query(QUERY))
    # db.terminate.set()
    task.cancel()
    await task


asyncio.run(main())
Running this code produces the traceback below. The traceback itself is not odd, but the program then hangs indefinitely.
None
None
None
Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 42, in query_task
    async with asyncssh.connect(**self._ssh_cred) as conn:
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/misc.py", line 274, in __aenter__
    self._coro_result = await self._coro
                        ^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 8231, in connect
    return await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/lib/python3.12/asyncio/tasks.py", line 510, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 441, in _connect
    await options.waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 67, in main_task
    await self.query_task()
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 46, in query_task
    if listener is not None:
       ^^^^^^^^
UnboundLocalError: cannot access local variable 'listener' where it is not associated with a value
^CTraceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 42, in query_task
    async with asyncssh.connect(**self._ssh_cred) as conn:
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/misc.py", line 274, in __aenter__
    self._coro_result = await self._coro
                        ^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 8231, in connect
    return await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/lib/python3.12/asyncio/tasks.py", line 510, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/spencer/.pyenv/versions/3.12.0/envs/test-12/lib/python3.12/site-packages/asyncssh/connection.py", line 441, in _connect
    await options.waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 67, in main_task
    await self.query_task()
  File "/home/spencer/repos/local/python/ops_db/error_reproduce.py", line 46, in query_task
    if listener is not None:
       ^^^^^^^^
UnboundLocalError: cannot access local variable 'listener' where it is not associated with a value
When I ran it under a debugger, main_task was not cancelled and kept running its loop.
I'm not sure this is an AsyncSSH problem.
Even though you are canceling main_task(), you have a catch-all "except Exception" in there, which I think will end up capturing the CancelledError from your task.cancel(), and that will prevent the task from being cancelled. Instead, it will go back up and check if self.terminate is set, but it won't be since you have that commented out.
As for the secondary error, that's because you are successfully cancelling the asyncssh.connect() call in query_task, and that causes it to attempt to run the "finally" block which references listener, but listener is only assigned in the case where the asyncssh.connect() succeeds. You should probably move the "try" to be after listener is assigned if you want to refer to it in the "finally" block, or you should assign its initial value to be None prior to the "try".
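The second option described above, binding listener to None before the "try", might look roughly like the sketch below. It is written to run without an SSH server, so FakeListener and open_listener are hypothetical stand-ins for asyncssh.SSHListener and the connect()/forward_local_port() calls; they are not part of AsyncSSH.

```python
import asyncio


class FakeListener:
    """Hypothetical stand-in for asyncssh.SSHListener (assumption)."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    async def wait_closed(self) -> None:
        await asyncio.sleep(0)


async def open_listener() -> FakeListener:
    # Hypothetical stand-in for connect() + forward_local_port();
    # it sleeps long enough to be cancelled before it returns.
    await asyncio.sleep(10)
    return FakeListener()


async def query_task() -> None:
    listener: FakeListener | None = None  # bound before the try block
    try:
        listener = await open_listener()
    finally:
        # If open_listener() was cancelled, listener is still None here,
        # so the cleanup is skipped and no UnboundLocalError is raised.
        if listener is not None:
            listener.close()
            await listener.wait_closed()


async def demo() -> bool:
    task = asyncio.create_task(query_task())
    await asyncio.sleep(0)  # let the task start and block in open_listener()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # True: the CancelledError propagated
```

With the name bound up front, the "finally" block no longer raises, so the CancelledError reaches the caller unchanged.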
I haven't tested it, but if you put in an except CancelledError in main_task which raises the error to prevent the except Exception from capturing it, I think it could fix the problem. Alternately, add a raise after print_exc() in the except Exception block, or explicitly break out of the loop when you see a CancelledError.
Thanks for the swift reply. Here are a few comments.
> Even though you are canceling main_task(), you have a catch-all "except Exception" in there, which I think will end up capturing the CancelledError from your task.cancel(), and that will prevent the task from being cancelled. Instead, it will go back up and check if self.terminate is set, but it won't be since you have that commented out.
As far as I know, that was the behavior in earlier Python versions, and asyncio.CancelledError is now classified differently (since Python 3.8 it derives from BaseException rather than Exception). In addition, changing main_task like this does not change the behavior.
    async def main_task(self):
        while not self.terminate.is_set():
            try:
                await self.query_task()
            except ZeroDivisionError:
                pass
            except asyncio.CancelledError:
                raise
            except Exception:
                print_exc()
            finally:
                await asyncio.sleep(3)
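The point about how asyncio.CancelledError is classified can be checked directly. The following is a minimal sketch assuming Python 3.8 or later; worker and demo are hypothetical names used only for illustration.

```python
import asyncio

# On Python 3.8+, CancelledError derives from BaseException, not Exception.
IS_EXCEPTION = issubclass(asyncio.CancelledError, Exception)
IS_BASE_EXCEPTION = issubclass(asyncio.CancelledError, BaseException)


async def worker(log: list[str]) -> None:
    try:
        await asyncio.sleep(10)
    except Exception:
        # Not reached on cancellation (Python 3.8+), because CancelledError
        # is not an Exception subclass.
        log.append("swallowed")


async def demo() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker start and block in sleep()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()


if __name__ == "__main__":
    print(IS_EXCEPTION, IS_BASE_EXCEPTION)  # False True on Python 3.8+
    print(asyncio.run(demo()))  # ([], True) on Python 3.8+
```

So a bare "except Exception" on its own does not swallow a cancellation on these versions. That fits the observation that adding "except asyncio.CancelledError: raise" changed nothing.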
> As for the secondary error, that's because you are successfully cancelling the asyncssh.connect() call in query_task, and that causes it to attempt to run the "finally" block which references listener, but listener is only assigned in the case where the asyncssh.connect() succeeds. You should probably move the "try" to be after listener is assigned if you want to refer to it in the "finally" block, or you should assign its initial value to be None prior to the "try".
Omitting the listener initialization was my mistake from the beginning, and I have fixed it in the code I actually use. I left it in here because it reproduces the problem. Even with the unbound local variable mistake, the task should still be cancelled when it is asked to cancel, or at least that is what I would expect.
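One possible explanation, which the thread does not confirm, is ordinary Python exception semantics: an exception raised inside a "finally" block replaces the exception that was propagating. If so, the UnboundLocalError replaces the CancelledError, and main_task only ever sees an ordinary Exception. The sketch below illustrates this with asyncio alone. asyncio.sleep() stands in for asyncssh.connect(), and the loop is bounded to two iterations so that it terminates; both are assumptions made for illustration.

```python
import asyncio


async def query_task() -> None:
    listener: object | None  # annotated but not bound, as in the reproduction
    try:
        await asyncio.sleep(0.05)  # stand-in for asyncssh.connect()
        listener = object()
    finally:
        # When the sleep is cancelled, listener is unbound, so this line
        # raises UnboundLocalError and replaces the CancelledError.
        if listener is not None:
            pass


async def main_task(log: list[str]) -> None:
    for _ in range(2):  # bounded stand-in for the original while loop
        try:
            await query_task()
        except asyncio.CancelledError:
            log.append("CancelledError")
            raise
        except Exception as exc:
            # The cancellation arrives here disguised as UnboundLocalError.
            log.append(type(exc).__name__)


async def demo() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(main_task(log))
    await asyncio.sleep(0)  # let the task block inside query_task()
    task.cancel()           # single cancellation request
    await task              # returns normally: the request was swallowed
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (['UnboundLocalError'], False)
```

In this sketch the "except asyncio.CancelledError" branch never runs, and the task finishes its loop instead of being cancelled. With the original unbounded "while" loop, that would look like a hang.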
Sorry - it looks like I missed your last post here. Were you able to resolve this issue?