async-timeout
aiohttp swallows asyncio.CancelledError during connection timeout
Describe the bug
There is a race condition in the code that handles connection timeouts. If you call cancel() on a task that is currently pending in create_connection and the connection timeout has already fired, asyncio.CancelledError is not propagated and you get asyncio.TimeoutError instead. The root cause is in how timeouts are handled in the async_timeout package: when exiting the context manager after the timeout has passed, all CancelledError exceptions are swallowed and TimeoutError is raised instead. Unfortunately, this is also true if you explicitly cancel the task yourself.
The practical consequence is that you cannot reliably cancel a task that is using aiohttp, because you never know whether CancelledError will be raised.
To Reproduce
EDIT: THIS REPRODUCER DOES NOT SHOW THE BEHAVIOUR CORRECTLY - PLEASE REFER TO COMMENTS BELOW!
```python
import asyncio
from async_timeout import timeout

async def test_task():
    with timeout(1):
        await asyncio.sleep(10)

async def main():
    t = asyncio.create_task(test_task())
    await asyncio.sleep(2)
    t.cancel()
    try:
        await t
    except asyncio.TimeoutError:
        print("Raised TimeoutError")
    except asyncio.CancelledError:
        print("Raised CancelledError")

asyncio.run(main())
```
Expected behavior
asyncio.CancelledError should never be suppressed when you cancel the task explicitly.
Logs/tracebacks
---
Python Version
Python 3.8.10
aiohttp Version
3.7.4.post0
multidict Version
4.7.6
yarl Version
1.6.0
OS
Linux
Related component
Client
Additional context
No response
Code of Conduct
- [x] I agree to follow the aio-libs Code of Conduct
Firstly, this is not the async-timeout repo (you just linked to it). Secondly, the timeout happens after 1 second, your cancellation happens after 2 seconds. So, the timeout has already happened and the behaviour looks correct to me. If I change 1 -> 5 I get a CancelledError as expected.
I noticed this problem when using aiohttp, which is why I created the bug here. I understand your point; however, how are you supposed to cancel a task that is running aiohttp code when it can swallow the CancelledError exception?
I'm not sure what you mean, nothing was swallowed? The task was cancelled by the timeout, and your t.cancel() is a no-op as the task was already cancelled.
If you can demonstrate the confusing behaviour in aiohttp, or explain where it happens, then maybe we can help..
Imagine you have retry logic that retries requests when they fail. Something along these lines:
```python
async def worker(cl: aiohttp.ClientSession):
    while True:
        try:
            r = await cl.get(..., timeout=...)
        except (asyncio.TimeoutError, OSError):
            await asyncio.sleep(10)
        else:
            do_something(r)
```
If you want to cancel this task, there is a risk that it will never exit.
Right, so this would better show the problem, right?
```python
import asyncio
from async_timeout import timeout

async def test_task():
    while True:
        print("RETRY")
        try:
            with timeout(1):
                await asyncio.sleep(10)
        except asyncio.TimeoutError:
            pass

async def main():
    t = asyncio.create_task(test_task())
    await asyncio.sleep(2)
    t.cancel()
    await t

asyncio.run(main())
```
This will sometimes run forever without cancelling the task.
Let me think about it....
The main problem is that these two operations are not atomic:
- the task cancellation done by the timeout trigger scheduled by async-timeout
- raising TimeoutError in the timeout context manager

You can actually squeeze an explicit Task.cancel() between them and thus lose the error. Now I realize my first reproducer was not really showing what I meant.
> Right, so this would better show the problem, right? [...]
Your example would not cause the problem. It is really hard to create a good reproducer for it. It is highly dependent on timing, and I am getting the issue in about 5 out of 1000 cancellations.
I get about 50/50 on that example whether the task cancels or sits in an infinite loop.
Oh ok, I was confused because for me it terminates every time; however, when I added one print inside the loop, it ran in an infinite loop. So yeah - it is dependent on the timing, but it pretty much demonstrates what I had in mind.
As I wrote, the problem is that the timeout-triggered cancellation and the actual raise of TimeoutError are done as two separate operations, not atomically. You can therefore call an explicit Task.cancel() on your task and lose the exception if you time the cancel wrong (the timeout cancellation scheduled by the context manager has already triggered, but TimeoutError has not yet been raised). The culprit here is the asyncio context switch upon await inside the context manager.
It probably also creates issues if you sequence your explicit cancellation right before the timeout fires. During high contention I get errors on pretty much every run, even when not trying to reproduce the issue.
As I suspected, it also works the opposite way (sequencing the explicit cancellation right before the timeout fires):
```python
import asyncio
from async_timeout import timeout

t = None

async def test_task2():
    try:
        with timeout(1):
            print("Waiting for timeout")
            await asyncio.sleep(10)
    except:
        print("Done waiting for timeout")
        raise

async def test_task1():
    print("Starting explicit cancellation task")
    await asyncio.sleep(1)
    print("Explicitly cancelling the task")
    t.cancel()

async def main():
    asyncio.create_task(test_task1())
    global t
    t = asyncio.create_task(test_task2())
    await asyncio.sleep(3)
    try:
        await t
    except BaseException as e:
        print(f"{type(e).__name__}({e})")

asyncio.run(main())
```
Produces this output for me:
```
Starting explicit cancellation task
Waiting for timeout
Explicitly cancelling the task
Done waiting for timeout
TimeoutError()
```
Clearly the explicit cancellation was done before the timeout fired, yet it still raised TimeoutError.
If you change the sleep in test_task1 to 0.9, you get CancelledError instead, because asyncio gets a chance to schedule test_task2 before the timeout fires and can thus raise the exception early, before it is overwritten with TimeoutError.
OK, previous examples make it clear why it's a problem. This is a minimal reproducer that can be used as a test. If you add a print to Timeout._on_timeout(), you'll see that the cancel happens before the timeout, but the exit condition will still override this with a TimeoutError:
```python
async def test_task(deadline, loop):
    print("TRY")
    with Timeout(deadline, loop):
        await asyncio.sleep(10)

async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 1
    t = asyncio.create_task(test_task(deadline, loop))

    def cancel():
        print("CANCEL")
        t.cancel()

    loop.call_at(deadline, cancel)
    await asyncio.sleep(2)
    await t
```
This is rather tricky to resolve as I just don't see any information being available to figure out where the cancellations came from (or how many there were).
Playing with private variables, I can get the previous example to work by changing Timeout._on_timeout() to this:
```python
if task._fut_waiter and task._fut_waiter.cancelled():
    return
task.cancel()
self._state = _State.TIMEOUT
```
But, if the cancel happens between the timeout and the exit, I can't see any way to tell that the task has been cancelled a second time, which I can reproduce with this example:
```python
async def test_task(deadline, loop):
    print("TRY")
    with Timeout(deadline, loop):
        await asyncio.sleep(10)

async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 1
    t = asyncio.create_task(test_task(deadline, loop))

    def cancel():
        print("CANCEL")
        t.cancel()

    loop.call_at(deadline + .000001, cancel)
    await asyncio.sleep(2)
    await t
```
Since Python 3.9 you could change task.cancel() in Timeout._on_timeout() to:

```python
task.cancel(msg="async_timeout._on_cancel")
```

and then catch it in Timeout._do_exit() with:

```python
self._task._cancel_message
```
I tried it, and if you call an explicit Task.cancel() without the msg argument, then self._task._cancel_message returns None, so you can detect that cancel was called after the timeout.
I am not sure, though, that _cancel_message will be propagated correctly every time; looking at Task.cancel(), it seems it might be set on _fut_waiter only in some cases. And unfortunately it is Python 3.9 only.
Well, if you fancy playing with it on my branch at #230, and see if you can get it working and tests passing, that would be great. Better to have a fix for 3.9+ than no fix at all.
You can wrap the code with `if sys.version_info >= (3, 9):` or similar.
#230 uses hacky access to private Task attribute.
The following PR will provide a fix proposed by https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908671359
Also, the fix only fixes half the problem currently.
i.e. it works when the flow of execution is: t.cancel() -> Timeout._on_timeout() -> Timeout.__exit__()
But it will still fail (as per the failing test) when it is: Timeout._on_timeout() -> t.cancel() -> Timeout.__exit__()
The second case requires some way to recognise that 2 cancellations have happened, which I don't think is possible with the current implementation of asyncio.Task. So, maybe it requires a change in asyncio before there's any way for us to fix it here...
@Dreamsorcerer in case you call cancel multiple times before the Task you want to cancel gets scheduled, then _cancel_message stores the last message you called cancel with, so the fix from https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908671359 should work. Albeit only for Python 3.9...
By the way, the official asyncio.wait_for function has the same problem: it can swallow the CancelledError exception during a race condition (see this line). I am starting to doubt whether I should rely on CancelledError propagation at all, or rather implement some other custom cancellation mechanism, e.g. create a select function via Future.add_done_callback (similar to how asyncio.wait works) and then pass a cancellation future to every async function to allow cancellation during await operations.
In case you are interested in a reproducer showing how asyncio.wait_for is broken as well, see this:
```python
import asyncio

SLEEP_TIME = 1

async def test_task():
    # Sleep with wait_for (timeout always larger than sleep time)
    try:
        t = asyncio.create_task(asyncio.sleep(SLEEP_TIME))
        try:
            await asyncio.wait_for(t, timeout=SLEEP_TIME + 10)
        except asyncio.TimeoutError:
            print("Sleep 1 timeouted")
        else:
            print("Sleep 1 finished")
    except asyncio.CancelledError:
        print("Sleep 1 was cancelled")

    # Sleep again in case the cancel missed the first sleep
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        print("Sleep 2 was cancelled")
    else:
        print("Sleep 2 finished")

async def main():
    t = asyncio.create_task(test_task())
    await asyncio.sleep(SLEEP_TIME)
    # This cancel should happen before wait_for exits its waiter
    # but after the first sleep task is done
    t.cancel()
    print("test_task was cancelled")
    await t

asyncio.run(main())
```
I would expect CancelledError to be raised from either the first or the second sleep; however, it is never raised, and instead this output is produced:
```
test_task was cancelled
Sleep 1 finished
Sleep 2 finished
```
So how do you cancel a task reliably? Should I check some variable after each await to see if the task got cancelled? What is then the purpose of the CancelledError anyway?
This appears to already be reported: https://bugs.python.org/issue42130 Looks like it could be fixed in a similar way by adding an identifier to the cancellation, so it only ignores its own cancellations, rather than external ones.
> in case you call cancel multiple times before the Task you want to cancel gets scheduled then _cancel_message stores the last message you called cancel with

Nope, it stores the first one. The code shortcuts on subsequent calls and doesn't update it: https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L209
No matter what I do, I can't seem to figure out a way to detect the latter case. Again, feel free to expand on #235 if you can figure anything out.
In every case I see the same results as for a normal timeout. i.e. I can't tell the difference between these flows:
Timeout._on_timeout() -> Timeout.__exit__()

and

Timeout._on_timeout() -> t.cancel() -> Timeout.__exit__()
Both end up with the sentinel in the exception and None for _cancel_message.
It seems t.cancel() does not propagate any information at all. Following the code, I think the only way I might be able to tell is by looking at self._task._fut_waiter._cancel_message; however, _fut_waiter has become None by the time __exit__() is called.
After experimenting, I have come to a conclusion very close to @Dreamsorcerer's results: the solution doesn't work well.
#237 provides an alternative approach: the _on_timeout callback schedules the task cancellation on the next event loop iteration (loop.call_soon()). This guarantees that any task.cancel() call in the current loop iteration takes priority over cancellation-by-timeout.
The approach has the obvious drawback: raising TimeoutError requires one more loop iteration.
I think it's ok: happy-path has the same performance, only the timeout case works slower. I believe there is no source code that expects the timeout as a normal (fast) path; a little slower reaction is acceptable.
> In case you are interested in a reproducer showing how asyncio.wait_for is broken as well, see this:
Not sure why I'm spending all this time on it, but I've fixed wait_for() for you as well:
https://github.com/python/cpython/pull/28149
Sorry, I'm pretty busy these days. Will return to the issue in a few days on this week.
> Sorry, I'm pretty busy these days. Will return to the issue in a few days on this week.
Of course. To summarise the current state:
- I think #237 is probably wrong. It changes the timing in order to pass the first test, but I don't think it actually fixes the issue.
- I think you managed to trick the second test into passing on your machine by making the delay too small. To ensure the tests are running correctly I've checked the call order as part of the tests now (#239). If you rerun with these tests on your machine, it should never pass as we don't have a fix.
- I've added the better approach in #235 for Python 3.9+, but this will still fail the second case.
- To fix the second case, I believe this needs to be added to asyncio in cpython: https://bugs.python.org/issue45098
Found this issue while debugging a weird memory leak in code using aiohttp. It is not easy to reproduce, but when you set connect and sock_read timeouts in a ClientTimeout structure you periodically get this nasty leak that eats all the RAM in 10 seconds. I had to debug it in a memory-limited Docker container. Well, if I found this issue, it means I was on the right track and need to wait for a fix.
> Found this issue debugging a weird memory leak in code using aiohttp. It is not easy to reproduce but when you set connect and sock_read timeouts in a ClientTimeout structure you periodically get this nasty leak [...]
You can check if you are leaking tasks by running asyncio.Task.all_tasks() in a loop. You can also check where all the tasks currently block this way.
@Dreamsorcerer thank you for https://github.com/python/cpython/pull/28149. In the meantime I created a workaround wait function that does not cancel the task implicitly like wait_for does, but rather just returns after the timeout and lets you cancel the task and handle everything explicitly.
Kind of makes you wonder whether the approach Go took, where timeouts and deadlines are the responsibility of the running coroutine and not the event loop, isn't better. Sure, it places the burden of implementation on the library side and requires you to pass a context variable to every function you want to be able to cancel or time out, but the implementation seems much more explicit to me and harder to get wrong. As a bonus, you can decide not to run some code (e.g. a TCP connection) at all if you can see from the context that there is not enough time left for it. After all, we are talking about cooperative multitasking, so why does Python implement cancel() in a preemptive way?