GraphFlow State Persistence Bug: Workflow Gets Stuck After Interruption During Agent Transitions
What happened?
Summary
GraphFlow workflows become unrecoverable when interrupted during agent transitions, leading to a corrupted state where no agents are ready to execute despite having remaining work.
Environment
- Framework: `autogen-agentchat` with GraphFlow
- Python: 3.11
- Issue Type: State persistence & resume functionality
Problem Description
When a GraphFlow workflow is interrupted (e.g., via KeyboardInterrupt) during the transition between agents, the saved state becomes corrupted.
On resume, the workflow terminates immediately with:
Digraph execution is complete
even though agents still have remaining work.
Steps to Reproduce
1. Create a GraphFlow with multiple agents in sequence.
2. Start the workflow and let the first agent complete successfully.
3. Interrupt the process (Ctrl+C) during the transition to the next agent.
4. Attempt to resume using:

   team.load_state(saved_state)
   team.run_stream(task="continue")
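For context, here is a minimal sketch of the setup and resume path. The agent definitions, model client, task strings, and graph shape are placeholders; it assumes the `DiGraphBuilder`/`GraphFlow` API from `autogen-agentchat`, with `save_state`/`load_state`/`run_stream` awaited, and it does not show how the checkpoint is captured around the Ctrl+C itself.

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


def build_team() -> GraphFlow:
    client = OpenAIChatCompletionClient(model="gpt-4o")  # placeholder model client
    writer = AssistantAgent("writer", model_client=client)
    reviewer = AssistantAgent("reviewer", model_client=client)

    # Step 1: sequential graph, writer -> reviewer
    builder = DiGraphBuilder()
    builder.add_node(writer)
    builder.add_node(reviewer)
    builder.add_edge(writer, reviewer)
    return GraphFlow(participants=[writer, reviewer], graph=builder.build())


async def main() -> None:
    team = build_team()

    # Steps 2-3: run the flow; the interruption (Ctrl+C) happens while control
    # is passing from writer to reviewer, after which the state is persisted.
    async for event in team.run_stream(task="Draft a short note"):
        print(type(event).__name__)
    saved_state = await team.save_state()

    # Step 4: load the checkpoint into a fresh team and try to continue.
    resumed_team = build_team()
    await resumed_team.load_state(saved_state)
    async for event in resumed_team.run_stream(task="continue"):
        print(type(event).__name__)
    # Observed: the resumed stream ends immediately with
    # "Digraph execution is complete" and no agent executes.


asyncio.run(main())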
Expected Behavior
- Workflow should resume from the next agent in the sequence and continue execution seamlessly.
Actual Behavior
- Workflow immediately terminates with "Digraph execution is complete".
- All agents receive the "continue" message but none execute.
- The ready queue is empty despite remaining work.
State File Analysis
The corrupted state shows:
{
  "GraphManager": {
    "remaining": {
      "next_agent": { "next_agent": 1 },
      // ... other agents with work remaining
    },
    "enqueued_any": {
      "next_agent": { "next_agent": false },
      // ... all agents show false
    },
    "ready": []
  }
}
- First agent completed → properly recorded in message history
- Next agent not enqueued → transition interrupted before coordination
- Workflow coordination lost → no agents ready despite remaining work
Root Cause
The GraphFlow coordination mechanism is interrupted before it can enqueue the next agent, leaving the system in an inconsistent state:
- Remaining work exists
- No agents are enqueued
- The workflow appears "complete" but is actually stuck
Which packages was the bug in?
Python Core (autogen-core)
AutoGen library version.
Python dev (main branch)
Other library version.
No response
Model used
No response
Model provider
None
Other model provider
No response
Python version
3.11
.NET version
None
Operating system
MacOS
@ekzhu - It's not just during agent transitions - the GraphFlow save/resume mechanism itself seems broken. Even without interrupting between agents, the saved state can't be resumed properly. After loading state, the workflow often reports "Digraph execution is complete" immediately, with no agents enqueued, despite there still being remaining work.
GraphFlow State Persistence Bug - Comprehensive Analysis & Solution
Hi @droideronline and @ekzhu,
This is a critical issue affecting production GraphFlow deployments. The problem stems from non-atomic updates to the GraphFlow coordination state: an interruption can leave that state internally inconsistent. I've analyzed the core issue and can provide both immediate workarounds and a proper fix.
Root Cause Analysis
State Corruption Mechanism
The corruption happens because GraphFlow's state persistence operates with non-atomic state transitions:
- Agent completion → Recorded in message history
- Graph state update → Partially completed during interruption
- Next agent enqueue → Never executed due to interruption
- Coordination metadata → Left in inconsistent state
This creates a "zombie state" where:
- `remaining` shows work exists
- `enqueued_any` shows no agents ready
- The `ready` queue is empty
- The workflow appears "complete"
Critical Code Paths
The issue occurs in the GraphFlow execution coordinator around these operations:
# Pseudo-code showing the problematic sequence
async def _execute_agent_transition(self):
    # 1. Agent completes (atomic - safe)
    result = await current_agent.execute()
    # 2. Update message history (atomic - safe)
    self._conversation.add_message(result)
    # >>> INTERRUPTION CAN HAPPEN HERE <<<
    # 3. Update graph state (non-atomic - unsafe)
    self._update_remaining_work()
    # 4. Determine next agents (never reached)
    next_agents = self._determine_next_agents()
    # 5. Enqueue next agents (never reached)
    for agent in next_agents:
        self._enqueue_agent(agent)
Solution 1: Immediate Workaround (State Recovery)
Add this recovery method to detect and fix corrupted states:
def recover_corrupted_graphflow_state(team, saved_state):
    """
    Detect and repair corrupted GraphFlow state before resuming
    """
    import json

    # Load and analyze the state
    state = json.loads(saved_state) if isinstance(saved_state, str) else saved_state
    graph_manager = state.get("GraphManager", {})
    remaining = graph_manager.get("remaining", {})
    enqueued_any = graph_manager.get("enqueued_any", {})
    ready = graph_manager.get("ready", [])

    # Detect corruption: work remaining but no agents ready
    has_remaining_work = any(
        agent_work and any(task_count > 0 for task_count in agent_work.values())
        for agent_work in remaining.values()
    )
    no_agents_ready = len(ready) == 0
    no_agents_enqueued = all(
        not any(enqueued.values()) for enqueued in enqueued_any.values()
    )
    is_corrupted = has_remaining_work and no_agents_ready and no_agents_enqueued

    print("State Analysis:")
    print(f" - Has remaining work: {has_remaining_work}")
    print(f" - No agents ready: {no_agents_ready}")
    print(f" - No agents enqueued: {no_agents_enqueued}")
    print(f" - State corrupted: {is_corrupted}")

    if is_corrupted:
        print("Repairing corrupted state...")

        # Find agents with remaining work
        agents_with_work = []
        for agent_name, work_dict in remaining.items():
            if any(count > 0 for count in work_dict.values()):
                agents_with_work.append(agent_name)
        print(f" - Agents with remaining work: {agents_with_work}")

        # Reconstruct the ready queue
        # Strategy: Enqueue the first agent with remaining work
        if agents_with_work:
            next_agent = agents_with_work[0]

            # Update ready queue
            graph_manager["ready"] = [next_agent]

            # Update enqueued_any to reflect the repair
            if next_agent in enqueued_any:
                # Set the first task for this agent as enqueued
                for task_name in enqueued_any[next_agent]:
                    enqueued_any[next_agent][task_name] = True
                    break  # Only enqueue one task to start

            print(f" - Repaired: {next_agent} added to ready queue")

    # Save the repaired state
    state["GraphManager"] = graph_manager
    return state


# Usage:
repaired_state = recover_corrupted_graphflow_state(team, saved_state)
team.load_state(json.dumps(repaired_state))

# Resume with explicit task to trigger coordination
result_stream = team.run_stream(task="continue with the workflow")
Solution 2: Robust State Persistence (Proper Fix)
Implement atomic state transitions with transaction-like semantics:
import time
from copy import deepcopy  # required by the checkpointing below


class AtomicGraphFlowState:
    """
    Transaction-based state management for GraphFlow
    """

    def __init__(self, graph_flow):
        self.graph_flow = graph_flow
        self._transaction_log = []
        self._checkpoint_state = None

    def begin_agent_transition(self, agent_name):
        """Start atomic transition - create checkpoint"""
        self._checkpoint_state = self._capture_full_state()
        self._transaction_log.append({
            'operation': 'begin_transition',
            'agent': agent_name,
            'timestamp': time.time(),
            'checkpoint': self._checkpoint_state
        })

    def commit_agent_transition(self, agent_name, next_agents):
        """Atomically commit all state changes"""
        try:
            # All state updates in single atomic operation
            with self.graph_flow._state_lock:  # Assuming thread-safe implementation
                # Update remaining work
                self._update_remaining_work(agent_name)
                # Enqueue next agents
                for next_agent in next_agents:
                    self._enqueue_agent_atomic(next_agent)
                # Clear checkpoint - transition successful
                self._checkpoint_state = None
                self._transaction_log.append({
                    'operation': 'commit_success',
                    'agent': agent_name,
                    'next_agents': next_agents,
                    'timestamp': time.time()
                })
        except Exception as e:
            # Rollback on any failure
            self.rollback_agent_transition(agent_name)
            raise e

    def rollback_agent_transition(self, agent_name):
        """Rollback to checkpoint state on interruption"""
        if self._checkpoint_state:
            print(f"Rolling back interrupted transition for {agent_name}")
            self.graph_flow._restore_state(self._checkpoint_state)
            self._checkpoint_state = None
            self._transaction_log.append({
                'operation': 'rollback',
                'agent': agent_name,
                'timestamp': time.time()
            })

    def _capture_full_state(self):
        """Capture complete graph state for checkpoint"""
        return {
            'remaining': deepcopy(self.graph_flow._remaining),
            'enqueued_any': deepcopy(self.graph_flow._enqueued_any),
            'ready': list(self.graph_flow._ready),
            'conversation_length': len(self.graph_flow._conversation.messages),
            'agent_states': {name: agent.get_state()
                             for name, agent in self.graph_flow._agents.items()}
        }
# Integration with existing GraphFlow:
async def enhanced_graphflow_execute_with_atomic_state(self):
    """Execute with atomic state management"""
    atomic_state = AtomicGraphFlowState(self)
    try:
        while self._has_remaining_work():
            current_agent = self._get_next_ready_agent()
            if not current_agent:
                break

            # Begin atomic transition
            atomic_state.begin_agent_transition(current_agent.name)
            try:
                # Execute agent
                result = await current_agent.execute()
                # Determine next agents
                next_agents = self._determine_next_agents_from_result(result)
                # Commit atomically - this is the critical section
                atomic_state.commit_agent_transition(current_agent.name, next_agents)
            except KeyboardInterrupt:
                # Handle interruption gracefully
                print(f"Interruption during {current_agent.name} transition")
                atomic_state.rollback_agent_transition(current_agent.name)
                # Save consistent state for resume
                self._save_recovery_state(atomic_state._transaction_log)
                raise
    except Exception as e:
        print(f"GraphFlow execution failed: {e}")
        raise
Solution 3: Enhanced Resume Logic
def enhanced_graphflow_resume(team, saved_state, recovery_strategy='auto'):
    """
    Enhanced resume with multiple recovery strategies
    """
    strategies = {
        'auto': auto_repair_and_resume,
        'conservative': conservative_resume,
        'aggressive': aggressive_resume,
        'manual': manual_guided_resume
    }
    recovery_func = strategies.get(recovery_strategy, auto_repair_and_resume)
    return recovery_func(team, saved_state)


def auto_repair_and_resume(team, saved_state):
    """Automatically detect and repair state corruption"""
    # 1. Analyze state integrity
    analysis = analyze_state_integrity(saved_state)

    if analysis['is_corrupted']:
        print(f"Corruption detected: {analysis['corruption_type']}")

        # 2. Apply appropriate repair strategy
        if analysis['corruption_type'] == 'missing_ready_queue':
            repaired_state = repair_missing_ready_queue(saved_state)
        elif analysis['corruption_type'] == 'inconsistent_enqueue_state':
            repaired_state = repair_enqueue_inconsistency(saved_state)
        else:
            repaired_state = rebuild_coordination_state(saved_state)

        # 3. Validate repair
        if validate_state_consistency(repaired_state):
            print("State repair successful")
            saved_state = repaired_state
        else:
            raise RuntimeError("State repair failed validation")

    # 4. Resume with repaired state
    team.load_state(saved_state)
    return team.run_stream(task="continue")


def analyze_state_integrity(saved_state):
    """Comprehensive state integrity analysis"""
    # Implementation of detailed state analysis logic
    # Returns corruption detection results
    pass
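The helpers `analyze_state_integrity` and `validate_state_consistency` are left as stubs above. A minimal sketch of what they might look like, assuming the GraphManager layout shown in the issue (`remaining` / `enqueued_any` / `ready`); the corruption-type labels mirror the ones used in `auto_repair_and_resume`:

def analyze_state_integrity(saved_state):
    """Sketch: classify a saved state based on the GraphManager fields."""
    import json
    state = json.loads(saved_state) if isinstance(saved_state, str) else saved_state
    gm = state.get("GraphManager", {})
    has_remaining = any(
        any(count > 0 for count in work.values())
        for work in gm.get("remaining", {}).values()
    )
    ready_empty = len(gm.get("ready", [])) == 0
    nothing_enqueued = not any(
        any(flags.values()) for flags in gm.get("enqueued_any", {}).values()
    )
    if has_remaining and ready_empty and nothing_enqueued:
        corruption_type = 'missing_ready_queue'
    elif has_remaining and ready_empty:
        corruption_type = 'inconsistent_enqueue_state'
    else:
        corruption_type = None
    return {'is_corrupted': corruption_type is not None,
            'corruption_type': corruption_type}


def validate_state_consistency(state):
    """Sketch: after repair, remaining work must be matched by a non-empty ready queue."""
    gm = state.get("GraphManager", {})
    has_remaining = any(
        any(count > 0 for count in work.values())
        for work in gm.get("remaining", {}).values()
    )
    return (not has_remaining) or len(gm.get("ready", [])) > 0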
Testing & Validation
def test_graphflow_recovery():
    """Comprehensive test for GraphFlow recovery mechanisms"""
    # Test 1: Interruption during agent transition
    team = create_test_graphflow()

    # Simulate interruption at various points
    interruption_points = [
        'after_agent_complete',
        'during_state_update',
        'before_next_enqueue',
        'during_coordination'
    ]

    for point in interruption_points:
        print(f"Testing interruption at: {point}")

        # Run workflow with simulated interruption
        saved_state = simulate_interrupted_workflow(team, interruption_point=point)

        # Test recovery
        recovered_team = create_test_graphflow()
        result = enhanced_graphflow_resume(recovered_team, saved_state)

        # Validate recovery success
        assert result.completed_successfully
        assert len(result.messages) > 0

        print(f"Recovery successful for {point}")


test_graphflow_recovery()
Deployment Recommendations
- Immediate: Use Solution 1 (recovery workaround) for existing workflows
- Short-term: Implement enhanced resume logic (Solution 3)
- Long-term: Implement atomic state transitions (Solution 2)
- Monitoring: Add state integrity checks to detect corruption early
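For the monitoring point, one lightweight option is to validate every checkpoint as it is written, reusing the `analyze_state_integrity` sketch above. A minimal example; the wrapper name and the `on_corruption` hook are illustrative, and it assumes `team.save_state()` is a coroutine:

async def save_checkpoint_with_integrity_check(team, on_corruption=print):
    """Save team state and flag the stuck pattern at checkpoint time (illustrative)."""
    state = await team.save_state()  # assumes save_state() is awaited
    analysis = analyze_state_integrity(state)  # helper sketched above
    if analysis['is_corrupted']:
        # Surface the problem immediately instead of discovering it at resume time.
        on_corruption(f"GraphFlow checkpoint corrupted: {analysis['corruption_type']}")
    return state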
This should resolve the GraphFlow state persistence issues comprehensively. The atomic transaction approach (Solution 2) would be the most robust long-term fix, while the recovery workaround (Solution 1) provides immediate relief for existing deployments.
Let me know if you'd like me to elaborate on any aspect of the solution or help implement specific parts!
Gabriel