
Feature Request: Add non-gather mode for AsyncMachine callbacks

Open JIAQIA opened this issue 4 months ago • 6 comments


Problem Description

Currently, AsyncMachine executes callbacks using asyncio.gather(), which creates a potential issue when managing resource lifecycles across different state transitions.

The problem manifests when:

  1. Opening resources in one transition callback (e.g., on_enter_connect)
  2. Closing those same resources in another transition callback (e.g., on_enter_disconnect)

Because asyncio.gather() runs each callback in a separate task, this creates a task-boundary violation for context managers that must be entered and exited in the same task (e.g., anyio cancel scopes, contexts held in a contextlib.AsyncExitStack, etc.).
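For illustration, here is a minimal sketch of the failure mode (the callback names are hypothetical, and the exact error message depends on the anyio version): a cancel scope entered in one gather-spawned task cannot be exited from another.

import asyncio
import anyio

async def main() -> None:
    scope = anyio.CancelScope()

    async def on_enter_connect():
        scope.__enter__()  # the scope is now bound to this task

    async def on_enter_disconnect():
        scope.__exit__(None, None, None)  # different task -> RuntimeError

    # Mimics AsyncMachine dispatching each callback in its own task:
    await asyncio.gather(on_enter_connect())
    await asyncio.gather(on_enter_disconnect())

asyncio.run(main())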

Current Workaround

I've implemented a temporary solution by subclassing AsyncMachine:

from collections.abc import Callable

from transitions.extensions.asyncio import AsyncMachine

class A2CAsyncMachine(AsyncMachine):
    @staticmethod
    async def await_all(callables: list[Callable]) -> list:
        # Run callbacks one after another in the current task instead of
        # spawning one task per callback via asyncio.gather().
        ret = []
        for c in callables:
            ret.append(await c())
        return ret

This works but isn't ideal for long-term maintenance.

Proposed Solution

Add a configuration flag to control callback execution mode:

machine = AsyncMachine(..., gather_callbacks=False)  # Default True for backward compatibility

When False, callbacks would execute sequentially in the same task rather than via asyncio.gather().
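A minimal sketch of how this could look (the flag name and the override are hypothetical, assuming callback dispatch keeps going through await_all on the machine instance, as the workaround above relies on):

import asyncio
from collections.abc import Callable

from transitions.extensions.asyncio import AsyncMachine

class GatherConfigurableMachine(AsyncMachine):
    """Hypothetical sketch of the proposed gather_callbacks flag."""

    def __init__(self, *args, gather_callbacks: bool = True, **kwargs):
        self.gather_callbacks = gather_callbacks  # set before super() runs initial transitions
        super().__init__(*args, **kwargs)

    async def await_all(self, callables: list[Callable]) -> list:
        if self.gather_callbacks:
            # Current behavior: one task per callback, run concurrently.
            return await asyncio.gather(*[func() for func in callables])
        # Proposed opt-in: run callbacks one by one in the calling task.
        return [await func() for func in callables]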

Benefits

  1. Maintains backward compatibility
  2. Gives users control over execution model
  3. Solves task-boundary issues for context managers
  4. Still allows parallel execution when desired (default mode)

Potential Considerations

  1. Performance impact for users who don't need sequential execution
  2. Need to verify no other parts of the code assume gather behavior
  3. Documentation updates to explain the tradeoffs

Environment

  • OS: macOS Ventura
  • Python: 3.11
  • Library: transitions (latest main branch)

Would appreciate maintainers' thoughts on this approach. Happy to submit a PR if this direction is approved.

JIAQIA avatar Aug 19 '25 10:08 JIAQIA

You can't call context managers' __aenter__ and __aexit__ methods from callbacks and expect that to be stable in any form, separate tasks or not. While __enter__ and __exit__ are somewhat less critical in this regard, it's still possible to royally confuse an ExitStack, particularly when handling nontrivial exceptions.

The correct way to do this is to create a helper task that enters and exits the context on your behalf.

Something like this:

import anyio
from concurrent.futures import CancelledError  # intentionally not asyncio/anyio's cancellation exception
from contextlib import asynccontextmanager
from typing import Literal
from collections.abc import AsyncIterator

class ContextMgr[CtxType]:  # PEP 695 generics; requires Python 3.12+
    """
    This class manages a context for the caller.

    Useful when entering/leaving the context is triggered from state
    machine callbacks or similar event handlers.
    """
    @asynccontextmanager
    async def context(self, *args, **kwargs) -> AsyncIterator[CtxType]:
        raise NotImplementedError("Override me!")

    exc: Exception | None = None
    ctx: CtxType | Literal[False] | None = None
    stopper: anyio.Event | None = None
    stopped: anyio.Event | None = None

    def __init__(self):
        self.qw,self.qr = anyio.create_memory_object_stream(0)

    async def task(self):
        """
        The task that encapsulates the context handler.

        Start this once, when setting up your state machine.
        """
        async with self.qr:
            async for evt,args,kwargs in self.qr:
                self.stopper = anyio.Event()
                self.stopped = anyio.Event()
                try:
                    async with self.context(*args,**kwargs) as self.ctx:
                        evt.set()
                        evt = self.stopped
                        await self.stopper.wait()
                        if self.exc is not None:
                            raise self.exc
                except Exception as exc:
                    self.exc = exc
                except BaseException:
                    self.exc = CancelledError()
                    raise
                finally:
                    evt.set()
                    self.ctx = None

    def close(self):
        """
        Ends the context task.
        """
        self.qw.close()
        if self.stopper is not None:
            self.exc = CancelledError()
            self.stopper.set()

    async def start(self, *args, **kwargs) -> CtxType:
        """
        Creates and starts your context, passing the given arguments.

        Raises `RuntimeError` if the context is already open
        (or starting in a different task).
        """
        if self.ctx is not None:
            raise RuntimeError("Context already entered")
        self.ctx = False
        evt=anyio.Event()
        await self.qw.send((evt,args,kwargs))
        await evt.wait()
        if self.exc is not None:
            exc,self.exc = self.exc,None
            raise exc
        return self.ctx

    async def stop(self, exc:Exception|None=None) -> None:
        """
        Stops your context.

        If @exc is set, it is passed into / raised in the context.

        This method waits until the context ends.
        """
        if self.ctx in (None,False):
            raise RuntimeError("Context not entered")
        if exc is not None:
            self.exc = exc
        self.stopper.set()
        await self.stopped.wait()
        if self.exc is None:
            return
        if self.exc is exc:
            self.exc = None
        else:
            exc,self.exc = self.exc,None
            raise exc

Usage:

class YourStuff:
    async def run(self):
        async with anyio.create_task_group() as tg:
            self.ctxm = ContextMgr()
            tg.start_soon(self.ctxm.task)

            ...  
            # set up and run your model here
            # hook up self.ctxm.start / stop to start/stop the context

            # Your context object is available in self.ctxm.ctx.

I didn't test this (yet – I plan to add this to my moat.util.ctx codebase).

smurfix avatar Aug 30 '25 09:08 smurfix

Thank you for the response. I've been using the transitions library for many years, and I truly appreciate all the effort and maintenance you've put into it.

However, I believe we still need to further discuss this issue. Let me share some background context first:

I encountered this problem while developing an MCPServer manager. Since the MCP Python SDK doesn't plan to release a synchronous API version in the short term (https://github.com/modelcontextprotocol/python-sdk/issues/309#issuecomment-3178881557), everyone must continue using the asyncio version. Additionally, the client and session management provided by the MCP Python SDK are based on context managers.

My team has been working on developing an Agent framework for two years, and one key insight we've gained is that many lifecycle management tasks in LLM and Agent engineering systems are better handled using finite state machines - something we feel the industry hasn't fully recognized yet.

During this work, I came across the mcp-use project. Interestingly, their current version hasn't adopted transitions yet, though I believe they could benefit from doing so later. I share this background for two reasons:

  1. transitions will play a significantly larger role in the AI Agent era (my team members, who are contributors to LangChain, UnstructuredIO, LlamaIndex, and other prominent AI Agent libraries, all agree with this assessment)
  2. The sample code you provided is very similar to mcp-use's implementation, which makes me think you might also see potential opportunities in this direction.

Now, back to the issue I raised:

The ContextMgr solution and my feature request address two separate needs. To explain why:

  1. I proposed this feature specifically because I encountered an error when trying to open a context manager in one event loop and close it in another - an issue directly caused by AsyncMachine's use of await_all
  2. After encountering this, I implemented a patch, submitted this feature request, and continued development
  3. I did experience the ExitStack confusion issue you mentioned, and I solved it with a solution similar to yours
  4. Now we're continuing this discussion after your response

Regarding point 3, here's my solution implementation:

# Assumed imports for this excerpt; STATES, TRANSITIONS, logger and
# A2CAsyncMachine are defined elsewhere in the module.
import asyncio
from abc import ABC
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.stdio import stdio_client
from pydantic import BaseModel
from transitions.core import EventData

class BaseMCPClient(ABC):
    def __init__(self, params: BaseModel, state_change_callback: Callable[[str, str], None | Awaitable[None]] | None = None) -> None:
        """
        Base-class initialization.

        Attributes:
            params (BaseModel): launch parameters for the MCP server
            state_change_callback (Callable[[str, str], None | Awaitable[None]]): state-change callback, sync or async
        """
        self.params = params
        self._state_change_callback = state_change_callback
        self._aexit_stack = AsyncExitStack()
        self._async_session: ClientSession | None = None
        self._session_keep_alive_task: asyncio.Task | None = None
        self._create_session_success_event = asyncio.Event()
        self._create_session_failure_event = asyncio.Event()
        self._async_session_closed_event = asyncio.Event()

        # Set up the async state machine
        self.machine = A2CAsyncMachine(
            model=self,
            states=STATES,
            transitions=TRANSITIONS,
            initial=STATES.initialized,
            send_event=True,  # pass the event object to callbacks
            auto_transitions=False,  # disable auto-generated transitions
            ignore_invalid_triggers=False,  # raise on invalid triggers
        )

    async def _create_async_session(self) -> ClientSession:
        """
        Create the async session.

        Returns:
            ClientSession: the async session
        """
        stdout, stdin = await self._aexit_stack.enter_async_context(stdio_client(self.params))
        client_session = await self._aexit_stack.enter_async_context(ClientSession(stdout, stdin))
        return client_session


    async def _keep_alive_task(self) -> None:
        """
        Keep async_session alive so that other code can keep using it.

        In the MCP SDK's design, both the *_client helpers and ClientSession use
        anyio task groups to manage their subtasks. This creates a maintenance
        problem: the manager has to handle multiple clients, and if a client's
        AsyncSession is opened inside an anyio task group, sessions must be
        closed in strict reverse order of opening, or anyio raises an error.
        Because of that anyio behavior, each ClientSession runs in its own
        dedicated asyncio task, so the context is opened and owned by that
        inner task and can be closed independently. The manager can then start
        and stop clients independently.

        This implementation does the following:

        1. Creates self._async_session
        2. Pushes the contexts that must stay open onto self._aexit_stack
        3. Keeps the contexts alive via asyncio.Event().wait(), and completes
           the shutdown in response to self._session_keep_alive_task.done()
        4. On the shutdown signal, closes the contexts held in self._aexit_stack
        """
        logger.debug(f"Session keep-alive task: {asyncio.current_task().get_name()}")
        try:
            # Create the async session and push its contexts onto the stack
            self._async_session = await self._create_async_session()
            # Signal that creation succeeded
            self._create_session_success_event.set()
            # Wait indefinitely for the shutdown signal
            try:
                # Block until this task is cancelled
                await asyncio.Event().wait()
            except asyncio.CancelledError:
                # Task was cancelled; proceed to close the contexts
                logger.debug(f"Session keep-alive task cancelled: {asyncio.current_task().get_name()}")
        except Exception as e:
            logger.error(f"Session keep-alive task error: {asyncio.current_task().get_name()}: {e}")
            self._create_session_failure_event.set()
            await self.aerror()

        finally:
            # Close the contexts
            await self._aexit_stack.aclose()
            # Clear the session
            self._async_session = None
            self._async_session_closed_event.set()

    async def on_enter_connected(self, event: EventData) -> None:
        """Called on entering the connected state (overridable)."""
        logger.debug(f"Entering connected state with event: {event}\n\nserver params: {self.params}")
        self._session_keep_alive_task = asyncio.create_task(self._keep_alive_task())
        # Wait until the session has been created
        await self._create_session_success_event.wait()
        # Initialize the client session
        await (await self.async_session).initialize()

    async def on_enter_disconnected(self, event: EventData) -> None:
        """Called when the state machine enters the disconnected state (overridable)."""
        logger.debug(f"Entering disconnected state with event: {event}\n\nserver params: {self.params}")
        # Close the async session to make sure its resources are released properly
        logger.debug(f"Enter disconnected state async task: {asyncio.current_task().get_name()}")
        await self._close_task()
        # Wait for the session to close
        await self._async_session_closed_event.wait()

    async def _close_task(self) -> None:
        """
        Shut down the keep-alive task.
        """
        # Cancel the keep-alive task if it is still running
        if self._session_keep_alive_task and not self._session_keep_alive_task.done():
            self._session_keep_alive_task.cancel()

            # Wait for _session_keep_alive_task to finish
            try:
                await self._session_keep_alive_task
            except asyncio.CancelledError:
                logger.debug("Session keep-alive task was cancelled")
            except Exception as e:
                logger.error(f"Session keep-alive task failed: {e}")

Your approach is similar to mcp-use's - I used AsyncExitStack instead (personally, I feel it reduces encapsulation complexity and some maintenance burden, but this is just an implementation detail).


I want to clarify that I'm not suggesting we focus on ContextMgr further. As you rightly pointed out, it's essential for complex async context management scenarios. What I want to emphasize is that in transitions internals, using await_all with asyncio.gather to process callbacks essentially splits the user's event loop.

This event loop fragmentation might cause issues beyond just context management problems. This is why I feel strongly about continuing this discussion - in the emerging AI Agent landscape, this library has tremendous potential. We should proactively consider and address any potentially confusing design patterns.

Therefore, I still recommend (as a long-time user) that we carefully reconsider the "fragmentation" of coroutines in AsyncMachine. While we can leverage coroutines for efficiency gains, we must:

  • Ensure users clearly understand what's happening
  • Give them the choice to use a "simpler approach" (sequential execution) for more predictable and controllable execution sequences

Thank you again for all your contributions over the years. I'd be happy to help contribute in any way I can to improve the library.


I have implemented all the provided code at: https://github.com/A2C-SMCP (This library is currently being open-sourced but hasn't been promoted yet)

JIAQIA avatar Sep 01 '25 02:09 JIAQIA

Hi,

using await_all with asyncio.gather to process callbacks essentially splits the user's event loop.

I don't understand your problem here. asyncio.gather doesn't split anything. It merely waits until all subtasks have completed. Your band-aid fix (a simple loop over the subtasks) does the same thing, just sequentially.

In both cases unrelated async events (e.g. a new sensor value or a message from some remote system or whatever) might happen, requiring some additional state management by the user. (Like, "queue this event, but only if the state machine is not in "running" state or still does some post processing"). You'd add a postprocessing step to your transition.after handler that resumes processing them.
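For illustration, a rough sketch of that queue-and-resume idea (the model and its names are hypothetical; after_state_change is an existing Machine keyword):

import asyncio

class QueueingModel:
    """Hypothetical model that defers external events while a transition runs."""

    def __init__(self):
        self._pending: asyncio.Queue = asyncio.Queue()
        self._busy = False  # set True while a transition is in flight

    async def on_external_event(self, payload):
        if self._busy:
            await self._pending.put(payload)  # park the event for later
        else:
            await self._handle(payload)

    async def resume_pending(self, event_data):
        # Hooked up e.g. via AsyncMachine(..., after_state_change="resume_pending")
        self._busy = False
        while not self._pending.empty():
            await self._handle(self._pending.get_nowait())

    async def _handle(self, payload):
        ...  # actual event processing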

NB: All of this works way better when you use anyio instead of asyncio. I'm currently preparing a PR for that.

smurfix avatar Sep 01 '25 07:09 smurfix

Sorry.

My previous statement was inaccurate. It's not about splitting the event loop, but rather about spawning separate coroutines. Each function is invoked within a new, independent coroutine when using asyncio.gather.

This leads to an issue where resources opened by the user in the current event loop may throw exceptions if one of the func() calls inside return await asyncio.gather(*[func() for func in callables]) attempts to close them.

Similarly, in our implementation of the state machine, when handling await_all, we don't create multiple threads for processing but instead handle the tasks sequentially. Although using asyncio.gather here improves efficiency, it also introduces potential risks.


asyncio.gather executes the callbacks concurrently in separate tasks (coroutines).

Your statement that "In both cases unrelated async events might happen" misses the critical difference:

  • With gather, the callbacks themselves become "unrelated async events" to each other
  • State machine transitions inherently require atomic callback processing – concurrency breaks this guarantee

This is not theoretical – it's precisely why Python's asyncio has safeguards like BoundedSemaphore for shared resources. transitions shouldn't create these hazards at the framework level.

JIAQIA avatar Sep 01 '25 07:09 JIAQIA

State machine transitions inherently require atomic callback processing – concurrency breaks this guarantee

Running any async code at all, as a state machine's callback, breaks this guarantee. Running your async callbacks serially instead of in parallel will not change that.
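To see why, a minimal sketch: even a purely sequential async callback suspends at every await, so any other scheduled task can interleave mid-transition.

import asyncio

async def on_enter_state():
    # Any await - even sleep(0) - yields control back to the event loop.
    await asyncio.sleep(0)

async def unrelated_event():
    print("ran in the middle of the 'atomic' transition")

async def main():
    asyncio.create_task(unrelated_event())
    await on_enter_state()  # sequential, but still not atomic

asyncio.run(main())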

smurfix avatar Sep 01 '25 08:09 smurfix

I think you should copy the history messages on this page (or the url of this page), then pass them to ChatGPT, and let it provide some explanations.

JIAQIA avatar Sep 01 '25 08:09 JIAQIA