
Non-blocking interface for trio.from_thread.run

Open vxgmichel opened this issue 4 years ago • 14 comments

At the moment, it doesn't seem possible to schedule a task to a trio loop from another thread and get the result later (i.e. the trio.from_thread.run function blocks until the result is returned). In comparison, the similar asyncio function run_coroutine_threadsafe returns a concurrent.futures.Future.
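For comparison, here is a minimal sketch of the asyncio behaviour referred to above, using only the standard library. The helper names (`add`, `start_background_loop`, `stop_background_loop`) are made up for illustration; the point is only that `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future` immediately instead of blocking.

```python
import asyncio
import concurrent.futures
import threading


async def add(a, b):
    # Stand-in for real asynchronous work.
    await asyncio.sleep(0.01)
    return a + b


def start_background_loop():
    # Run an asyncio event loop forever in a separate daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def stop_background_loop(loop, thread):
    # Ask the loop to stop from outside its thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


loop, thread = start_background_loop()

# Returns right away; the coroutine runs in the loop's thread.
future = asyncio.run_coroutine_threadsafe(add(40, 2), loop)

# The calling thread decides when (and how long) to wait.
result = future.result(timeout=5)

stop_background_loop(loop, thread)
```

Because the caller holds a plain `concurrent.futures.Future`, it can pick its own timeout, attach callbacks, or pass the future to `concurrent.futures.as_completed`.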

For a bit of context, I wanted to implement a mechanism against deadlocks. For some reason, a mistake in our trio application could end up in a sync call to the file system, which in turn might wake up a thread that tries to schedule a task in the trio loop that's currently blocked by the file system call, hence the deadlock.

If trio.from_thread.run were to return a concurrent.futures.Future, I could implement a deadlock detection like so:

def run_task(afn, args, kwargs, deadlock_timeout=1.0, task_timeout=None):
    event = threading.Event()

    async def wrapped():
        event.set()
        return await afn(*args, **kwargs)

    future = trio.from_thread.run(wrapped)
    if not event.wait(deadlock_timeout):
        raise RuntimeError("Deadlock detected")
    return future.result(timeout=task_timeout)

Another benefit would be an easy implementation of cancellation through future.cancel() or similar.

I couldn't find any related issue on the tracker, so I hope that makes sense :)

vxgmichel avatar Sep 02 '21 10:09 vxgmichel

Hmm. I think you're conflating two orthogonal tasks here:

  • detect a deadlock
  • add a timeout to a task

The latter is easy, of course:

def run_task(afn, args, kwargs, timeout=None):
    async def wrapped():
        with trio.fail_after(timeout):
            return await afn(*args, **kwargs)

    return trio.from_thread.run(wrapped)

For the former you could start a Trio task that loops sending a byte to a pipe every half second or so, and a thread that selects on that pipe and raises hell when there's a timeout.

import os
import select
import trio

async def deadlock_detect(timeout=1.0):
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    def sel():
        # Watchdog thread: expects a keepalive byte at least once per `timeout`
        with select.epoll() as p:
            p.register(r, select.EPOLLIN)
            while True:
                res = p.poll(timeout)
                if not res:
                    raise TimeoutError("Deadlock!")
                    # how to raise an error *right now* instead of after the deadlock is resolved
                    # is left as an exercise to the reader
                if not os.read(r,1):
                    return

    async with trio.open_nursery() as n:
        n.start_soon(trio.to_thread.run_sync, sel)
        try:
            while True:
                await trio.sleep(timeout/2)
                os.write(w,b"X")
        finally:
            os.close(r)
            os.close(w)

Test program for the above:

async def main():
    import time
    async with trio.open_nursery() as n:
        n.start_soon(deadlock_detect)
        await trio.sleep(2)
        time.sleep(2)
        await trio.sleep(2)
        raise RuntimeError("Shouldn't get here")
trio.run(main)

smurfix avatar Sep 02 '21 10:09 smurfix

@smurfix

Hmm. I think you're conflating two orthogonal tasks here:

  • detect a deadlock
  • add a timeout to a task

I would argue that there are different use cases that unlock with an asynchronous API such as trio.from_thread.run returning a concurrent.futures.Future, those use cases being:

  • timeout management
  • running multiple tasks concurrently and getting the results early (using a helper like concurrent.futures.as_completed)
  • task cancellation
  • and exotic use cases such as the deadlock detection I mentioned

As you pointed out, there are other ways to approach those use cases. But I think this feature is worth considering, since asyncio provides it and it has never been addressed on this tracker as far as I can tell.

For the former you could start a Trio task that loops sending a byte to a pipe every half second or so, and a thread that selects on that pipe and raises hell when there's a timeout.

That sadly would not work in our specific use case as detecting the deadlock is not enough, we specifically need to break out of it.

vxgmichel avatar Sep 02 '21 14:09 vxgmichel

different use cases that unlock with an asynchronous API such as Future

Yeah, that's one of the problems with asyncio's Futures. They conflate a heap of different use cases into one data structure.

Timeouts: use fail_after on the block that might fail.

Multiple tasks: use nursery.start.

Getting results early: use a memory queue, or an event plus a data field. The problem here is, what happens if there's not going to be a result and/or you no longer need it? There's no single answer that works for everybody, which is why Trio doesn't have a Future class.

Cancellation: use cancel on the scope (any number of tasks) that should end.

Deadlocks: well … you might want to use instrumentation to record which task is currently running. The problem is …

That sadly would not work in our specific use case as detecting the deadlock is not enough, we specifically need to break out of it.

… how could you possibly do that? You specifically asked for …

a mistake in our trio application could end up in a sync call to the file system

The thing is, you cannot send an interrupt to the Trio thread. All you can do, from another thread, is to kill off the whole program.

smurfix avatar Sep 02 '21 15:09 smurfix

Yeah, that's one of the problems with asyncio's Futures.

There seems to be a misunderstanding: I was actually talking about concurrent.futures.Future, not asyncio.Future. I fully agree that trio does not need the concept of a future to operate. However, I think the future API is useful for another thread that does not run a trio loop but needs to schedule asynchronous tasks to an existing trio loop.

how could you possibly do that?

I went a bit fast on our use case because it's quite specific, but the idea is the following:

  • a trio application implements a virtual file system, exposed in the operating system through fuse (or equivalent)
  • due to some bug, the trio loop performs a sync call to the virtual file system (it's hard to keep track of all the file system access, especially in third party libraries)
  • the kernel receives the request and forwards it to the fuse driver
  • the fuse driver notifies the fuse operations thread, hence entering user space again.
  • the fuse operations thread wakes up, submits the corresponding task to trio through trio.from_thread.run and waits for the result so it can pass it back to the fuse driver
  • but obviously, the trio thread is already stuck in the file system call, hence the deadlock.
  • so what we need is a timeout in order to report back an error to the driver instead of locking everything in place
  • this way, the error propagates back to the trio thread and the sync call raises an exception that can then be logged

But since the trio loop is already stuck, we cannot rely on trio.fail_after (that would only be useful to prevent those trio tasks from taking too much time when the system is not deadlocked). The simplest way to implement this non-trio timeout would be to pass a timeout parameter here in the _run_fn_as_system_task function: https://github.com/python-trio/trio/blob/b387ef27f05c2f4628323f393bfdd45c57ea926a/trio/_threads.py#L238 like so:

return q.get(timeout=1.0).unwrap()

Except this timeout isn't a great solution either, as it applies to the duration of the whole task, which might legitimately take a lot of time. In fact we can know the system is not deadlocked much earlier, i.e. as soon as our task starts running in the trio thread. Hence the implementation with the threading event in my first comment.
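The difference between the two timeouts can be shown without Trio at all. In the sketch below, a plain worker thread (`fake_loop`) stands in for the trio thread, and `call_with_start_deadline` is a hypothetical helper: a short timeout guards only the moment the job starts, while the result itself may take as long as it needs. Error handling inside the job is left out for brevity.

```python
import queue
import threading
import time


def fake_loop(inbox):
    # Stand-in for the event loop thread: runs submitted jobs one by one.
    while True:
        job = inbox.get()
        if job is None:
            return
        job()


def call_with_start_deadline(inbox, fn, start_timeout=0.5, result_timeout=None):
    started = threading.Event()
    result_q = queue.Queue(maxsize=1)

    def job():
        started.set()          # the loop is alive: it picked up our job
        result_q.put(fn())     # the work itself may be arbitrarily slow

    inbox.put(job)
    # Short timeout: only covers the delay before the job starts running.
    if not started.wait(start_timeout):
        raise RuntimeError("Deadlock suspected: job never started")
    # Separate (possibly unlimited) timeout for the actual result.
    return result_q.get(timeout=result_timeout)


def slow_job():
    time.sleep(0.7)  # longer than start_timeout, yet not a deadlock
    return "done"
```

With a single `q.get(timeout=...)` the slow job would be reported as a failure; with the start deadline, only a loop that never picks up the job is.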

Note that it's also possible to re-write trio._threads._run_fn_as_system_task to add this deadlock detection (with the threading event), which is probably what I'll end up doing. It's a bit annoying because of the code duplication, but it's completely fine if you trio maintainers think that those specific use cases are not worth a more flexible API for trio.from_thread.run :)

Sorry for the long explanation, I hope it's not too confusing.

vxgmichel avatar Sep 02 '21 16:09 vxgmichel

OK, thanks for the explanation. In this case you can create a concurrent.futures.Future like this:

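import concurrent.futures
import trio

async def runner(f, proc, args, kw):
    # Runs inside Trio and forwards the outcome of proc to the future
    try:
        f.set_result(await proc(*args, **kw))
    except Exception as exc:
        f.set_exception(exc)
    except BaseException:
        # e.g. trio.Cancelled: mark the future as cancelled, then propagate
        f.cancel()
        raise

def run_async_as_future(nursery, proc, *args, **kw):
    # Call this from a thread started with trio.to_thread.run_sync
    f = concurrent.futures.Future()
    trio.from_thread.run_sync(nursery.start_soon, runner, f, proc, args, kw)
    return f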

Simply wrap your trio.to_thread.run_sync call with async with trio.open_nursery() as n: to pass a suitable nursery to your thread.

smurfix avatar Sep 02 '21 16:09 smurfix

(code edited to fix exception handling and stupid argument ordering problems; looks mostly sane now but untested)

smurfix avatar Sep 02 '21 18:09 smurfix

@smurfix

In this case you can create a concurrent.futures.Future like this: [...]

Great idea, it's easy to implement and covers most use cases I listed in my comment earlier (timeout management, running tasks concurrently, etc). Cancellation could also work by wrapping the task with a cancel scope and adding a done callback like so:

def _done_callback(future):
    if future.cancelled():
        trio.from_thread.run_sync(cancel_scope.cancel)
future.add_done_callback(_done_callback)

However, this approach doesn't work as is for deadlock detection in the specific use case I described earlier, since the call to trio.from_thread.run_sync is going to deadlock before the future can be returned. You can see more precisely when the deadlock occurs in this gist (I replaced the fuse virtual file system with two Linux FIFOs for simplicity).

Instead, trio.from_thread.run_sync should be replaced with token.run_sync_soon, which then allows for the deadlock detection I posted in my first comment. Also, replacing the explicit nursery with trio.lowlevel.spawn_system_task should make things easier.

Anyway, I have no problem adding the modified version of run_async_as_future to our code base, but it's not so easy to implement in a robust way either. Maybe it would make sense to have a reference implementation somewhere?

vxgmichel avatar Sep 03 '21 11:09 vxgmichel

So you take the above code snippets and combine them.

  • add a set of open futures to my deadlock detector, above.
  • teach my run_async_as_future code, above, to add the futures it creates to this set (and remove them when they're set of course).
  • if a deadlock is detected, the detector calls set_exception(TimeoutError()) on each of these futures.

Done.
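The bookkeeping part of this suggestion needs nothing beyond the standard library. Below is one possible shape for the "set of open futures", with a hypothetical class name (`PendingFutures`) and method names; the deadlock detector would call `fail_all()` when it stops receiving keepalives.

```python
import concurrent.futures
import threading


class PendingFutures:
    """Thread-safe set of futures that have not completed yet."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = set()

    def track(self, future):
        with self._lock:
            self._pending.add(future)
        # Remove the future automatically once it is set or cancelled.
        future.add_done_callback(self._forget)
        return future

    def _forget(self, future):
        with self._lock:
            self._pending.discard(future)

    def fail_all(self, exc_type=TimeoutError):
        # Snapshot under the lock, then fail outside it, because
        # set_exception() triggers _forget() which takes the lock again.
        with self._lock:
            pending = list(self._pending)
        for future in pending:
            try:
                future.set_exception(exc_type("Trio loop appears deadlocked"))
            except concurrent.futures.InvalidStateError:
                pass  # completed in the meantime; nothing to do
        return len(pending)
```

A thread waiting in `future.result()` is then woken up with the `TimeoutError` even though the Trio side never ran the task.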

Bonus feature: there's now one deadlock detector in the whole system. Thus you don't need to remember to apply timeouts to the individual futures, so if a job does take longer despite there not being a deadlock in Trio you won't have spurious errors.

NB, your cancellation idea is dangerous as written, because it depends on Trio not being deadlocked. Solution: create a separate thread to which you pass the future from your _done_callback (using an unlimited queue.Queue) and let that second thread feed the cancellation calls to Trio.
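One way to read this suggestion, sketched with the standard library only: the done callback merely enqueues the cancelled future, and a dedicated forwarder thread does the potentially blocking call into Trio. The function `start_cancel_forwarder` and its `cancel_in_trio` parameter are assumptions for illustration; in real code `cancel_in_trio` would look up the cancel scope belonging to the future and hand its `cancel` method to Trio.

```python
import concurrent.futures
import queue
import threading


def start_cancel_forwarder(cancel_in_trio):
    inbox = queue.Queue()  # unbounded, so put() never blocks the callback

    def forward():
        while True:
            future = inbox.get()
            if future is None:  # sentinel: shut the forwarder down
                return
            # May block if Trio is stuck, but only this thread waits.
            cancel_in_trio(future)

    thread = threading.Thread(target=forward, daemon=True)
    thread.start()

    def done_callback(future):
        # Runs in whichever thread completed or cancelled the future.
        if future.cancelled():
            inbox.put(future)

    return done_callback, inbox, thread
```

The thread that calls `future.cancel()` returns immediately, regardless of whether the Trio loop is responsive.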

smurfix avatar Sep 03 '21 11:09 smurfix

@smurfix Great, thanks for the suggestions :)

vxgmichel avatar Sep 03 '21 13:09 vxgmichel

One warning: there's an open Python bug (filed by me, otherwise untouched so far) where Python does a syscall without releasing the GIL, which would deadlock your program with no way to recover using a watchdog thread. The syscall in question is isatty() (which translates to ioctl in libc of course). There may be other undiscovered instances of this problem lurking in Python's bowels.

Workaround: again a pipe with keepalives, but this time to a separate process.

Solution: get somebody to submit a patch. This particular one should be easy; auditing the code for other instances of this is somewhat harder. And yes I should have done it myself :-/ but I'm already juggling too much.

https://bugs.python.org/issue44219

smurfix avatar Sep 03 '21 14:09 smurfix

@smurfix

One warning: there's an open Python bug (filed by me, otherwise untouched so far) where Python does a syscall without releasing the GIL, which would deadlock your program with no way to recover using a watchdog thread.

I did remember stumbling across this issue a couple of years ago. If I recall correctly, the call to isatty only occurs while opening a file in text mode and not in binary mode which was very confusing. I didn't realize it was a GIL deadlock, I wrongly assumed it was some internal problem with fuse instead. Thanks for reporting it, I hope it'll get some attention!

Workaround: again a pipe with keepalives, but this time to a separate process.

Interesting, but then how does the watchdog process unlock the main one? Even with signals, no Python code can be executed in the main process since the GIL is already taken.

vxgmichel avatar Sep 06 '21 16:09 vxgmichel

the call to isatty only occurs while opening a file in text mode and not in binary mode which was very confusing

Actually this makes sense: the call is used to determine whether to line-buffer the output, which presumably makes no sense in binary mode.

then how does the watchdog process unlock the main one?

It basically doesn't. Currently if that happens you're SOL and your only way out is to kill off the process.

On the plus side, Python calls this only directly after opening the file, so the window for this to happen is pretty small.

Also, come to think of it, if you reply to TTY-ish (or, even better, all) ioctl calls directly instead of potentially recursing, then this won't bite you.

smurfix avatar Sep 06 '21 16:09 smurfix

Going right back to the original comment that defines this issue:

At the moment, it doesn't seem possible to schedule a task to a trio loop from another thread and get the result later (i.e. the trio.from_thread.run function blocks until the result is returned).

This issue was originally about this one thing, and the stuff about deadlock detection was mentioned as context. I just wanted to point out that it absolutely is possible to schedule a task to a trio loop without blocking, by using TrioToken.run_sync_soon() instead. You can use this as a tool to build whatever else you want. For example, you can pass data back to the original thread using a condition variable (which is what futures.Future does under the hood) or a standard queue.Queue.

I had the same question originally but for a very different application, and it took me a while to find TrioToken.run_sync_soon() in the documentation. I'd suggest it might be worth putting a cross reference from trio.run_sync().

(I'm a bit confused that when a comment here finally noted that function, it was only a passing mention like it was a small detail, when it's really the main bit that allows the problem to be solved.)

arthur-tacca avatar Sep 30 '21 23:09 arthur-tacca

I'm a bit confused that when a comment here finally noted that function, it was only a passing mention like it was a small detail, when it's really the main bit that allows the problem to be solved

You're absolutely right, I've been able to implement the use case I described by using TrioToken.run_sync_soon, see the implementation here. I'd say the main issue is that I had to copy the tricky logic in trio._threads, like wrapping the result in an outcome, synchronizing the threads and disabling keyboard interrupt protection. Also, I couldn't use trio._utils.coroutine_or_error as it is not exposed publicly.

My main point is that a high-level API for asynchronous task scheduling from a thread might be something to consider, especially if it were to include more powerful features such as cancellation support. But maybe it's a niche use case that does not justify the work of maintaining such an API when a low-level API already exists :)

vxgmichel avatar Oct 18 '21 08:10 vxgmichel

Closing this because I think it's been resolved, and doesn't seem to come up often enough to justify a high-level API.

Zac-HD avatar Mar 17 '23 04:03 Zac-HD