anyio icon indicating copy to clipboard operation
anyio copied to clipboard

Write guide for porting asyncio libraries to AnyIO

Open agronholm opened this issue 3 years ago • 13 comments

This should cover at least the following:

  • Migrating from orphan tasks to structured concurrency
  • Migrating from Transports and Protocols to streams
  • Migrating from Queues to memory object streams
  • Dealing with the lack of Event.clear()

agronholm avatar Apr 07 '21 20:04 agronholm

Dealing with the lack of Event.clear()

Out of all the things this is actually what I am having the most issues with. I had to change the structure of the ratelimit system.

See I had a locking mechanism for rate limiting. This was essentially a weakly referenced dictionary where locks were lazily created.

When the locks were created they were given an event that was always set and before acquiring the underlying lock they would .wait() this event. This event represented a global lock. When a global ratelimit was hit this would be cleared, and then set when it was over.

Essentially it's an inverted event. But without the ability to clear it of course couldn't work.

Bluenix2 avatar Oct 23 '21 16:10 Bluenix2

I second the need for such a document, and would like to add the suggestion for discussing issues coming from

I have difficulties to convert code that uses asyncio's asyncio.Future() future objects, which can be manually created, started, cancelled, completed and failed and async awaited.

I would need a pattern that allows me to interface anyio with existing code, where the beginning and end of an async operation is in different threads. I know anyio returns a concurrent.futures.Future for tasks, which is similar, but I would need a pseudo-task that is not actually running in anyio, only monitored for completion/result, cancellation or exceptions. This is because the real operation is happening in separately managed threads.
I'm not sure whether this is at all compatible with anyio's style, but manually created futures are really useful when interfacing with low-level libraries (think compiled ones, where you cannot yourself change how they work). If this is not possible with anyio, an alternative approach should be documented.

The concept of a user managed future construct is also in other languages, e.g. c.f. C#'s TaskCompletionSource<TResult>, meaning that people coming from those languages would likely also be interested in finding a substitute when they start to use anyio.

ckuhlmann avatar Apr 29 '22 14:04 ckuhlmann

If you could give me a concrete example of a task you're struggling with, I could possible offer some suggestions as for how to do things in the "AnyIO way".

agronholm avatar Apr 29 '22 14:04 agronholm

Hi, thanks for the quick response and the offer to help.

I have posted the problem at stackoverflow Converting code using asyncio.Future futures to anyio. I'll admit that I'm not too experienced with either asyncio or anyio, so I'm sorry if this is kind of a stupid approach to create an async interface. However, I don't think I can get rid of the two manually managed threads, as they do hardware coordination necessary for the connection that must not be interrupted and has to be coordinated between them.

ckuhlmann avatar Apr 29 '22 15:04 ckuhlmann

I think it would be better to use tasks instead of long running threads, and only use worker threads briefly when doing something that requires blocking I/O. At least nothing in that SO post seems to actually require threads so I'm wondering what they're really needed for.

agronholm avatar Apr 29 '22 15:04 agronholm

They are long running e.g. because the connection requires keep-alive messages and hardware monitoring that is coordinated with send/receive actions. You are probably right that in hindsight individual tasks could also work. But since this is existing code that I'm trying to interface with, that choice for threads has been made a long time ago. I think this is not an uncommon situation when dealing with existing code, where people would like to use async libraries to make things easier when using the high-level interface, yet also deal with lots of legacy code that uses manually created threads.

ckuhlmann avatar Apr 29 '22 15:04 ckuhlmann

You are probably right that in hindsight individual tasks could also work. But since this is existing code that I'm trying to interface with, that choice for threads has been made a long time ago.

Well then the first step should be to convert each of these threads into a task. This basically involves painting the call path between the task's run method and whichever code ultimately blocks in recvfrom (or similar calls) with async and await code words.

I'm serious. Threads are annoying. You can't stop a thread without closing the file descriptor it's waiting for (yes there are workarounds involving using another file descriptor, poll, and so on, but that is anything but programmer-friendly code and I can almost guarantee that there are error conditions which will cause it to deadlock). You need to deal with implicit concurrency and locking your data structures and all that, for no good reason. And so on.

While you're doing that, rewrite your keepalive handling. or rather, remove it, because that job is absurdly easy with anyio. Let me show you how.

Sending: you create a separate sender task which you talk to using a memory stream. The sending task basically does

while True:
    try:
        with anyio.fail_after(30):
            msg = await sendq.receive()
    except TimeoutError:
        await send_keepalive(socket)
    else:
        await socket.send(msg)

The receiver task is even shorter:

while True:
    with fail_after(65):
        msg = await socket.receive()
    await handle_message(msg)

The two pages of convoluted timeout handling and keeping-track-of-keepalive-timing in your legacy code base can now simply be deleted. I've been there, multiple times.

smurfix avatar Apr 29 '22 16:04 smurfix

Thank you both for taking the time to point out a better way to do it. I want to stop hijacking this issue with my specific use case, but I believe that the answers show that the original idea of a pattern document is a good one, even if it is just pointers to doing things differently. I'm not certain I have the time to walk that road in this case, but I agree with you that it would pay off in the long run. Having at a lot of working code to rewrite, it's not always easy to justify to change the complete architecture. Just a quick last question: is anyio.create_memory_object_stream() thread safe? If I want to redo that code, I need an intermediate solution for some basic functionality and having a stream interface between the new tasks and the old code would help to test against the old and new backend with the same high level anyio code. Thanks again.

ckuhlmann avatar Apr 29 '22 18:04 ckuhlmann

AnyIO's memory object streams might work here. You just need to use from_thread.run() in the threads when sending something down the pipes.

agronholm avatar Apr 29 '22 18:04 agronholm

I had the same idea when I saw that feature. Are the streams obtained from anyio.create_memory_object_stream() thread safe?

Edit: never mind the question, i just realized that you mentioned from_thread.run ...

ckuhlmann avatar Apr 29 '22 18:04 ckuhlmann

How can I change the following code to be anyio @agronholm

import asyncio
from typing import Coroutine
from tortoise import Toirtoise

def run_async(coro: Coroutine) -> None:
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(coro)
    finally:
        loop.run_until_complete(Tortoise.close_connections())

https://github.com/tortoise/tortoise-orm/issues/1119

waketzheng avatar May 13 '22 09:05 waketzheng

You should open a separate discussion (not an issue) about that.

agronholm avatar May 13 '22 09:05 agronholm

Well, it's a good example for a guide anyway.

The anyio-ish solution is

async def close_when_done(main):
    try:
        await main()
    finally:
        await Tortoise.close_connections()
anyio.run(close_when_done, your_main_code)

asyncio's standard calling convention would look like asyncio.run(close_when_done, your_main_code()), and just write await main in the body. anyio and trio don't do that, for multiple reasons.

smurfix avatar May 13 '22 10:05 smurfix