traits
Async observer dispatch
It would be nice to support dispatch via asyncio or other similar async systems using traits.
It's not hard to create a dispatcher that dispatches regular functions asynchronously:
    import asyncio

    def dispatch_asyncio(handler, event):
        try:
            loop = asyncio.get_event_loop()
            loop.call_soon_threadsafe(handler, event)
        except RuntimeError:
            # no loop currently; dispatch synchronously
            handler(event)
which could then be hooked up via _ObserverDispatchers["async"] = dispatch_asyncio and then invoked with dispatch="async". But this is only half the problem: since the callback is a regular function, it will block the event loop while it executes. This is of course analogous to what happens with dispatch="ui".
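To make the blocking concrete, here is a small asyncio-only sketch (no Traits involved; slow_handler and measure_stall are made-up names for illustration). It schedules a regular function on the loop and measures how long a coroutine that only wants to yield for one iteration is actually held up:

```python
import asyncio
import time


def slow_handler(event):
    # A regular (synchronous) handler: nothing else can run on the loop
    # until this returns.
    time.sleep(0.2)


async def measure_stall():
    loop = asyncio.get_running_loop()
    loop.call_soon(slow_handler, "event")
    start = time.perf_counter()
    # sleep(0) yields to the loop for one iteration, but the loop has to
    # run slow_handler to completion before this coroutine is resumed.
    await asyncio.sleep(0)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"stalled for {asyncio.run(measure_stall()):.2f}s")
```

The stall is roughly the full duration of the handler, which is why long-running work wants a coroutine rather than a plain callback.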
But it would be really nice to be able to have async observers to handle longer running tasks in the background (I/O in particular, but also possibly GUI tasks). Something like:
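    from traits.api import Bytes, HasTraits, Str, observe

    class RemoteResource(HasTraits):
        url = Str()
        data = Bytes()

        @observe('url')
        async def _download_data(self, event):
            # probably need more sanity checking, but this is the general idea
            # (get_url_async is a placeholder for an async HTTP fetch)
            self.data = await get_url_async(event.new)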
This raises some design issues, though:
- what happens if there is no async event loop running? (nothing? error? start an event loop and call run?). This is particularly of concern for handlers called during instance creation which are more likely to be outside of an event loop.
- how do we track async handlers while they are running? The obvious way is to keep a set that holds all the active handlers (as Tasks or Futures) and to attach callbacks that delete them from the set when they are done. But what granularity do we use (global? per class? per object? per observer?).
- do we expose running handlers via an API (e.g. do we allow handlers to be cancelled?). If so, what would that look like? What do we do if we want to shut down while handlers are still running?
- there needs to be some way of awaiting the completion of the async handlers, I think.
Something like the following might work:
    import asyncio

    _handler_tasks = set()

    async def gather_handlers():
        # probably a better way of doing this
        while _handler_tasks:
            await asyncio.sleep(0)

    def dispatch_async_handler(handler, event):
        task = asyncio.create_task(handler(event))
        _handler_tasks.add(task)
        task.add_done_callback(_handler_tasks.discard)
You could imagine this being used something like the following:
    async def get_resources(urls):
        resources = [RemoteResource(url=url) for url in urls]
        await gather_handlers()
        return resources
although I also think that if async-based handlers are used it will be much more natural to chain observers: if you need to react when the download is done, just observe data and assume that the app/server code is doing a loop.run_forever() somewhere.
@mdickinson Thoughts?
- what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)
On another project (using Panel rather than Traits), I had a need for a fourth option: schedule the task to be executed when the event loop starts. What Panel did instead was to start an event loop and call run, then block until the task completed, but that was undesirable for the situation I was in, since we wanted a task to start at application startup and run in parallel with the application.
So I'd vote for either (a) error, or (d) schedule the task to be executed when the event loop starts running.
- there needs to be some way of awaiting the completion of the async handlers, I think
I'm not sure we need to do anything special here. A fire-and-forget model works pretty well with async, and it's the basis of how things work in JavaScript. Individual async handlers can take care to update state to mark completion if that's what they need. Avoiding interactions from incomplete cleanup in unit tests is generally easy to achieve by using IsolatedAsyncioTestCase from the standard library.
So I can't think of any use cases that would require having Traits do bookkeeping of running coroutines beyond the bookkeeping already being performed by asyncio. Project-specific bookkeeping can be handled in the coroutines themselves.
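As a rough illustration of that pattern (Downloader is a made-up stand-in rather than a HasTraits class, and the asyncio.Event is just one way for a handler to mark its own completion):

```python
import asyncio
import unittest


class Downloader:
    """Hypothetical object whose async handler records its own completion."""

    def __init__(self):
        self.done = asyncio.Event()
        self.data = None

    async def handle(self, event):
        await asyncio.sleep(0)  # placeholder for real I/O
        self.data = event
        self.done.set()         # the handler marks completion itself


class TestDownloader(unittest.IsolatedAsyncioTestCase):
    async def test_handler_completes(self):
        downloader = Downloader()
        # Fire and forget, keeping a reference so the task is not collected.
        task = asyncio.create_task(downloader.handle(b"payload"))
        await asyncio.wait_for(downloader.done.wait(), timeout=1)
        self.assertEqual(downloader.data, b"payload")
        await task
```

Each test method gets its own event loop, so tasks left over from one test cannot leak into the next.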
- what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)
... So I'd vote for either (a) error, or (d) schedule the task to be executed when the event loop starts running.
Having things error is certainly the easiest, and probably a reasonable first step. It's easier to make a change that does something sensible instead of an error condition than to do something wrong and then have to change all the code that depends on it.
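A minimal sketch of the "error" option, assuming a module-level task set like the one earlier in the thread (dispatch_async_strict is an invented name). The one subtlety is checking for the loop before calling the handler, so that no coroutine object is created and then left un-awaited:

```python
import asyncio

_handler_tasks = set()


def dispatch_async_strict(handler, event):
    # Check for a running loop *before* calling handler(event); otherwise
    # the coroutine object is created and never awaited.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        raise RuntimeError(
            f"async observer {handler!r} fired with no running event loop"
        ) from None
    task = asyncio.create_task(handler(event))
    _handler_tasks.add(task)
    task.add_done_callback(_handler_tasks.discard)
    return task
```

Relaxing this later into "defer until a loop starts" would not break code written against the strict version.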
- there needs to be some way of awaiting the completion of the async handlers, I think
FWIW, I just implemented what I think may be the dumbest possible approach in an async Flet app and it just worked. I just replaced dispatch_same with:
    import asyncio

    _handler_tasks = set()

    def dispatch_same(handler, event):
        if asyncio.iscoroutinefunction(handler):
            task = asyncio.create_task(handler(event))
            _handler_tasks.add(task)
            task.add_done_callback(_handler_tasks.discard)
        else:
            handler(event)
I think we do need the book-keeping of the _handler_tasks set to prevent the tasks being garbage collected, but in practice (and in particular, where the observers are being manually connected in a context where we know that the async event loop exists) it seems that we don't need to worry about gathering the tasks.
Ah yes, that makes sense.
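For reference, the lifecycle of that set can be seen in a standalone sketch (no Traits or Flet needed; demo and the two handlers are made up). It uses inspect.iscoroutinefunction in place of the asyncio spelling, which is a substitution on my part rather than what the app above does:

```python
import asyncio
import inspect

_handler_tasks = set()


def dispatch_same(handler, event):
    if inspect.iscoroutinefunction(handler):
        task = asyncio.create_task(handler(event))
        # The loop only holds weak references to tasks, so keep a strong
        # one until the task is done.
        _handler_tasks.add(task)
        task.add_done_callback(_handler_tasks.discard)
    else:
        handler(event)


async def demo():
    log = []

    def sync_handler(event):
        log.append(("sync", event))

    async def async_handler(event):
        await asyncio.sleep(0)
        log.append(("async", event))

    dispatch_same(sync_handler, 1)
    dispatch_same(async_handler, 2)
    in_flight = len(_handler_tasks)  # the async handler is still pending
    await asyncio.sleep(0.01)        # let it finish and be discarded
    return log, in_flight, len(_handler_tasks)
```

The synchronous handler runs immediately, while the async one sits in the set until it completes and the done callback removes it.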
The other half of async support that I think I'd want at some point would be the ability to await a Trait change in a coroutine.
Yes, that would be nice, something like:
    event = await obj.await_trait_change("foo.bar")

where event is the appropriate trait change event. Not quite sure what the implementation would look like - possibly something which uses an asyncio.Event internally that gets set by an observer on the trait. Something which looks a bit like:
    import asyncio

    async def await_trait_change(obj, trait):
        async_event = asyncio.Event()
        trait_event = None

        def handler(event):
            nonlocal trait_event
            trait_event = event
            async_event.set()

        obj.observe(handler, trait)
        try:
            await async_event.wait()
        finally:
            # always unhook, even if the wait is cancelled
            obj.observe(handler, trait, remove=True)
        return trait_event
It would be nicer if asyncio.Event could carry a payload, but that doesn't seem to be the case.
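One thing that does carry a payload is an asyncio Future, so a variant along the following lines might work. This is a sketch only: Observable is a hypothetical minimal stand-in for the observe(handler, name, remove=...) part of HasTraits so that the example runs without Traits, and it assumes the observer fires on the event loop's thread.

```python
import asyncio


class Observable:
    """Hypothetical stand-in for HasTraits.observe, for illustration only."""

    def __init__(self):
        self._handlers = {}

    def observe(self, handler, name, remove=False):
        handlers = self._handlers.setdefault(name, [])
        if remove:
            handlers.remove(handler)
        else:
            handlers.append(handler)

    def fire(self, name, event):
        for handler in list(self._handlers.get(name, [])):
            handler(event)


async def await_trait_change(obj, trait):
    future = asyncio.get_running_loop().create_future()

    def handler(event):
        # Only the first change resolves the future.
        if not future.done():
            future.set_result(event)

    obj.observe(handler, trait)
    try:
        return await future
    finally:
        obj.observe(handler, trait, remove=True)
```

If the observer could fire from another thread, the handler would need to go through loop.call_soon_threadsafe instead of setting the result directly.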
I could also see a pattern (particularly for Event traits) where the trait events are instead yielded so you could do something like:
    async for event in obj.yield_trait_changes("foo.bar"):
        # process the event
        ...
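A possible shape for this, written as a free function rather than a method, is an async generator fed by an asyncio.Queue. Again this is just a sketch: Observable is a hypothetical stand-in for the observe API so the example is self-contained, and the queue is unbounded, which a real implementation might not want.

```python
import asyncio


class Observable:
    """Hypothetical stand-in for HasTraits.observe, for illustration only."""

    def __init__(self):
        self._handlers = {}

    def observe(self, handler, name, remove=False):
        handlers = self._handlers.setdefault(name, [])
        if remove:
            handlers.remove(handler)
        else:
            handlers.append(handler)

    def fire(self, name, event):
        for handler in list(self._handlers.get(name, [])):
            handler(event)


async def yield_trait_changes(obj, trait):
    queue = asyncio.Queue()

    def handler(event):
        queue.put_nowait(event)

    obj.observe(handler, trait)
    try:
        while True:
            yield await queue.get()
    finally:
        # Runs when the generator is closed (e.g. via aclose()).
        obj.observe(handler, trait, remove=True)
```

Note that the observer is only hooked up once iteration starts, and that breaking out of the async for loop does not unhook it immediately: the caller needs to call aclose() on the generator (or let it be finalized) for the cleanup to run.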
Being able to do this seems independent of asyncio handlers, and so could be implemented separately.
Yes, definitely.
Basic feature completed in #1771. We can open separate issues for follow-on features, if desired.