feat: ParallelAgentRunner
Why are these changes needed?
This PR introduces a new ParallelAgentRunner Class which let's you run your agents in parallel. The ParallelAgentRunner adds native parallel execution capabilities to AG2, allowing multiple agents to run concurrently.
Basic Task Structure:
from autogen import ParallelAgentRunner
# Create a parallel runner
parallel_runner = ParallelAgentRunner(
max_workers=8, # or use default calculation
timeout=300, # 5 minutes per task
handle_errors='collect' # 'collect', 'fail_fast', or 'ignore'
)
# Define parallel tasks
tasks = [
{
'name': 'symptom_analysis',
'agent_config': {...},
'prompt': 'Analyze patient symptoms...'
},
{
'name': 'drug_interaction',
'agent_config': {...},
'prompt': 'Check drug interactions...'
}
]
# Execute in parallel
results = parallel_runner.run(tasks)
Related issue number
Checks
- [ ] I've included any doc changes needed for https://docs.ag2.ai/. See https://docs.ag2.ai/latest/docs/contributor-guide/documentation/ to build and test documentation locally.
- [ ] I've added tests (if relevant) corresponding to the changes introduced in this PR.
- [ ] I've made sure all auto checks have passed.
Joggr analysis skipped - Draft PR
Codecov Report
:x: Patch coverage is 35.29412% with 66 lines in your changes missing coverage. Please review.
| Files with missing lines | Patch % | Lines |
|---|---|---|
| autogen/agentchat/run_parallel_agents.py | 35.29% | 66 Missing :warning: |
| Files with missing lines | Coverage Δ | |
|---|---|---|
| autogen/agentchat/run_parallel_agents.py | 35.29% <35.29%> (ø) |
... and 20 files with indirect coverage changes
:rocket: New features to boost your workflow:
- :snowflake: Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
- :package: JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.
Code Review for PR #2143: ParallelAgentRunner
Thank you for this contribution! The ParallelAgentRunner is an interesting addition that brings parallel execution capabilities to AG2. Here's my comprehensive review:
✅ Strengths
- Clean API Design: The task-based approach with
ParallelTaskandParallelTaskResultdataclasses is intuitive - Good Error Handling: Three error strategies (
collect,fail_fast,ignore) provide flexibility - Proper Logging: Comprehensive logging throughout execution
- Documentation: Well-documented docstrings and examples
- Backward Compatibility: Helper function
run_agents_parallelprovided
🐛 Critical Issues
1. Incorrect API Usage (autogen/agentchat/run_parallel_agents.py:160)
response = agent.run(message=task.prompt, max_turns=task.max_turns)
response.process()
The agent.run() method returns a RunResponse object that needs to be consumed via iteration or async processing. The current usage likely won't work as intended. You should either:
- Iterate through the response:
for event in response: ... - Or use
initiate_chat()directly if synchronous blocking behavior is desired
2. Timeout Not Enforced Per-Task (autogen/agentchat/run_parallel_agents.py:259)
The timeout parameter is passed to as_completed() which applies to the entire batch, not individual tasks. Individual long-running tasks can still exceed the timeout. Consider:
future = executor.submit(self._execute_single_task, task)
result = future.result(timeout=self.timeout) # Per-task timeout
3. Missing Cleanup on Failure (autogen/agentchat/run_parallel_agents.py:265-270)
When fail_fast is triggered, remaining futures are cancelled but agents may not be properly cleaned up (closed connections, released resources). Consider implementing proper cleanup.
4. Agent Copy Logic Issue (autogen/agentchat/run_parallel_agents.py:123-129)
When copying a ConversableAgent, only a subset of attributes are copied. This could lead to inconsistent behavior if the original agent has custom reply functions, tool registrations, or other state. Consider deep copying or documenting this limitation.
⚠️ Major Issues
5. No Thread Safety Considerations
ConversableAgent instances may not be thread-safe (chat history, internal state). Creating separate instances per task is good, but document this clearly and warn against sharing agent instances.
6. Missing Tests ❌
No test file was included. This is a required addition per the PR checklist. Essential tests needed:
- Basic parallel execution
- Error handling (collect, fail_fast, ignore)
- Timeout behavior
- Agent creation from dict vs instance
- Thread safety/concurrent execution
- Edge cases (empty task list, duplicate names)
7. Not Exported in Module (autogen/agentchat/init.py)
The ParallelAgentRunner class is not added to __init__.py, making it non-discoverable via from autogen import ParallelAgentRunner. Add it to the module exports.
8. TimeoutError Handling (autogen/agentchat/run_parallel_agents.py:259)
as_completed() can raise concurrent.futures.TimeoutError, but this isn't caught. If the global timeout is reached, it will crash instead of gracefully handling it per the error strategy.
🔧 Code Quality Issues
9. Unnecessary load_dotenv() (autogen/agentchat/run_parallel_agents.py:17)
Loading environment variables at module import is a side effect. This should be the user's responsibility or documented why it's necessary here.
10. Inconsistent Context Handling
The context field in ParallelTask is defined but never used. Either implement it or remove it.
11. Type Hint Inconsistency (autogen/agentchat/run_parallel_agents.py:502)
summary_args: dict[str, Any] | None = {} - Default should be None, not {} (mutable default argument anti-pattern, though mitigated by the | None annotation).
🚀 Performance Considerations
12. ThreadPoolExecutor for I/O-bound Operations
Good choice for LLM API calls (I/O-bound). However:
- Consider documenting that this won't help CPU-bound operations
- GIL limitations mean CPU-bound agent operations won't benefit
- For true parallelism with CPU-bound work,
ProcessPoolExecutorwould be needed (but has serialization constraints)
13. Agent Creation Overhead
Creating a new agent per task (line 136) adds overhead. Consider reusing agents if tasks use identical configs.
🔒 Security Concerns
14. No Input Validation
- Task names could contain malicious content used in logging
- No validation of
max_workers(could be set to extreme values) - No validation of timeout values (negative/extreme values)
15. Exception Information Leakage (autogen/agentchat/run_parallel_agents.py:179)
error_msg = f"{type(e).__name__}: {str(e)}" might expose sensitive information in error messages (API keys, file paths, etc.). Consider sanitizing.
📝 Documentation & Best Practices
16. Missing Documentation
Per the PR checklist, documentation is needed at https://docs.ag2.ai/. Users need:
- When to use parallel vs sequential execution
- Thread safety considerations
- Performance characteristics
- Example notebooks
17. Unclear Use Case
The PR description mentions medical use cases, but those examples suggest agents that should probably share context between tasks (symptom analysis informing drug interaction checks). Pure parallel execution might not be the best pattern for that scenario. Consider documenting when parallel is appropriate vs when sequential/orchestrated is better.
🎯 Recommendations
High Priority
- ✅ Add comprehensive test suite
- ✅ Fix the
agent.run()API usage - ✅ Implement per-task timeout correctly
- ✅ Export class in
__init__.py - ✅ Handle
TimeoutErrorfromas_completed()
Medium Priority
- Add proper agent cleanup on failure
- Document thread safety considerations
- Remove unused
contextfield or implement it - Add input validation
- Add documentation and example notebooks
Low Priority
- Remove
load_dotenv()side effect - Consider agent reuse for identical configs
- Add sanitization for error messages
Summary
This is a solid foundation for parallel agent execution with good architectural decisions. However, it needs:
- Critical API fix for agent.run() usage
- Tests (required by PR checklist)
- Timeout implementation correction
- Module export to make it discoverable
Once these issues are addressed, this will be a valuable addition to AG2. Great work on the overall structure! 🎉
hows this going?
@priyansh4320 i need this asap lol i can't wait to use it!!!! :)