inotify_simple

asyncio Support?

Open archuserr opened this issue 4 years ago • 8 comments

I currently use inotify in a thread to monitor files in a directory. It seems like the Python community prefers asyncio these days. Can this module support it?

Mar 22 '20 00:03 archuserr

Threading is my preferred approach, but it would be good to support asyncio too.

However, rather than adding a specific async interface, there are already standard bits in asyncio for attaching callbacks to file read operations, which can be used instead (since the INotify class is a file-like object). So I think that's probably the way to go. But I should add the example to the documentation for how to do it. It is something like:

import os
import asyncio
from inotify_simple import INotify, flags

inotify = INotify()
os.mkdir('/tmp/inotify_test')
wd = inotify.add_watch('/tmp/inotify_test', flags.DELETE_SELF)

def reader():
    for event in inotify.read():
        print(event)

def stuff():
    os.system('rmdir /tmp/inotify_test')

loop = asyncio.new_event_loop()
# Add our callback:
loop.add_reader(inotify, reader)

# Create some events:
loop.call_soon(stuff)

try:
    # Run the event loop. Hit ctrl-C to stop
    loop.run_forever()
except KeyboardInterrupt:
    print("Stopping")
finally:
    # We are done. Close the event loop.
    loop.close()

Mar 22 '20 00:03 chrisjbillington

I guess this isn't quite the same as an awaitable read() function, which people might prefer. I looked into this a while ago and it wasn't obvious how to do it. Do you know of examples of awaitable read() functions on files elsewhere?

If I can figure out how then I could make such a thing.

Mar 22 '20 00:03 chrisjbillington

That works! Thanks

Mar 30 '20 02:03 archuserr

I wonder if it's just a matter of making your handle using the syscall and then using fileobj=os.fdopen(). Then you should be able to await fileobj.read(...) and everything becomes straightforward.

Jul 10 '20 01:07 iawells

@iawells is file.read() awaitable? I am not under the impression it is.
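
A quick way to check this, as a minimal sketch: a plain pipe stands in for the inotify file descriptor here (an assumption for illustration, not part of inotify_simple). The object returned by os.fdopen() hands back bytes from read(), so there is nothing to await.

```python
import inspect
import os

# A pipe stands in for the inotify file descriptor (illustrative assumption;
# any readable fd wrapped by os.fdopen() behaves the same way).
read_fd, write_fd = os.pipe()
os.write(write_fd, b"hi")
os.close(write_fd)

with os.fdopen(read_fd, "rb", buffering=0) as fileobj:
    result = fileobj.read(2)

# read() returned plain bytes, not a coroutine or future.
is_awaitable = inspect.isawaitable(result)
print(type(result).__name__, is_awaitable)
```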

Jul 10 '20 01:07 chrisjbillington

I don't think you need an awaitable os.read(). The key rule with asyncio is not to block unless you await, and the way you are reading doesn't block: the code uses ioctl to check whether there is data to read before actually reading, so the read itself never blocks.

The actual blocking in this code occurs with poll https://docs.python.org/3/library/select.html#polling-objects.
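
The pattern described above (wait in poll, ask ioctl how many bytes are pending, then read) can be sketched roughly as follows. This is an illustration on a pipe under Linux, not inotify_simple's actual source; the helper names bytes_available and wait_then_count are made up for the example.

```python
import fcntl
import os
import select
import struct
import termios


def bytes_available(fd):
    """Return how many bytes can be read from fd right now (FIONREAD)."""
    packed = fcntl.ioctl(fd, termios.FIONREAD, struct.pack("i", 0))
    return struct.unpack("i", packed)[0]


def wait_then_count(fd, timeout_ms):
    """Wait in poll() for up to timeout_ms, then report the pending byte count."""
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    if not poller.poll(timeout_ms):  # the only call here that can block
        return 0
    return bytes_available(fd)


# A pipe stands in for the inotify fd (illustrative assumption).
read_fd, write_fd = os.pipe()
before = wait_then_count(read_fd, 0)  # nothing written yet
os.write(write_fd, b"abc")
after = wait_then_count(read_fd, 0)   # three bytes are pending
data = os.read(read_fd, after)        # returns at once: the data is already there
os.close(read_fd)
os.close(write_fd)
```

Only the poll() call can wait, which is why swapping it for an event-loop mechanism is enough to make the whole read path cooperative.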

To run this async, I suspect the easiest option would be to replace select.poll() with loop.add_reader(). Obviously this fires a callback rather than offering something to await. You can always await a condition and release the condition using the callback.

Aug 22 '20 21:08 couling

Yes, add_reader() works already as in the example upthread. However, callbacks are a less idiomatic way to use asyncio than providing an awaitable function or method. Wrapping a callback under the hood to present an awaitable method via a condition is a decent idea!
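
One possible shape for that wrapper, as a rough sketch rather than anything inotify_simple provides: it uses an asyncio Future instead of a Condition (a deliberate simplification), and the name wait_readable is hypothetical. The demo uses a pipe so that it runs without inotify.

```python
import asyncio
import os


async def wait_readable(fileobj):
    """Suspend until fileobj (a raw fd or anything with fileno()) is readable."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_readable():
        # The callback may fire more than once before it is removed.
        if not future.done():
            future.set_result(None)

    loop.add_reader(fileobj, on_readable)
    try:
        await future
    finally:
        # Always unregister, including when the waiting task is cancelled.
        loop.remove_reader(fileobj)


async def demo():
    # A pipe stands in for the INotify object (illustrative assumption).
    read_fd, write_fd = os.pipe()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, os.write, write_fd, b"x")
    await wait_readable(read_fd)
    data = os.read(read_fd, 1)
    os.close(read_fd)
    os.close(write_fd)
    return data


result = asyncio.run(demo())
```

With an INotify instance the equivalent would presumably be `await wait_readable(inotify)` followed by `inotify.read(timeout=0)`, since the data is already known to be available at that point.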

Aug 23 '20 03:08 chrisjbillington

I know this is an asyncio thread but I have an implementation in trio and I figured I should share. I think the only thing that doesn't translate to asyncio is the timeout detection. I'm not sure if asyncio has built-in timeout tools but the functionality could be implemented with some custom exceptions and a few time.time() calls.
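
For reference, asyncio does ship a timeout helper, asyncio.wait_for (plus the asyncio.timeout() context manager from Python 3.11 on). A minimal sketch of how it behaves, using an asyncio.Queue as a stand-in event source; get_or_none is a hypothetical helper name:

```python
import asyncio


async def get_or_none(queue, timeout):
    """Wait up to `timeout` seconds for an item; return None if none arrives."""
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait("event")
    first = await get_or_none(queue, 0.1)   # item is already queued
    second = await get_or_none(queue, 0.1)  # queue is empty, so this times out
    return first, second


result = asyncio.run(demo())
```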

The functionality I wanted was such that I could iterate over the inotify events in an async for until the no-event timeout occurred (or indefinitely if no timeout). A thread is used to perform the read (as most IO-based operations do). I don't really understand what considerations are necessary to ensure thread safety / no corruption. It's probably fine as long as you don't try to watch for events from an INotify object in more than one task.

Trio functions used:

  • trio.to_thread.run_sync ; used to read events. asyncio.to_thread should be a drop-in replacement.
  • trio.move_on_after ; builtin timeout mechanism. Cancels the task at the next schedule point if the timeout is hit.
  • trio.sleep ; only used to create a schedule point to check for cancellation.

import trio
from inotify_simple import INotify as SimpleINotify

class INotify(SimpleINotify):

    async def events(self, timeout=None, read_delay_ms=None, poll_timeout_ms=None):
        """
        Iterate over inotify events until no more events occur after timeout seconds.

        The reads occur in a separate thread and thus won't block the event loop. However, while
        waiting for the thread to return this task is effectively stopped. It will not process
        cancellation, signals, etc. This in turn could block the program if the program is waiting
        for this task to complete / cancel (usually for shutdown). The worst-case block time is
        poll_timeout_ms + read_delay_ms.

        Arguments:
            timeout (int): The time in seconds to wait for events if there are none. If ``None``,
                block until there are events.
            read_delay_ms (int): If there are no events immediately available for reading, then this
                is the time in milliseconds to wait after the first event arrives before reading the
                file descriptor. While this allows the kernel to coalesce events it is at the expense
                of reducing responsiveness to the event loop.
            poll_timeout_ms (int): How long to poll for events before returning to the event loop. By
                default this is linked to timeout. It can be shortened which makes cancel / cleanup
                more responsive at the expense of idle looping.

        Yields:
            tuple - inotify_event
        """
        # We need to check if the function has been canceled every so often. If the timeout is set
        # to None (wait forever) then the poll_timeout_ms still needs a value such that the async
        # function can eventually cancel / cleanup. We set this timeout to 5 seconds. In the worst
        # case this will delay shutdown by 5 seconds.
        if timeout is None:
            timeout = float('inf')
        if poll_timeout_ms is None:
            poll_timeout_ms = 5000 if timeout == float('inf') else timeout * 1000
        while True:
            with trio.move_on_after(timeout) as cancel_scope:
                events = await self._get_events(poll_timeout_ms, read_delay_ms)
            if cancel_scope.cancelled_caught:
                break
            for inotify_event in events:
                yield inotify_event

    async def _get_events(self, poll_timeout_ms, read_delay_ms):
        events = None
        while not events:
            # The position of trio.sleep is important. Cancellation will occur here if the timeout
            # hits or if the engine decides to stop the task (shutdown). By putting it before the
            # to_thread we give the benefit of the doubt to the read function. That is, we could
            # return events that happen up to poll_timeout_ms + read_delay_ms after the scheduled
            # timeout. If a hard deadline (discarding events after the timeout) is required then
            # the sleep should occur after the to_thread.
            await trio.sleep(0)
            events = await trio.to_thread.run_sync(self.read, poll_timeout_ms, read_delay_ms)
        return events
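
A rough asyncio counterpart of the generator above, as a sketch under stated assumptions rather than a tested port: iter_events and make_fake_reader are hypothetical names, asyncio.to_thread needs Python 3.9+, and the blocking reader is a queue-backed stand-in so the example runs without inotify. The timeout is checked between thread calls instead of wrapping the call in asyncio.wait_for, because cancelling the await does not stop the worker thread, and events it reads afterwards could be lost.

```python
import asyncio
import queue


async def iter_events(blocking_read, timeout=None, poll_timeout_ms=1000):
    """Yield events from blocking_read(poll_timeout_ms) until `timeout`
    seconds pass without any event (or forever if timeout is None)."""
    loop = asyncio.get_running_loop()
    while True:
        deadline = None if timeout is None else loop.time() + timeout
        events = []
        while not events:
            if deadline is not None and loop.time() >= deadline:
                return
            # The blocking read runs in a worker thread, so the event loop
            # stays free; cancellation is only seen between these calls.
            events = await asyncio.to_thread(blocking_read, poll_timeout_ms)
        for event in events:
            yield event


def make_fake_reader(batches):
    """Stand-in for a blocking read(timeout_ms) method (illustrative only)."""
    pending = queue.Queue()
    for batch in batches:
        pending.put(batch)

    def read(timeout_ms):
        try:
            return pending.get(timeout=timeout_ms / 1000)
        except queue.Empty:
            return []

    return read


async def demo():
    read = make_fake_reader([["a", "b"], ["c"]])
    return [e async for e in iter_events(read, timeout=0.2, poll_timeout_ms=50)]


result = asyncio.run(demo())
```

With inotify_simple, the bound method `inotify.read` could be passed as blocking_read, since its first positional argument is the timeout in milliseconds. As in the trio version, the worst-case delay before the generator notices the timeout or a cancellation is roughly one poll_timeout_ms.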

Sep 18 '22 18:09 Sxderp