
GraphFlow State Persistence Bug: Workflow Gets Stuck After Interruption During Agent Transitions

Open droideronline opened this issue 3 months ago • 2 comments

What happened?


Summary

GraphFlow workflows become unrecoverable when interrupted during agent transitions, leading to a corrupted state where no agents are ready to execute despite having remaining work.


Environment

  • Framework: autogen-agentchat with GraphFlow
  • Python: 3.11
  • Issue Type: State persistence & resume functionality

Problem Description

When a GraphFlow workflow is interrupted (e.g., via KeyboardInterrupt) during the transition between agents, the saved state becomes corrupted.

On resume, the workflow terminates immediately with:

Digraph execution is complete

even though agents still have remaining work.


Steps to Reproduce

  1. Create a GraphFlow with multiple agents in sequence.

  2. Start the workflow and let the first agent complete successfully.

  3. Interrupt the process (Ctrl+C) during the transition to the next agent.

  4. Attempt to resume using the following (a fuller end-to-end sketch follows this list):

    await team.load_state(saved_state)
    stream = team.run_stream(task="continue")
    
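Putting the four steps together, a minimal end-to-end sketch (the agent names, model client, and two-node graph are placeholders; the DiGraphBuilder/GraphFlow usage follows the autogen-agentchat docs, though exact constructor arguments may vary between versions):

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    writer = AssistantAgent("writer", model_client=client, system_message="Write a short draft.")
    reviewer = AssistantAgent("reviewer", model_client=client, system_message="Review the draft.")

    # Two-node sequential graph: writer -> reviewer
    builder = DiGraphBuilder()
    builder.add_node(writer).add_node(reviewer)
    builder.add_edge(writer, reviewer)
    team = GraphFlow(participants=[writer, reviewer], graph=builder.build())

    # Steps 2-3: run, then press Ctrl+C after the writer finishes but before
    # the reviewer starts; depending on the Python version the interrupt may
    # surface here as KeyboardInterrupt or CancelledError.
    try:
        async for event in team.run_stream(task="Draft a short note about GraphFlow."):
            print(type(event).__name__)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    saved_state = await team.save_state()

    # Step 4: in a fresh process, rebuild the same team, then
    #   await team.load_state(saved_state)
    #   team.run_stream(task="continue")
    # terminates immediately with "Digraph execution is complete".

asyncio.run(main())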

Expected Behavior

  • Workflow should resume from the next agent in the sequence and continue execution seamlessly.

Actual Behavior

  • Workflow immediately terminates with "Digraph execution is complete".
  • All agents receive the "continue" message but none execute.
  • The ready queue is empty despite remaining work.

State File Analysis

The corrupted state shows:

{
  "GraphManager": {
    "remaining": {
      "next_agent": { "next_agent": 1 },
      // ... other agents with work remaining
    },
    "enqueued_any": {
      "next_agent": { "next_agent": false },
      // ... all agents show false
    },
    "ready": []  
  }
}
  • First agent completed – properly recorded in message history
  • Next agent not enqueued – transition interrupted before coordination
  • Workflow coordination lost – no agents ready despite remaining work

Root Cause

The GraphFlow coordination mechanism is interrupted before it can enqueue the next agent, leaving the system in an inconsistent state (a quick check for this condition is sketched after the list):

  • Remaining work exists
  • No agents are enqueued
  • The workflow appears "complete" but is actually stuck
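
A quick way to confirm the stuck condition from a saved state dump (the key names come from the GraphManager section shown above; the saved_state.json filename is just an example):

import json

def is_stuck(saved_state: dict) -> bool:
    """True if work remains but nothing is ready or enqueued."""
    gm = saved_state.get("GraphManager", {})
    has_remaining = any(
        count > 0
        for per_agent in gm.get("remaining", {}).values()
        for count in per_agent.values()
    )
    nothing_enqueued = not any(
        flag
        for per_agent in gm.get("enqueued_any", {}).values()
        for flag in per_agent.values()
    )
    return has_remaining and not gm.get("ready") and nothing_enqueued

with open("saved_state.json") as f:
    print(is_stuck(json.load(f)))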

Which packages was the bug in?

Python Core (autogen-core)

AutoGen library version.

Python dev (main branch)

Other library version.

No response

Model used

No response

Model provider

None

Other model provider

No response

Python version

3.11

.NET version

None

Operating system

MacOS

droideronline commented Sep 20 '25 20:09

@ekzhu - It's not just during agent transitions - the GraphFlow save/resume mechanism itself seems broken. Even without interrupting between agents, the saved state can't be resumed properly. After loading state, the workflow often reports "Digraph execution is complete" immediately, with no agents enqueued, despite there still being remaining work.

droideronline commented Sep 23 '25 06:09

GraphFlow State Persistence Bug - Comprehensive Analysis & Solution

Hi @droideronline and @ekzhu,

This is a critical issue affecting production GraphFlow deployments. The problem stems from non-atomic updates to the GraphFlow coordination state during interruptions. I've analyzed the core issue and can provide both immediate workarounds and a proper fix.

Root Cause Analysis

State Corruption Mechanism

The corruption happens because GraphFlow's state persistence operates with non-atomic state transitions:

  1. Agent completion → Recorded in message history ✅
  2. Graph state update → Partially completed during interruption ❌
  3. Next agent enqueue → Never executed due to interruption ❌
  4. Coordination metadata → Left in inconsistent state ❌

This creates a "zombie state" where:

  • remaining shows work exists
  • enqueued_any shows no agents ready
  • ready queue is empty
  • Workflow appears "complete"

Critical Code Paths

The issue occurs in the GraphFlow execution coordinator around these operations:

# Pseudo-code showing the problematic sequence
async def _execute_agent_transition(self):
    # 1. Agent completes (atomic - ✅)
    await current_agent.execute()
    
    # 2. Update message history (atomic - ✅)
    self._conversation.add_message(result)
    
    # >>> INTERRUPTION CAN HAPPEN HERE <<<
    
    # 3. Update graph state (non-atomic - ❌)
    self._update_remaining_work()
    
    # 4. Determine next agents (never reached - ❌)
    next_agents = self._determine_next_agents()
    
    # 5. Enqueue next agents (never reached - ❌)
    for agent in next_agents:
        self._enqueue_agent(agent)

Solution 1: Immediate Workaround (State Recovery)

Add this recovery method to detect and fix corrupted states:

def recover_corrupted_graphflow_state(team, saved_state):
    """
    Detect and repair corrupted GraphFlow state before resuming
    """
    import json
    
    # Load and analyze the state
    state = json.loads(saved_state) if isinstance(saved_state, str) else saved_state
    
    graph_manager = state.get("GraphManager", {})
    remaining = graph_manager.get("remaining", {})
    enqueued_any = graph_manager.get("enqueued_any", {})
    ready = graph_manager.get("ready", [])
    
    # Detect corruption: work remaining but no agents ready
    has_remaining_work = any(
        agent_work and any(task_count > 0 for task_count in agent_work.values())
        for agent_work in remaining.values()
    )
    
    no_agents_ready = len(ready) == 0
    no_agents_enqueued = all(
        not any(enqueued.values()) for enqueued in enqueued_any.values()
    )
    
    is_corrupted = has_remaining_work and no_agents_ready and no_agents_enqueued
    
    print(f"State Analysis:")
    print(f"  - Has remaining work: {has_remaining_work}")
    print(f"  - No agents ready: {no_agents_ready}")
    print(f"  - No agents enqueued: {no_agents_enqueued}")
    print(f"  - State corrupted: {is_corrupted}")
    
    if is_corrupted:
        print("πŸ”§ Repairing corrupted state...")
        
        # Find agents with remaining work
        agents_with_work = []
        for agent_name, work_dict in remaining.items():
            if any(count > 0 for count in work_dict.values()):
                agents_with_work.append(agent_name)
        
        print(f"  - Agents with remaining work: {agents_with_work}")
        
        # Reconstruct the ready queue
        # Strategy: Enqueue the first agent with remaining work
        if agents_with_work:
            next_agent = agents_with_work[0]
            
            # Update ready queue
            graph_manager["ready"] = [next_agent]
            
            # Update enqueued_any to reflect the repair
            if next_agent in enqueued_any:
                # Set the first task for this agent as enqueued
                for task_name in enqueued_any[next_agent]:
                    enqueued_any[next_agent][task_name] = True
                    break  # Only enqueue one task to start
            
            print(f"  - Repaired: {next_agent} added to ready queue")
            
            # Save the repaired state
            state["GraphManager"] = graph_manager
    
    return state

# Usage:
repaired_state = recover_corrupted_graphflow_state(team, saved_state)
await team.load_state(repaired_state)  # load_state/run_stream are async; call from an async context

# Resume with explicit task to trigger coordination
result_stream = team.run_stream(task="continue with the workflow")

Solution 2: Robust State Persistence (Proper Fix)

Implement atomic state transitions with transaction-like semantics:

import time
from copy import deepcopy

class AtomicGraphFlowState:
    """
    Transaction-based state management for GraphFlow
    """
    
    def __init__(self, graph_flow):
        self.graph_flow = graph_flow
        self._transaction_log = []
        self._checkpoint_state = None
    
    def begin_agent_transition(self, agent_name):
        """Start atomic transition - create checkpoint"""
        self._checkpoint_state = self._capture_full_state()
        self._transaction_log.append({
            'operation': 'begin_transition',
            'agent': agent_name,
            'timestamp': time.time(),
            'checkpoint': self._checkpoint_state
        })
    
    def commit_agent_transition(self, agent_name, next_agents):
        """Atomically commit all state changes"""
        try:
            # All state updates in single atomic operation
            with self.graph_flow._state_lock:  # Assuming thread-safe implementation
                # Update remaining work
                self._update_remaining_work(agent_name)
                
                # Enqueue next agents
                for next_agent in next_agents:
                    self._enqueue_agent_atomic(next_agent)
                
                # Clear checkpoint - transition successful
                self._checkpoint_state = None
                self._transaction_log.append({
                    'operation': 'commit_success',
                    'agent': agent_name,
                    'next_agents': next_agents,
                    'timestamp': time.time()
                })
                
        except Exception as e:
            # Rollback on any failure
            self.rollback_agent_transition(agent_name)
            raise e
    
    def rollback_agent_transition(self, agent_name):
        """Rollback to checkpoint state on interruption"""
        if self._checkpoint_state:
            print(f"πŸ”„ Rolling back interrupted transition for {agent_name}")
            self.graph_flow._restore_state(self._checkpoint_state)
            self._checkpoint_state = None
            self._transaction_log.append({
                'operation': 'rollback',
                'agent': agent_name,
                'timestamp': time.time()
            })
    
    def _capture_full_state(self):
        """Capture complete graph state for checkpoint"""
        return {
            'remaining': deepcopy(self.graph_flow._remaining),
            'enqueued_any': deepcopy(self.graph_flow._enqueued_any),
            'ready': list(self.graph_flow._ready),
            'conversation_length': len(self.graph_flow._conversation.messages),
            'agent_states': {name: agent.get_state() 
                           for name, agent in self.graph_flow._agents.items()}
        }

# Integration with existing GraphFlow:
async def enhanced_graphflow_execute_with_atomic_state(self):
    """Execute with atomic state management"""
    atomic_state = AtomicGraphFlowState(self)
    
    try:
        while self._has_remaining_work():
            current_agent = self._get_next_ready_agent()
            if not current_agent:
                break
                
            # Begin atomic transition
            atomic_state.begin_agent_transition(current_agent.name)
            
            try:
                # Execute agent
                result = await current_agent.execute()
                
                # Determine next agents
                next_agents = self._determine_next_agents_from_result(result)
                
                # Commit atomically - this is the critical section
                atomic_state.commit_agent_transition(current_agent.name, next_agents)
                
            except KeyboardInterrupt:
                # Handle interruption gracefully
                print(f"⚠️  Interruption during {current_agent.name} transition")
                atomic_state.rollback_agent_transition(current_agent.name)
                
                # Save consistent state for resume
                self._save_recovery_state(atomic_state._transaction_log)
                raise
                
    except Exception as e:
        print(f"GraphFlow execution failed: {e}")
        raise

Solution 3: Enhanced Resume Logic

async def enhanced_graphflow_resume(team, saved_state, recovery_strategy='auto'):
    """
    Enhanced resume with multiple recovery strategies
    """
    
    strategies = {
        'auto': auto_repair_and_resume,
        # Note: only the 'auto' strategy is sketched below; the others are placeholders.
        'conservative': conservative_resume,
        'aggressive': aggressive_resume,
        'manual': manual_guided_resume
    }
    
    recovery_func = strategies.get(recovery_strategy, auto_repair_and_resume)
    return await recovery_func(team, saved_state)

async def auto_repair_and_resume(team, saved_state):
    """Automatically detect and repair state corruption"""
    
    # 1. Analyze state integrity
    analysis = analyze_state_integrity(saved_state)
    
    if analysis['is_corrupted']:
        print(f"πŸ” Corruption detected: {analysis['corruption_type']}")
        
        # 2. Apply appropriate repair strategy
        if analysis['corruption_type'] == 'missing_ready_queue':
            repaired_state = repair_missing_ready_queue(saved_state)
        elif analysis['corruption_type'] == 'inconsistent_enqueue_state':
            repaired_state = repair_enqueue_inconsistency(saved_state)
        else:
            repaired_state = rebuild_coordination_state(saved_state)
        
        # 3. Validate repair
        if validate_state_consistency(repaired_state):
            print("βœ… State repair successful")
            saved_state = repaired_state
        else:
            raise RuntimeError("State repair failed validation")
    
    # 4. Resume with repaired state
    await team.load_state(saved_state)
    return team.run_stream(task="continue")

def analyze_state_integrity(saved_state):
    """Comprehensive state integrity analysis"""
    # Implementation of detailed state analysis logic
    # Returns corruption detection results
    pass
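
For reference, one possible body for the analyze_state_integrity stub above, assuming the GraphManager layout from the state file in the original report (the corruption_type labels are my own and only need to match the branches in auto_repair_and_resume):

def analyze_state_integrity(saved_state):
    """Sketch: detect the stuck condition described in the bug report."""
    gm = saved_state.get("GraphManager", {})
    remaining = gm.get("remaining", {})
    enqueued_any = gm.get("enqueued_any", {})
    ready = gm.get("ready", [])

    has_remaining_work = any(
        count > 0 for per_agent in remaining.values() for count in per_agent.values()
    )
    any_enqueued = any(
        flag for per_agent in enqueued_any.values() for flag in per_agent.values()
    )

    if has_remaining_work and not ready and not any_enqueued:
        corruption_type = 'missing_ready_queue'
    elif has_remaining_work and not ready and any_enqueued:
        corruption_type = 'inconsistent_enqueue_state'
    else:
        corruption_type = None

    return {
        'is_corrupted': corruption_type is not None,
        'corruption_type': corruption_type,
        'has_remaining_work': has_remaining_work,
        'ready_count': len(ready),
    }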

Testing & Validation

import asyncio

async def test_graphflow_recovery():
    """Comprehensive test for GraphFlow recovery mechanisms"""
    
    # Test 1: Interruption during agent transition
    team = create_test_graphflow()  # placeholder helper: build a small sequential GraphFlow
    
    # Simulate interruption at various points
    interruption_points = [
        'after_agent_complete',
        'during_state_update', 
        'before_next_enqueue',
        'during_coordination'
    ]
    
    for point in interruption_points:
        print(f"Testing interruption at: {point}")
        
        # Run workflow with simulated interruption (placeholder helper)
        saved_state = simulate_interrupted_workflow(team, interruption_point=point)
        
        # Test recovery
        recovered_team = create_test_graphflow()
        result = await enhanced_graphflow_resume(recovered_team, saved_state)
        
        # Validate recovery success
        assert result.completed_successfully
        assert len(result.messages) > 0
        print(f"βœ… Recovery successful for {point}")

asyncio.run(test_graphflow_recovery())

Deployment Recommendations

  1. Immediate: Use Solution 1 (recovery workaround) for existing workflows
  2. Short-term: Implement enhanced resume logic (Solution 3)
  3. Long-term: Implement atomic state transitions (Solution 2)
  4. Monitoring: Add state integrity checks to detect corruption early (a small save-side hook is sketched below)
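
As a rough illustration of the monitoring point above, a save-side check that reuses the analyze_state_integrity sketch (the logging behaviour is just an example):

import logging

async def save_state_with_check(team):
    """Persist team state and warn if it already looks unresumable."""
    state = await team.save_state()
    analysis = analyze_state_integrity(state)
    if analysis['is_corrupted']:
        logging.warning("GraphFlow state looks corrupted: %s", analysis['corruption_type'])
    return state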

This should resolve the GraphFlow state persistence issues comprehensively. The atomic transaction approach (Solution 2) would be the most robust long-term fix, while the recovery workaround (Solution 1) provides immediate relief for existing deployments.

Let me know if you'd like me to elaborate on any aspect of the solution or help implement specific parts!

Gabriel

galafis commented Sep 27 '25 14:09