
aiohttp swallows asyncio.CancelledError during connection timeout

Open ssigut opened this issue 4 years ago • 33 comments

Describe the bug

There is a race condition in the code that handles connection timeouts. If you call cancel() on a task that is currently pending in create_connection and the connection timeout has already fired, then asyncio.CancelledError is not propagated and you get asyncio.TimeoutError instead. The root cause is how timeouts are handled in the async_timeout package: when exiting the context manager after the timeout has passed, any CancelledError is swallowed and TimeoutError is raised instead. Unfortunately this is also true if you explicitly cancel the task yourself.

The practical consequence is that you cannot reliably cancel a task that is using aiohttp, because you never know whether CancelledError will actually be raised.
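
Roughly, the relevant exit logic in async_timeout looks like this (paraphrased from memory, not the exact source):

def __exit__(self, exc_type, exc_val, exc_tb):
    if exc_type is asyncio.CancelledError and self._cancelled:
        # A CancelledError arriving after the timeout fired is assumed to be the
        # timeout's own cancellation, so it is replaced with TimeoutError - even
        # if the task was in fact also cancelled explicitly from the outside.
        raise asyncio.TimeoutError from None
    return None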

To Reproduce

EDIT: THIS REPRODUCER DOES NOT SHOW THE BEHAVIOUR CORRECTLY - PLEASE REFER TO THE COMMENTS BELOW!

import asyncio
from async_timeout import timeout


async def test_task():
    with timeout(1):
        await asyncio.sleep(10)


async def main():
    t = asyncio.create_task(test_task())
    await asyncio.sleep(2)
    t.cancel()
    try:
        await t
    except asyncio.TimeoutError:
        print("Raised TimeoutError")
    except asyncio.CancelledError:
        print("Raised CancelledError")

asyncio.run(main())

Expected behavior

asyncio.CancelledError should never be suppressed when you cancel the task explicitly.

Logs/tracebacks

---

Python Version

Python 3.8.10

aiohttp Version

3.7.4.post0

multidict Version

4.7.6

yarl Version

1.6.0

OS

Linux

Related component

Client

Additional context

No response

Code of Conduct

  • [x] I agree to follow the aio-libs Code of Conduct

ssigut avatar Aug 30 '21 16:08 ssigut

Firstly, this is not the async-timeout repo (you just linked to it). Secondly, the timeout happens after 1 second, your cancellation happens after 2 seconds. So, the timeout has already happened and the behaviour looks correct to me. If I change 1 -> 5 I get a CancelledError as expected.

Dreamsorcerer avatar Aug 30 '21 16:08 Dreamsorcerer

I noticed this problem when using aiohttp, which is why I created the bug here. I understand your point, but how are you supposed to cancel a task that is running aiohttp code when it can swallow the CancelledError exception?

ssigut avatar Aug 30 '21 16:08 ssigut

I'm not sure what you mean, nothing was swallowed? The task was cancelled by the timeout; your t.cancel() is a no-op as the task was already cancelled.

If you can demonstrate the confusing behaviour in aiohttp, or explain where it happens, then maybe we can help.

Dreamsorcerer avatar Aug 30 '21 16:08 Dreamsorcerer

Imagine you have retry logic that retries requests when they fail. Something along these lines:

async def worker(cl: aiohttp.ClientSession):
    while True:
        try:
            r = await cl.get(..., timeout=...)
        except (asyncio.TimeoutError, OSError):
            await asyncio.sleep(10)
        else:
            do_something(r)

If you want to cancel this task, there is a risk that it will never exit.

ssigut avatar Aug 30 '21 16:08 ssigut

Right, so this would better show the problem, right?

import asyncio
from async_timeout import timeout


async def test_task():
    while True:
        print("RETRY")
        try:
            with timeout(1):
                await asyncio.sleep(10)
        except asyncio.TimeoutError:
            pass


async def main():
    t = asyncio.create_task(test_task())
    await asyncio.sleep(2)
    t.cancel()
    await t

asyncio.run(main())

This will sometimes run forever without cancelling the task.

Let me think about it....

Dreamsorcerer avatar Aug 30 '21 16:08 Dreamsorcerer

The main problem is that these two operations are not atomic:

  • task cancellation done by the timeout trigger task spawned in async-timeout
  • raising TimeoutError in timeout context manager

You can actually squeeze an explicit Task.cancel() operation between them and thus lose the error. Now I realize my first reproducer was not really showing what I meant.

ssigut avatar Aug 30 '21 16:08 ssigut

Right, so this would better show the problem, right? [...] Let me think about it....

Your example would not cause the problem. It is really hard to create a good reproducer for it: it is highly dependent on timing, and I am hitting the issue in about 5 out of 1000 cancellations.

ssigut avatar Aug 30 '21 16:08 ssigut

I get about 50/50 on that example whether the task cancels or sits in an infinite loop.

Dreamsorcerer avatar Aug 30 '21 16:08 Dreamsorcerer

Oh ok, I was confused because for me it terminates every time; however, when I added one print inside the loop, it runs in an infinite loop. So yeah - it is dependent on the timing, but it pretty much demonstrates what I had in mind.

ssigut avatar Aug 30 '21 16:08 ssigut

As I wrote, the problem is that the timeout-triggered cancellation and the actual raise of TimeoutError are done in two separate operations, not atomically. You can therefore call an explicit Task.cancel() on your task and lose the exception if you time the cancel wrong (the timeout cancellation scheduled by the context manager has already fired, but TimeoutError has not been raised yet). The culprit here is the asyncio context switch on await inside the context manager.

It probably also creates issues if you sequence your explicit cancellation right before the timeout fires. Under high contention I get errors pretty much every run, even when not trying to reproduce the issue.

ssigut avatar Aug 30 '21 17:08 ssigut

As I thought, it also works the opposite way (sequencing the explicit cancellation right before the timeout fires):

import asyncio
from async_timeout import timeout

t = None

async def test_task2():
    try:
        with timeout(1):
            print("Waiting for timeout")
            await asyncio.sleep(10)
    except:
        print("Done waiting for timeout")
        raise

async def test_task1():
    print("Starting explicit cancellation task")
    await asyncio.sleep(1)
    print("Explicitly cancelling the task")
    t.cancel()


async def main():
    asyncio.create_task(test_task1())
    global t
    t = asyncio.create_task(test_task2())
    await asyncio.sleep(3)
    try:
        await t
    except BaseException as e:
        print(f"{type(e).__name__}({e})")

asyncio.run(main())

Produces this output for me:

Starting explicit cancellation task
Waiting for timeout
Explicitly cancelling the task
Done waiting for timeout
TimeoutError()

Clearly the explicit cancellation was done before the timeout fired, yet it still raised TimeoutError.

If you change the sleep in task1 to 0.9, you get CancelledError instead, because asyncio gets a chance to schedule task2 before the timeout fires, so the exception can propagate before being overwritten with TimeoutError.

ssigut avatar Aug 30 '21 17:08 ssigut

OK, the previous examples make it clear why it's a problem. Here is a minimal reproducer that can be used as a test. If you add a print to Timeout._on_timeout(), you'll see that the cancel happens before the timeout, but the exit condition will still override this with a TimeoutError:

# Assumes asyncio is imported, Timeout is the context manager class from
# async_timeout's development branch, and the snippet is driven by asyncio.run(main()).
async def test_task(deadline, loop):
    print("TRY")
    with Timeout(deadline, loop):
        await asyncio.sleep(10)


async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 1
    t = asyncio.create_task(test_task(deadline, loop))

    def cancel():
        print("CANCEL")
        t.cancel()

    loop.call_at(deadline, cancel)
    await asyncio.sleep(2)
    await t

Dreamsorcerer avatar Aug 30 '21 19:08 Dreamsorcerer

This is rather tricky to resolve, as there just doesn't seem to be any information available to figure out where the cancellations came from (or how many there were).

Playing with private variables, I can get the previous example to work by changing Timeout._on_timeout() to this:

        # If the task's waiter was already cancelled, an external cancel() beat us
        # to it - skip the timeout cancellation so CancelledError propagates as-is.
        if task._fut_waiter and task._fut_waiter.cancelled():
            return
        task.cancel()
        self._state = _State.TIMEOUT

But, if the cancel happens between the timeout and the exit, I can't see any way to tell that the task has been cancelled a second time, which I can reproduce with this example:

async def test_task(deadline, loop):
    print("TRY")
    with Timeout(deadline, loop):
        await asyncio.sleep(10)


async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 1
    t = asyncio.create_task(test_task(deadline, loop))

    def cancel():
        print("CANCEL")
        t.cancel()

    loop.call_at(deadline+.000001, cancel)
    await asyncio.sleep(2)
    await t

Dreamsorcerer avatar Aug 30 '21 19:08 Dreamsorcerer

Since Python 3.9 you could change task.cancel() in Timeout._on_timeout() to:

task.cancel(msg="async_timeout._on_cancel")

and then catch it in Timeout._do_exit() with:

self._task._cancel_message

I tried it, and if you call an explicit Task.cancel() without the msg argument, then self._task._cancel_message is None, so you can detect that an external cancel was called after the timeout.

I am not sure, though, that _cancel_message will be propagated correctly every time - looking at Task.cancel() it seems it might be set on _fut_waiter only in some cases. And unfortunately it is Python 3.9+ only.
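
Roughly, the check in Timeout._do_exit() could then look like this (just a sketch of the idea, not tested against the real internals):

        if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
            if self._task._cancel_message != "async_timeout._on_cancel":
                # The stored message is not ours, so an external cancel() won the
                # race - let CancelledError propagate instead of swallowing it.
                return
            raise asyncio.TimeoutError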

ssigut avatar Aug 30 '21 20:08 ssigut

Well, if you fancy playing with it on my branch at #230, and see if you can get it working and tests passing, that would be great. Better to have a fix for 3.9+ than no fix at all.

You can wrap the code with if sys.version_info >= (3, 9): or similar.

Dreamsorcerer avatar Aug 30 '21 20:08 Dreamsorcerer

#230 uses a hacky access to a private Task attribute. A follow-up PR will provide the fix proposed in https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908671359

asvetlov avatar Sep 03 '21 12:09 asvetlov

Also, the fix only fixes half the problem currently.

i.e. It works when the flow of execution is: t.cancel() -> Timeout._on_timeout() -> Timeout.__exit__()

But it will still fail (as per the failing test) when it's: Timeout._on_timeout() -> t.cancel() -> Timeout.__exit__()

Dreamsorcerer avatar Sep 03 '21 12:09 Dreamsorcerer

The second case requires some way to recognise that 2 cancellations have happened, which I don't think is possible with the current implementation of asyncio.Task. So, maybe it requires a change in asyncio before there's any way for us to fix it here...
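
For illustration, if asyncio.Task ever exposed a counter of cancellation requests, the exit check could look roughly like this (cancel_requests() is a hypothetical method; it does not exist in asyncio today):

        if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
            if self._task.cancel_requests() > 1:
                # Someone besides the timeout also cancelled the task:
                # let CancelledError propagate.
                return
            raise asyncio.TimeoutError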

Dreamsorcerer avatar Sep 03 '21 13:09 Dreamsorcerer

@Dreamsorcerer if you call cancel() multiple times before the task you want to cancel gets scheduled, then _cancel_message stores the last message you called cancel with - so the fix from https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908671359 should work. Albeit only for Python 3.9...

By the way, the official asyncio.wait_for function has the same problem: it can swallow the CancelledError exception during a race condition - see this line. I'm starting to doubt whether I should rely on CancelledError propagation at all, or rather implement some other custom cancellation mechanism - e.g. create a select-like function via Future.add_done_callback (similar to how asyncio.wait works) and then pass a cancellation future to every async function to allow cancellation during await operations.
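
Something along these lines (a hypothetical sketch; race_cancellable and cancel_fut are made-up names):

import asyncio

async def race_cancellable(coro, cancel_fut: asyncio.Future):
    # Run the coroutine but also watch an externally controlled cancellation
    # future, instead of relying on CancelledError being delivered to this task.
    inner = asyncio.ensure_future(coro)
    done, _ = await asyncio.wait({inner, cancel_fut}, return_when=asyncio.FIRST_COMPLETED)
    if inner not in done:
        inner.cancel()
        raise asyncio.CancelledError()
    return inner.result()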

ssigut avatar Sep 03 '21 14:09 ssigut

In case you are interested in a reproducer showing how asyncio.wait_for is broken as well, see this:

import asyncio

SLEEP_TIME = 1

async def test_task():
    # Sleep with wait_for (timeout always larger than sleep time)
    try:
        t = asyncio.create_task(asyncio.sleep(SLEEP_TIME))
        try:
            await asyncio.wait_for(t, timeout=SLEEP_TIME+10)
        except asyncio.TimeoutError:
            print("Sleep 1 timeouted")
        else:
            print("Sleep 1 finished")
    except asyncio.CancelledError:
        print("Sleep 1 was cancelled")

    # Sleep again in case the cancel missed the first sleep
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        print("Sleep 2 was cancelled")
    else:
        print("Sleep 2 finished")

async def main():
    t = asyncio.create_task(test_task())
    await asyncio.sleep(SLEEP_TIME)
    # This cancel should happen before wait_for exits its waiter but after first sleep task is done
    t.cancel()
    print("test_task was cancelled")
    await t


asyncio.run(main())

I would expect CancelledError to be raised from either the first or the second sleep; however, it never raises and instead produces this output:

test_task was cancelled
Sleep 1 finished
Sleep 2 finished

So how do you cancel a task reliably? Should I check some variable after each await to see if the task got cancelled? What is the purpose of CancelledError then, anyway?

ssigut avatar Sep 03 '21 14:09 ssigut

This appears to have already been reported: https://bugs.python.org/issue42130. Looks like it could be fixed in a similar way, by adding an identifier to the cancellation so it only ignores its own cancellations rather than external ones.

Dreamsorcerer avatar Sep 03 '21 16:09 Dreamsorcerer

@Dreamsorcerer if you call cancel() multiple times before the task you want to cancel gets scheduled, then _cancel_message stores the last message you called cancel with

Nope, it stores the first one. The code short-circuits on subsequent calls and doesn't update it: https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L209

Dreamsorcerer avatar Sep 03 '21 20:09 Dreamsorcerer

No matter what I do, I can't seem to figure out a way to detect the latter case. Again, feel free to expand on #235 if you can figure anything out.

Dreamsorcerer avatar Sep 03 '21 20:09 Dreamsorcerer

In every case I see the same results as for a normal timeout. i.e. I can't tell the difference between these flows:

Timeout._on_timeout()
Timeout.__exit__()

and

Timeout._on_timeout()
t.cancel()
Timeout.__exit__()

Both end up with the sentinel in the exception and None for _cancel_message.

It seems that t.cancel() does not propagate any information at all. Following the code, I think the only way I might be able to tell is by looking at self._task._fut_waiter._cancel_message; however, _fut_waiter has become None by the time __exit__() is called.

Dreamsorcerer avatar Sep 03 '21 20:09 Dreamsorcerer

After experimenting, I have come to a conclusion that is very close to @Dreamsorcerer's results: the solution doesn't work well.

#237 provides an alternative approach: the on_timeout event schedules the task cancellation on the next event loop iteration (loop.call_soon()). This guarantees that any task.cancel() call made in the current loop iteration takes priority over cancellation-by-timeout.

The approach has the obvious drawback: raising TimeoutError requires one more loop iteration.

I think it's ok: the happy path has the same performance; only the timeout case is slower. I believe no code treats a timeout as the normal (fast) path, so a slightly slower reaction is acceptable.
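
Roughly, the idea is (paraphrased, not the exact PR code):

    def _on_timeout(self) -> None:
        # Don't cancel immediately: schedule the cancellation for the next loop
        # iteration, so an explicit task.cancel() issued in the current iteration
        # is delivered first and wins over the timeout.
        self._cancel_handler = self._loop.call_soon(self._do_cancel)

    def _do_cancel(self) -> None:
        self._task.cancel()
        self._state = _State.TIMEOUT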

asvetlov avatar Sep 04 '21 07:09 asvetlov

In case you are interested in a reproducer showing how asyncio.wait_for is broken as well, see this:

Not sure why I'm spending all this time on it, but I've fixed wait_for() for you as well: https://github.com/python/cpython/pull/28149

Dreamsorcerer avatar Sep 04 '21 15:09 Dreamsorcerer

Sorry, I'm pretty busy these days. Will return to the issue in a few days, later this week.

asvetlov avatar Sep 07 '21 09:09 asvetlov

Sorry, I'm pretty busy these days. Will return to the issue in a few days, later this week.

Of course. To summarise the current state:

  • I think #237 is probably wrong. It changes the timing in order to pass the first test, but I don't think it actually fixes the issue.
  • I think you managed to trick the second test into passing on your machine by making the delay too small. To ensure the tests are running correctly, I've now checked the call order as part of the tests (#239). If you rerun with these tests on your machine, it should never pass, as we don't have a fix.
  • I've added the better approach in #235 for Python 3.9+, but this will still fail the second case.
  • To fix the second case, I believe this needs to be added to asyncio in cpython: https://bugs.python.org/issue45098

Dreamsorcerer avatar Sep 07 '21 20:09 Dreamsorcerer

Found this issue while debugging a weird memory leak in code using aiohttp. It is not easy to reproduce, but when you set connect and sock_read timeouts in a ClientTimeout structure, you periodically get this nasty leak that eats all the RAM in 10 seconds. I had to debug it in a memory-limited Docker container. Well, if I found this issue, it means I was on the right track and need to wait for a fix.

VasiliPupkin256 avatar Dec 16 '21 23:12 VasiliPupkin256

Found this issue while debugging a weird memory leak in code using aiohttp. [...]

You can check whether you are leaking tasks by running asyncio.Task.all_tasks() in a loop. You can also check where all the tasks are currently blocked this way.
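
For example (using the modern asyncio.all_tasks() spelling; it must be called from inside the running loop):

import asyncio

def dump_tasks() -> None:
    # Print every task known to the running loop and the stack where it is suspended.
    for task in asyncio.all_tasks():
        task.print_stack()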

@Dreamsorcerer thank you for https://github.com/python/cpython/pull/28149. In the meantime I created a workaround wait function that does not cancel the task implicitly like wait_for, but rather just returns after the timeout and lets you cancel the task and handle everything explicitly.
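
A minimal sketch of that idea (illustrative names, not the exact code):

import asyncio

async def wait_no_cancel(task: asyncio.Task, timeout: float) -> bool:
    # Wait up to `timeout` seconds but never cancel the task implicitly;
    # return True if the task finished, False if the timeout elapsed.
    done, _ = await asyncio.wait({task}, timeout=timeout)
    return task in done

The caller then decides whether to call task.cancel() itself and awaits the task again to observe either its result or CancelledError.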

Kind of makes you wonder whether the approach that Go took, where timeouts and deadlines are the responsibility of the running coroutine and not the event loop, isn't better. Sure, it places the burden of implementation on the library side and requires you to pass a context variable to every function you want to be able to cancel or time out, but the implementation seems much more explicit to me and harder to get wrong. As a bonus you can decide not to run some code (e.g. a TCP connection) at all if you realize from the context that there is not enough time left for it. After all, we are talking about cooperative multitasking, so why does Python implement cancel() in a preemptive way?

ssigut avatar Jan 31 '22 14:01 ssigut