Support for scoped cancellation/timeouts when calling into asyncio code
I'm using anyio with asyncio backend since I need some asyncio-based libs for which no anyio or trio equivalents of comparable quality exist yet. However, since asyncio code doesn't respect scoped cancellation/timeouts of anyio, calling into them is quite awkward and error-prone. I came up with the following generic utility to deal with this:
from __future__ import annotations
from typing import TypeVar, ParamSpec, Callable
from collections.abc import Awaitable
import asyncio
import anyio
_P = ParamSpec('_P')
_T = TypeVar('_T')
async def call_into_asyncio(
fn: Callable[_P, Awaitable[_T]],
/,
*args: _P.args, **kwargs: _P.kwargs,
) -> _T:
task = asyncio.create_task(fn(*args, **kwargs))
with anyio.CancelScope() as cancel_scope:
def cancel(_: asyncio.Task[_T]):
cancel_scope.cancel()
try:
task.add_done_callback(cancel)
try:
await anyio.sleep_forever()
finally:
task.remove_done_callback(cancel)
except anyio.get_cancelled_exc_class():
if not cancel_scope.cancel_called:
task.cancel()
raise
return await task
A call into asyncio code wrapped with call_into_asyncio will respect scoped cancellation and timeouts. It appears to work correctly for my purposes. Are there any edge cases or other issues with this approach that I'm missing? I'm surprised something like this doesn't already exists in anyio.
Sorry I missed this. Can you elaborate on what difficulties you are experiencing? Is level cancellation the issue?
I'm not sure this is related, but I have indeed planned a utility function to call into asyncio, but I'm not sure how to implement that with trio.
Sorry I missed this. Can you elaborate on what difficulties you are experiencing? Is level cancellation the issue?
I haven't actually tried exercising naive calls into asyncio thoroughly enough to run into any issues, I just assume something can go wrong due to this passage from the docs:
Tasks spawned by these “native” libraries on backends other than trio are not subject to the cancellation rules enforced by AnyIO.
Something likely related to level cancelation and scoped timeouts. Or did I misunderstand what that means? Now that I look at it again, it can indeed be interpreted as "tasks created via asyncio.create_task are not part of the anyio task tree" rather than "awaiting asyncio coroutine in timeout scope is not guaranteed to be cancelled when the timeout expires" as I initially thought.
I'm not sure this is related, but I have indeed planned a utility function to call into asyncio, but I'm not sure how to implement that with trio.
I'm talking more along the lines of calling into the current backend (whatever that may be) while preserving anyio semantics, rather than into asyncio specifically.
I'm talking more along the lines of calling into the current backend (whatever that may be) while preserving anyio semantics, rather than into asyncio specifically.
I think there is some sort of confusion here. If you're using asyncio libraries to spawn tasks, they will expect those tasks to adhere to the native asyncio cancellation rules, yes? Why would you want them to use anyio semantics instead? Did I understand something wrong?
I'm not spawning asyncio tasks, at least not directly. Let's consider a concrete example.
Suppose I want to use websockets, which is built directly on top of asyncio. Can I simply await websocket.send(data) from an anyio task, and expect timeout/cancellation scopes surrounding this call to work correctly, assuming that internally send could be spawning asyncio tasks and cleaning them up properly using asyncio idioms?
Depends. The biggest gotcha is whether it uses await while cleaning up (usually, within a finally block). That would be a problem because the task would adhere to AnyIO semantics (level cancellation) so any await would be hit with another cancellation exception unless it was within a shielded cancel scope (which it won't be in a native asyncio library). But, if it cancels the subtasks synchronously, it may not even matter. The trouble here is that such cleanup in libraries like websockets often involves network I/O which would then get cancelled before it gets a chance to be completed, potentially leading to subtle errors.
Right, so a utility is necessary that will convert level cancellation into edge cancellation, possibly by awaiting in a separate asyncio task as call_into_asyncio I propose above does.
Yeah, you need something extra for that. Your implementation seems a bit unnecessarily complex though – wouldn't it be enough to just await on the task, potentially cancelling it if the host gets cancelled?
My original motivation for doing it this way was to avoid awaiting on an incomplete asyncio task from anyio task because I'm not sure what the cancellation semantics of such a composition are supposed to be.
Won't doing as you suggest cause level cancellation within asyncio task, same as awaiting the wrapped coroutine directly?
I haven't verified that, but you could be right. I think the question becomes: what should the host task do after the initial cancellation? Should it wait for the child task to finish, in a shielded cancel scope? Or should the native task be orphaned?
asyncio doesn't like unretrieved task/future results either (at least when the results are exceptions), so I believe orphaning is not a good idea, at least not by default. If asyncio task decides to ignore cancellation, we'll be stuck waiting for it, but that's not something well-behaved asyncio code generally does.