python-sdks icon indicating copy to clipboard operation
python-sdks copied to clipboard

Allow async functions to be used as callbacks

Open davidzhao opened this issue 1 year ago • 1 comments

Since our framework is async by default, we should give the option for users to pass in async functions in the callbacks. This will avoid awkward use of asyncio.create_task() inside the code.

davidzhao avatar Nov 26 '24 07:11 davidzhao

How about this approach

  • Storing all callbacks as async
  • Wrap the non-async callback in an async callback
  • Move asyncio.create_task() to call callbacks inside emit, keeping the emit api unchanged

import asyncio
import logging
from typing import Callable, Dict, Optional, Set, TypeVar, Awaitable, Union

logger = logging.getLogger(__name__)

T_contra = TypeVar("T_contra", contravariant=True)

class EventEmitter:
    def __init__(self):
        self._events: Dict[T_contra, Set[Callable[..., Awaitable[None]]]] = {}

    def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback for an event. Synchronous callbacks are automatically wrapped as async.
        """
        if callback is not None:
            if not asyncio.iscoroutinefunction(callback):
                original_callback = callback
                async def wrapper(*args, **kwargs):
                    return await asyncio.to_thread(original_callback, *args, **kwargs)
                callback = wrapper  # Replace with async wrapper

            if event not in self._events:
                self._events[event] = set()
            self._events[event].add(callback)
            return callback
        else:
            def decorator(callback: Callable) -> Callable:
                return self.on(event, callback)

            return decorator

    def emit(self, event: T_contra, *args) -> None:
        """
        Emit an event, ensuring all callbacks are awaited.
        """
        for callback in self._events.get(event, set()):
            try:
                asyncio.create_task(callback(*args))
            except Exception:
                logger.exception(f"Failed to emit event {event}")

# Example Usage
emitter = EventEmitter()

@emitter.on("greet")
def greet(name):
    print(f"Hello, {name}!")

@emitter.on("greet")
async def async_greet(name):
    await asyncio.sleep(1)
    print(f"Hello asynchronously, {name}!")

async def main():
    emitter.emit("greet", "Charlie")
    await asyncio.sleep(2)  # Allow async functions to complete

asyncio.run(main())

notauserx avatar Feb 07 '25 07:02 notauserx