python-sdks
python-sdks copied to clipboard
Allow async functions to be used as callbacks
Since our framework is async by default, we should give the option for users to pass in async functions in the callbacks. This will avoid awkward use of asyncio.create_task() inside the code.
How about this approach
- Storing all callbacks as async
- Wrap the non-async callback in an async callback
- Move asyncio.create_task() to call callbacks inside emit, keeping the emit api unchanged
import asyncio
import logging
from typing import Callable, Dict, Optional, Set, TypeVar, Awaitable, Union
logger = logging.getLogger(__name__)
T_contra = TypeVar("T_contra", contravariant=True)
class EventEmitter:
def __init__(self):
self._events: Dict[T_contra, Set[Callable[..., Awaitable[None]]]] = {}
def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
"""
Register a callback for an event. Synchronous callbacks are automatically wrapped as async.
"""
if callback is not None:
if not asyncio.iscoroutinefunction(callback):
original_callback = callback
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(original_callback, *args, **kwargs)
callback = wrapper # Replace with async wrapper
if event not in self._events:
self._events[event] = set()
self._events[event].add(callback)
return callback
else:
def decorator(callback: Callable) -> Callable:
return self.on(event, callback)
return decorator
def emit(self, event: T_contra, *args) -> None:
"""
Emit an event, ensuring all callbacks are awaited.
"""
for callback in self._events.get(event, set()):
try:
asyncio.create_task(callback(*args))
except Exception:
logger.exception(f"Failed to emit event {event}")
# Example Usage
emitter = EventEmitter()
@emitter.on("greet")
def greet(name):
print(f"Hello, {name}!")
@emitter.on("greet")
async def async_greet(name):
await asyncio.sleep(1)
print(f"Hello asynchronously, {name}!")
async def main():
emitter.emit("greet", "Charlie")
await asyncio.sleep(2) # Allow async functions to complete
asyncio.run(main())