CancellationToken doesn't stop run_stream() iteration when cancelled by signal handler
Summary
When using run_stream() with a CancellationToken that gets cancelled by a signal handler (e.g., SIGINT/Ctrl+C), the async iteration loop continues processing buffered messages instead of stopping immediately.
Environment
- AutoGen Version: 0.7.5
- Python Version: 3.13.9
- OS: macOS Darwin 25.0.0
Expected Behavior
After cancelling the CancellationToken (via signal handler or programmatically), the run_stream() async iteration should stop immediately.
Actual Behavior
The loop continues processing all buffered messages before checking cancellation status. The application appears to hang until all messages are drained from the queue.
Root Cause
In autogen_agentchat/teams/_group_chat/_base_group_chat.py, the run_stream() implementation has a while True: loop that never checks cancellation_token.is_cancelled() before yielding:
while True:  # ← Never checks token status
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    # Wait for the next message, this will raise an exception if the task is cancelled.
    message = await message_future  # ← Exception only raised HERE
    # ... process message ...
    yield message  # ← Messages yielded even if token was just cancelled
Problem: If the queue has buffered messages, they get yielded before the next await message_future can raise the cancellation exception.
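To make the draining behavior concrete without needing an API key, here is a minimal sketch of the same loop shape using only asyncio. FlagToken, unguarded_stream, and the None sentinel are hypothetical stand-ins introduced for illustration; the token is modelled as a bare flag, so this shows only the loop-structure problem and does not reproduce what link_future does inside AutoGen.

```python
import asyncio


class FlagToken:
    """Hypothetical stand-in for a cancellation token: nothing but a flag."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_cancelled(self) -> bool:
        return self._cancelled


async def unguarded_stream(queue: asyncio.Queue, token: FlagToken):
    """Same shape as the loop above: the token is never consulted before yielding."""
    while True:
        message = await queue.get()
        if message is None:  # sentinel standing in for GroupChatTermination
            break
        yield message


async def drain_demo() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)  # five messages are already buffered
    queue.put_nowait(None)

    token = FlagToken()
    seen = []
    async for message in unguarded_stream(queue, token):
        seen.append(message)
        if message == 1:
            token.cancel()  # cancellation is requested after the second message
    return seen


result = asyncio.run(drain_demo())
print(result)  # [0, 1, 2, 3, 4]
```

All five buffered messages are delivered even though the flag was set after the second one, which matches the behavior described above under these simplifying assumptions.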
Reproduction
import asyncio
import signal
import sys

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    agent = AssistantAgent("Assistant", model_client=model_client)
    team = RoundRobinGroupChat([agent], termination_condition=MaxMessageTermination(10))
    token = CancellationToken()

    def signal_handler(signum, frame):
        print("Cancelling token...")
        token.cancel()
        # Without raising KeyboardInterrupt, loop continues

    signal.signal(signal.SIGINT, signal_handler)
    print("Press Ctrl+C during streaming...")
    event_count = 0
    async for event in team.run_stream(task="Count 1 to 20", cancellation_token=token):
        event_count += 1
        print(f"Event {event_count}")
    print(f"Processed {event_count} events")  # Processes ALL events despite Ctrl+C


asyncio.run(main())
Result: After pressing Ctrl+C, all remaining events are still processed instead of stopping immediately.
Workaround
Raise KeyboardInterrupt in the signal handler to forcefully break the async iteration loop:
def signal_handler(signum, frame):
    token.cancel()
    raise KeyboardInterrupt()  # Force break
Side effect: This causes asyncio.CancelledError and ValueError: task_done() called too many times during cleanup (see issue #7007).
Suggested Fix
Check cancellation status before yielding in the loop:
while True:
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    message = await message_future
    # Add this check:
    if cancellation_token is not None and cancellation_token.is_cancelled():
        break
    if isinstance(message, GroupChatTermination):
        # ... existing code ...
        break
    yield message
Related Issues
- #4029 - Claimed to add cancellation token support, but only tested async cancellation (not SIGINT)
- #7007 - Reports task_done() called too many times error (symptom of cancellation bugs)
- #4776 - Mentions SequentialRoutedAgent should handle cancellation as event handler
Additional Context
The issue is particularly problematic for interactive CLI applications where users expect Ctrl+C to stop execution immediately. The current behavior makes applications appear unresponsive to user interruption.
Your own documentation acknowledges this limitation:
"Setting the cancellation token potentially put the team in an inconsistent state"
A proper cancellation mechanism should gracefully stop iteration without requiring KeyboardInterrupt.
Minimal Reproduction Script
#!/usr/bin/env python3
"""
Minimal reproduction for AutoGen bug: CancellationToken doesn't stop run_stream iteration

Issue: When CancellationToken is cancelled, the run_stream() loop continues processing
buffered messages instead of stopping immediately.

Expected: Loop stops when token is cancelled
Actual: Loop drains all buffered messages before checking cancellation
"""
import asyncio
import signal
import sys
from typing import Any

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    """Demonstrate that Ctrl+C (SIGINT) doesn't stop run_stream iteration."""
    # Setup
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(10)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    cancellation_token = CancellationToken()

    # Install signal handler that cancels token
    def signal_handler(signum: int, frame: Any) -> None:
        print("\n[SIGINT] Cancelling token...", file=sys.stderr, flush=True)
        cancellation_token.cancel()
        print(f"[SIGINT] Token cancelled: {cancellation_token.is_cancelled()}", file=sys.stderr, flush=True)
        # NOTE: Without raising KeyboardInterrupt, the loop continues!
        # Uncomment next line to see the fix:
        # raise KeyboardInterrupt()

    old_handler = signal.signal(signal.SIGINT, signal_handler)
    print("Starting workflow...", file=sys.stderr)
    print("Press Ctrl+C to test cancellation behavior", file=sys.stderr)
    print("-" * 60, file=sys.stderr)
    try:
        event_count = 0
        async for event in team.run_stream(
            task="Count from 1 to 20, one number per message",
            cancellation_token=cancellation_token,
        ):
            event_count += 1
            print(f"[Event {event_count}] {type(event).__name__}", file=sys.stderr, flush=True)
        print(f"\n[DONE] Processed {event_count} events", file=sys.stderr)
    except KeyboardInterrupt:
        print(f"\n[INTERRUPTED] Stopped at event {event_count}", file=sys.stderr)
    finally:
        signal.signal(signal.SIGINT, old_handler)


if __name__ == "__main__":
    print("\n" + "=" * 60)
    print("AutoGen Bug Reproduction: CancellationToken doesn't stop iteration")
    print("=" * 60)
    print()
    print("EXPECTED: After pressing Ctrl+C, loop stops immediately")
    print("ACTUAL: Loop continues processing all buffered events")
    print()
    print("Root cause: run_stream() has 'while True:' loop that never")
    print("checks cancellation_token.is_cancelled() before yielding")
    print()
    print("=" * 60 + "\n")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[EXIT] Application terminated by KeyboardInterrupt")
        sys.exit(130)
To run:
export OPENAI_API_KEY="your-key"
python repro.py
# Press Ctrl+C after a few events
# Observe that all remaining events are still processed
I analyzed this issue but couldn't identify the affected files. Could you please provide more details about which files need to be modified?
Hi! I'm interested in fixing this issue.
I've reviewed the detailed analysis and reproduction script provided by @alexey-pelykh. The root cause is clear: in _base_group_chat.py, around lines 544-557, the while True: loop in run_stream() doesn't check cancellation_token.is_cancelled() before yielding messages.
Current problematic code:
while True:
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    message = await message_future  # Exception only raised here
    if isinstance(message, GroupChatTermination):
        break
    yield message  # ← Yields even if token was cancelled
Proposed fix:
Add a cancellation check after await message_future and before yielding:
while True:
    message_future = asyncio.ensure_future(self._output_message_queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    message = await message_future
    # Add cancellation check here
    if cancellation_token is not None and cancellation_token.is_cancelled():
        break
    if isinstance(message, GroupChatTermination):
        break
    yield message
My plan:
- Implement the fix in _base_group_chat.py (add cancellation check before yield)
- Test with the provided reproduction script to verify it works
- Add unit tests to ensure cancellation works correctly with signal handlers
- Test on both Unix (SIGINT) and Windows (if applicable) platforms
- Ensure graceful cleanup without the task_done() errors mentioned in #7007
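As a rough idea of what the signal-handler unit test could look like, here is a self-contained sketch that does not touch AutoGen at all. FlagToken, guarded_stream, and the None sentinel are hypothetical stand-ins for the real token, the patched loop, and GroupChatTermination; the test raises a real SIGINT in-process with signal.raise_signal and assumes it runs on the main thread.

```python
import asyncio
import signal


class FlagToken:
    """Hypothetical stand-in for a cancellation token: nothing but a flag."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_cancelled(self) -> bool:
        return self._cancelled


async def guarded_stream(queue: asyncio.Queue, token: FlagToken):
    """Loop shape of the proposed fix: check the token before yielding."""
    while True:
        message = await queue.get()
        if token.is_cancelled():
            break
        if message is None:  # sentinel standing in for GroupChatTermination
            break
        yield message


async def run_case() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)  # buffered before cancellation is requested
    queue.put_nowait(None)
    token = FlagToken()

    # Install a handler that only cancels the token, as in the reproduction.
    previous = signal.signal(signal.SIGINT, lambda signum, frame: token.cancel())
    seen = []
    try:
        async for message in guarded_stream(queue, token):
            seen.append(message)
            if message == 2:
                signal.raise_signal(signal.SIGINT)  # simulate Ctrl+C
    finally:
        signal.signal(signal.SIGINT, previous)  # always restore the old handler
    return seen


def test_sigint_stops_iteration() -> None:
    # Messages 3 and 4 are still buffered but must not be delivered.
    assert asyncio.run(run_case()) == [0, 1, 2]


test_sigint_stops_iteration()
```

A real test would drive run_stream() itself, but the structure (install handler, raise the signal mid-stream, assert on what was delivered, restore the handler) should carry over.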
I'm ready to start working on this. The fix seems straightforward, and I'll make sure to test it thoroughly with the reproduction case provided.
Thanks!