
CancellationToken doesn't stop run_stream() iteration when cancelled by signal handler

Open alexey-pelykh opened this issue 2 months ago • 2 comments

CancellationToken doesn't stop run_stream() iteration when signal handler cancels token

Summary

When using run_stream() with a CancellationToken that gets cancelled by a signal handler (e.g., SIGINT/Ctrl+C), the async iteration loop continues processing buffered messages instead of stopping immediately.

Environment

  • AutoGen Version: 0.7.5
  • Python Version: 3.13.9
  • OS: macOS Darwin 25.0.0

Expected Behavior

After cancelling the CancellationToken (via signal handler or programmatically), the run_stream() async iteration should stop immediately.

Actual Behavior

The loop continues processing all buffered messages before checking cancellation status. The application appears to hang until all messages are drained from the queue.

Root Cause

In autogen_agentchat/teams/_group_chat/_base_group_chat.py, the run_stream() implementation has a while True: loop that never checks cancellation_token.is_cancelled() before yielding:

while True:  # ← Never checks token status
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    # Wait for the next message, this will raise an exception if the task is cancelled.
    message = await message_future  # ← Exception only raised HERE
    # ... process message ...
    yield message  # ← Messages yielded even if token was just cancelled

Problem: If the queue has buffered messages, they get yielded before the next await message_future can raise the cancellation exception.
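The reason the linked future does not help once a message has been retrieved can be shown with plain asyncio. No AutoGen is involved here; this is an illustration of the mechanism, not the library's code. A finished future ignores cancel(), so a message that queue.get() has already returned still reaches the yield.

```python
import asyncio


async def cancel_after_get() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("buffered message")  # an item is already waiting

    # Same shape as the run_stream() loop: wrap queue.get() in a future.
    message_future = asyncio.ensure_future(queue.get())
    message = await message_future  # completes at once because the item is buffered

    # Simulate the token cancelling the linked future *after* it has finished.
    was_cancelled = message_future.cancel()  # False: a done future cannot be cancelled
    return message, was_cancelled, message_future.cancelled()


if __name__ == "__main__":
    print(asyncio.run(cancel_after_get()))  # ('buffered message', False, False)
```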

Reproduction

import asyncio
import signal
import sys
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    agent = AssistantAgent("Assistant", model_client=model_client)
    team = RoundRobinGroupChat([agent], termination_condition=MaxMessageTermination(10))
    token = CancellationToken()

    def signal_handler(signum, frame):
        print("Cancelling token...")
        token.cancel()
        # Without raising KeyboardInterrupt, loop continues

    signal.signal(signal.SIGINT, signal_handler)

    print("Press Ctrl+C during streaming...")
    event_count = 0
    async for event in team.run_stream(task="Count 1 to 20", cancellation_token=token):
        event_count += 1
        print(f"Event {event_count}")

    print(f"Processed {event_count} events")  # Processes ALL events despite Ctrl+C

asyncio.run(main())

Result: After pressing Ctrl+C, all remaining events are still processed instead of stopping immediately.

Workaround

Raise KeyboardInterrupt in the signal handler to forcefully break the async iteration loop:

def signal_handler(signum, frame):
    token.cancel()
    raise KeyboardInterrupt()  # Force break

Side effect: This causes asyncio.CancelledError and ValueError: task_done() called too many times during cleanup (see issue #7007).
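Another option, not part of the original report and offered only as something to try: register the handler with asyncio's loop.add_signal_handler() instead of signal.signal(). asyncio then runs the callback as an ordinary event-loop callback, waking the loop if it is blocked waiting for I/O, so token.cancel() runs on the loop rather than at an arbitrary point inside a coroutine. This is Unix-only (Windows event loops raise NotImplementedError), and whether it alone changes the behavior reported here has not been verified. In the sketch, SimpleToken and fake_stream() are hypothetical stand-ins for CancellationToken and team.run_stream(); it sends SIGUSR1 to itself so it can run unattended. Use signal.SIGINT for Ctrl+C.

```python
import asyncio
import os
import signal
from typing import AsyncGenerator, List


class SimpleToken:
    """Hypothetical flag-only stand-in for CancellationToken."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_cancelled(self) -> bool:
        return self._cancelled


async def fake_stream(n: int) -> AsyncGenerator[int, None]:
    """Hypothetical stand-in for team.run_stream(): one event every 20 ms."""
    for i in range(n):
        await asyncio.sleep(0.02)
        yield i


async def main(sig: int = signal.SIGUSR1) -> List[int]:
    token = SimpleToken()
    loop = asyncio.get_running_loop()
    # token.cancel() runs as a normal event-loop callback (Unix only).
    loop.add_signal_handler(sig, token.cancel)
    # Simulate the user pressing Ctrl+C about 70 ms into the stream.
    loop.call_later(0.07, os.kill, os.getpid(), sig)
    events: List[int] = []
    try:
        async for event in fake_stream(20):
            if token.is_cancelled():
                break  # stop consuming as soon as the token is set
            events.append(event)
    finally:
        loop.remove_signal_handler(sig)
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```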

Suggested Fix

Check cancellation status before yielding in the loop:

while True:
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    message = await message_future

    # Add this check:
    if cancellation_token is not None and cancellation_token.is_cancelled():
        break

    if isinstance(message, GroupChatTermination):
        # ... existing code ...
        break
    yield message
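A standalone sketch of the suggested fix, runnable without AutoGen. StandInToken, TERMINATION and this run_stream() are hypothetical stand-ins modeled on the names in the snippet above, not the library's code; the stand-in token is assumed to cancel every linked future when cancel() is called. The sketch also checks the token at the top of the loop, which goes beyond the proposal: without that check, a cancel issued while the consumer is handling a message would make the stand-in token cancel the next future immediately, and the consumer would see asyncio.CancelledError instead of a clean end of iteration.

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, List


class StandInToken:
    """Hypothetical stand-in for CancellationToken (not the library's code)."""

    def __init__(self) -> None:
        self._cancelled = False
        self._callbacks: List[Callable[[], Any]] = []

    def cancel(self) -> None:
        if not self._cancelled:
            self._cancelled = True
            for callback in self._callbacks:
                callback()  # cancels each linked future (no-op if already done)

    def is_cancelled(self) -> bool:
        return self._cancelled

    def link_future(self, future: "asyncio.Future[Any]") -> "asyncio.Future[Any]":
        if self._cancelled:
            future.cancel()
        else:
            self._callbacks.append(future.cancel)
        return future


TERMINATION = object()  # hypothetical sentinel standing in for GroupChatTermination


async def run_stream(queue: "asyncio.Queue[Any]", token: StandInToken) -> AsyncGenerator[Any, None]:
    while True:
        # Sketch-only addition: do not link a new future once cancelled.
        if token.is_cancelled():
            break
        message_future = asyncio.ensure_future(queue.get())
        token.link_future(message_future)
        message = await message_future
        # The check proposed in the issue: drop a message that arrived
        # while the token was being cancelled instead of yielding it.
        if token.is_cancelled():
            break
        if message is TERMINATION:
            break
        yield message


async def consume() -> List[Any]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)  # five buffered messages
    queue.put_nowait(TERMINATION)
    token = StandInToken()
    received: List[Any] = []
    async for message in run_stream(queue, token):
        received.append(message)
        if len(received) == 2:
            token.cancel()  # e.g. triggered by a Ctrl+C handler
    return received


if __name__ == "__main__":
    print(asyncio.run(consume()))  # [0, 1]
```

With five messages buffered and the token cancelled after the second, iteration ends with [0, 1] and no exception; the remaining buffered messages are never yielded.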

Related Issues

  • #4029 - Claimed to add cancellation token support, but only tested async cancellation (not SIGINT)
  • #7007 - Reports task_done() called too many times error (symptom of cancellation bugs)
  • #4776 - Mentions that SequentialRoutedAgent should handle cancellation as an event handler

Additional Context

The issue is particularly problematic for interactive CLI applications where users expect Ctrl+C to stop execution immediately. The current behavior makes applications appear unresponsive to user interruption.

Your own documentation acknowledges this limitation:

"Setting the cancellation token potentially put the team in an inconsistent state"

A proper cancellation mechanism should gracefully stop iteration without requiring KeyboardInterrupt.

alexey-pelykh, Oct 29 '25

Minimal Reproduction Script

#!/usr/bin/env python3
"""
Minimal reproduction for AutoGen bug: CancellationToken doesn't stop run_stream iteration

Issue: When CancellationToken is cancelled, the run_stream() loop continues processing
buffered messages instead of stopping immediately.

Expected: Loop stops when token is cancelled
Actual: Loop drains all buffered messages before checking cancellation
"""

import asyncio
import signal
import sys
from typing import Any

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    """Demonstrate that Ctrl+C (SIGINT) doesn't stop run_stream iteration."""

    # Setup
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(10)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Install signal handler that cancels token
    def signal_handler(signum: int, frame: Any) -> None:
        print("\n[SIGINT] Cancelling token...", file=sys.stderr, flush=True)
        cancellation_token.cancel()
        print(f"[SIGINT] Token cancelled: {cancellation_token.is_cancelled()}", file=sys.stderr, flush=True)
        # NOTE: Without raising KeyboardInterrupt, the loop continues!
        # Uncomment the next line to apply the KeyboardInterrupt workaround:
        # raise KeyboardInterrupt()

    old_handler = signal.signal(signal.SIGINT, signal_handler)

    print("Starting workflow...", file=sys.stderr)
    print("Press Ctrl+C to test cancellation behavior", file=sys.stderr)
    print("-" * 60, file=sys.stderr)

    try:
        event_count = 0
        async for event in team.run_stream(
            task="Count from 1 to 20, one number per message",
            cancellation_token=cancellation_token,
        ):
            event_count += 1
            print(f"[Event {event_count}] {type(event).__name__}", file=sys.stderr, flush=True)

        print(f"\n[DONE] Processed {event_count} events", file=sys.stderr)

    except KeyboardInterrupt:
        print(f"\n[INTERRUPTED] Stopped at event {event_count}", file=sys.stderr)
    finally:
        signal.signal(signal.SIGINT, old_handler)


if __name__ == "__main__":
    print("\n" + "="*60)
    print("AutoGen Bug Reproduction: CancellationToken doesn't stop iteration")
    print("="*60)
    print()
    print("EXPECTED: After pressing Ctrl+C, loop stops immediately")
    print("ACTUAL: Loop continues processing all buffered events")
    print()
    print("Root cause: run_stream() has 'while True:' loop that never")
    print("checks cancellation_token.is_cancelled() before yielding")
    print()
    print("="*60 + "\n")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[EXIT] Application terminated by KeyboardInterrupt")
        sys.exit(130)

To run:

export OPENAI_API_KEY="your-key"
python repro.py
# Press Ctrl+C after a few events
# Observe that all remaining events are still processed

alexey-pelykh, Oct 29 '25

I analyzed this issue but couldn't identify the affected files. Could you please provide more details about which files need to be modified?

priteesshhh, Nov 09 '25

Hi! I'm interested in fixing this issue.

I've reviewed the detailed analysis and reproduction script provided by @alexey-pelykh. The root cause is clear: in _base_group_chat.py, around lines 544-557, the while True: loop in run_stream() doesn't check cancellation_token.is_cancelled() before yielding messages.

Current problematic code:

while True:
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    message = await message_future  # Exception only raised here
    if isinstance(message, GroupChatTermination):
        break
    yield message  # ← Yields even if token was cancelled

Proposed fix: Add a cancellation check after await message_future and before yielding:

while True:
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    message = await message_future
    
    # Add cancellation check here
    if cancellation_token is not None and cancellation_token.is_cancelled():
        break
    
    if isinstance(message, GroupChatTermination):
        break
    yield message

My plan:

  1. Implement the fix in _base_group_chat.py (add cancellation check before yield)
  2. Test with the provided reproduction script to verify it works
  3. Add unit tests to ensure cancellation works correctly with signal handlers
  4. Test on both Unix (SIGINT) and Windows (if applicable) platforms
  5. Ensure graceful cleanup without the task_done() errors mentioned in #7007

I'm ready to start working on this. The fix seems straightforward, and I'll make sure to test it thoroughly with the reproduction case provided.

Thanks!

Shizoqua, Nov 22 '25