traits icon indicating copy to clipboard operation
traits copied to clipboard

Async observer dispatch

Open corranwebster opened this issue 5 months ago • 5 comments

It would be nice to support dispatch via asyncio or other similar async systems using traits.

It's not hard to create a dispatcher that dispatches regular functions asynchronously:

def dispatch_asyncio(handler, event):
    try:
        loop = asyncio.get_event_loop()
        loop.call_soon_threadsafe(handler, event)
    except RuntimeError:
        # no loop currently; dispatch synchronously
        handler(event)

which could then be hooked up via _ObserverDispatchers["async"] = dispatch_asyncio and then invoked with dispatch="async"; but this is only half the problem, since the callback is a regular function it will block the event loop while it executes. This is of course analogous to what happens with dispatch="ui".

But it would be really nice to be able to have async observers to handle longer running tasks in the background (I/O in particular, but also possibly GUI tasks). Something like:

class RemoteResource(HasTraits):
    url = Str()
    data = Bytes()

    @observe('url')
    async def _download_data(self, event):
        # probably need more sanity checking, but this is the general idea
        self.data = await get_url_async(event.new)

This raises some design issues, though:

  • what happens if there is no async event loop running? (nothing? error? start an event loop and call run?). This is particularly of concern for handlers called during instance creation which are more likely to be outside of an event loop.
  • how do we track async handlers while they are running? The obvious way is with a set that holds all the active handlers (via Tasks or Futures) and attach callbacks that delete them from the set when they are done. But what granularity do we use (global? per class? per object? per observer?).
  • do we expose running handlers via an API (eg. do we allow handlers to be cancelled?). If so, what would that look like? What do we do if we want to shut down while handlers are still running?
  • there needs to be some way of awaiting the completion of the async handlers, I think.

Something like the following might work:

_handler_tasks = set()

async def gather_handlers():
    # probably a better way of doing this
    while _handler_tasks:
        await asyncio.sleep(0)

def dispatch_async_handler(handler, event):
    task = asyncio.create_task(handler(event))
    _handler_tasks.add(task)
    task.add_done_callback(_hander_tasks.discard)

You could imagine this being used something like the following:

async def get_resources(urls):
    resources = [RemoteResource(url=url) for url in urls]
    await gather_handlers()
    return resources

although I also think that if async-based handlers are used it will be much more natural to chain observers: if you need to react when the download is done, just observe data and assume that the app/server code is doing a loop.run_forever() somewhere.

@mdickinson Thoughts?

corranwebster avatar Feb 07 '24 10:02 corranwebster

  • what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)

On another project (using Panel rather than Traits), I had a need for a fourth option: schedule the task to be executed when the event loop starts. What Panel did instead was to start an event loop and call run, then block until the task completed, but that was undesirable for the situation I was in, since we wanted a task to start at application startup and run in parallel with the application.

So I'd vote for either (a) error, or (d) schedule the task to be executed when the event loop starts running.

  • there needs to be some way of awaiting the completion of the async handlers, I think

I'm not sure we need to do anything special here. A fire-and-forget model works pretty well with async, and it's the basis of how things work in JavaScript. Individual async handlers can take care to update state to mark completion if that's what they need. Avoiding interactions from incomplete cleanup in unit tests is generally easy to achieve by using IsolatedAsyncioTestCase from the std. lib.

So I can't think of any use-cases that would require having Traits do bookkeeping of running coroutines beyond the book-keeping already being performed by asyncio. Project-specific bookkeeping can be handled in the coroutines themselves.

mdickinson avatar Feb 08 '24 15:02 mdickinson

  • what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)

... So I'd vote for either (a) error, or (d) schedule the task to be executed when the event loop starts running.

Having things error is certainly the easiest, and probably a reasonable first step. It's easier to make a change that does something sensible instead of an error condition than to do something wrong and then have to change all the code that depends on it.

  • there needs to be some way of awaiting the completion of the async handlers, I think

I'm not sure we need to do anything special here. A fire-and-forget model works pretty well with async, and it's the basis of how things work in JavaScript. Individual async handlers can take care to update state to mark completion if that's what they need. Avoiding interactions from incomplete cleanup in unit tests is generally easy to achieve by using IsolatedAsyncioTestCase from the std. lib.

So I can't think of any use-cases that would require having Traits do bookkeeping of running coroutines beyond the book-keeping already being performed by asyncio. Project-specific bookkeeping can be handled in the coroutines themselves.

FWIW, I just implemented what I think may be the dumbest possible approach in an async Flet app and it just worked. I just replaced dispatch_same with:

_handler_tasks = set()

def dispatch_same(handler, event):
    if asyncio.iscoroutinefunction(handler):
        task = asyncio.create_task(handler(event))
        _handler_tasks.add(task)
        task.add_done_callback(_handler_tasks.discard)
    else:
        handler(event)

I think we do need the book-keeping of the _handler_tasks set to prevent the tasks being garbage collected, but in practice (and in particular, where the observers are being manually connected in a context where we know that the async event loop exists) it seems that we don't need to worry about gathering the tasks.

corranwebster avatar Feb 09 '24 17:02 corranwebster

I think we do need the book-keeping of the _handler_tasks set to prevent the tasks being garbage collected

Ah yes, that makes sense.

The other half of async support that I think I'd want at some point would be the ability to await a Trait change in a coroutine.

mdickinson avatar Feb 12 '24 08:02 mdickinson

The other half of async support that I think I'd want at some point would be the ability to await a Trait change in a coroutine.

Yes, that would be nice, something like:

event = await obj.await_trait_change("foo.bar")

where event is the appropriate trait change event. Not quite sure what the implementation would look like - possibly something which uses an asyncio.Event internally that gets set by an observer on the trait. Something which looks a bit like:

async def await_trait_change(obj, trait):
    async_event = asyncio.Event()
    trait_event = None

    def handler(event):
        nonlocal trait_event
        trait_event = event
        async_event.set()

    obj.observe(handler, trait)
    await async_event.wait()
    obj.observe(handler, trait, remove=True)
    return trait_event

It would be nicer if asyncio.Event could carry a payload, but that doesn't seem to be the case.

I could also see a pattern (particularly for Event traits) where the trait events are instead yielded so you could do something like:

async for event in obj.yield_trait_changes("foo.bar"):
    # process the event
    ...

Being able to do this seems independent of asyncio handlers, and so could be implemented separately.

corranwebster avatar Feb 14 '24 11:02 corranwebster

Being able to do this seems independent of asyncio handlers, and so could be implemented separately.

Yes, definitely.

mdickinson avatar Feb 16 '24 08:02 mdickinson

Basic feature completed in #1771. We can open separate issues for follow-on features, if desired.

mdickinson avatar Mar 26 '24 10:03 mdickinson