Autogen rate_limit_reached_error - how to set a rate limit?
What happened?
[code] task = "Who was the Miami Heat player with the highest points in the 2006-2007 season, and what was the percentage change in his total rebounds between the 2007-2008 and 2008-2009 seasons?"
Use asyncio.run(...) if you are running this in a script.
await Console(team.run_stream(task=task))
[error]
File "/root/miniconda3/envs/llama-factory/lib/python3.11/site-packages/openai/_base_client.py", line 1666, in _retry_request
return await self._request(
^^^^^^^^^^^^^^^^^^^^
File "/root/miniconda3/envs/llama-factory/lib/python3.11/site-packages/openai/_base_client.py", line 1634, in _request
raise self._make_status_error_from_response(err.response) from None
openai.RateLimitError: Error code: 429 - {'error': {'message': 'Your account crobsbebi7sb70r8ack0
What did you expect to happen?
No error — the client should rate-limit or back off instead of failing with a 429.
How can we reproduce it (as minimally and precisely as possible)?
use kimi free api
AutoGen version
0.4.11
Which package was this bug in
AgentChat
Model used
kimi
Python version
No response
Operating system
No response
Any additional info you think would be helpful for fixing this bug
No response
There is not currently functionality for rate limiting the AutoGen OpenAI model client.
However, it would be easy to add a model client which wraps the OpenAI one and either implements rate limiting or backoff in the presence of rate limit errors.
See here for guidance from openai: https://cookbook.openai.com/examples/how_to_handle_rate_limits
Would you be interested in contributing a model client like this?
@aizhweiwei here is an implementation showing how to add a rate limiter with AutoGen for Gemini
# CREATE THE FILE: rate_limiter_agent.py
import asyncio
import logging
import threading
import time
from collections import deque  # More efficient for removing old timestamps
from typing import (  # merged: typing was previously imported twice
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Literal,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

import autogen
# Assuming the original ConversableAgent code is in a file named 'conversable_agent.py'
# or available in the autogen package path.
# If it's in a local file, adjust the import path:
# from .conversable_agent import ConversableAgent, Agent, LLMAgent, ChatResult, UpdateSystemMessage, ...
# If using the installed package:
from autogen.agentchat.conversable_agent import ConversableAgent, Agent
from autogen.oai.client import OpenAIWrapper

from shared_rate_limiter import SharedRateLimiter

logger = logging.getLogger(__name__)
class RateLimitedConversableAgent(ConversableAgent):
    """
    A subclass of ConversableAgent that adds rate limiting to OAI (e.g., Gemini) API calls.

    Every LLM call made through `_generate_oai_reply_from_client` first waits,
    if necessary, until the process-wide SharedRateLimiter permits another call
    inside the configured sliding window.
    """

    DEFAULT_SUMMARY_METHOD = "last_msg"

    def __init__(
        self,
        name: str,
        rate_limit_calls: int = 10,  # Max calls (e.g., 5 for Gemini free tier)
        rate_limit_period: float = 60.0,  # Period in seconds (e.g., 60 for per minute)
        **kwargs,  # Pass other ConversableAgent args
    ):
        """
        Args:
            name (str): Name of the agent.
            rate_limit_calls (int): Maximum number of API calls allowed within the period.
            rate_limit_period (float): The time window in seconds for the rate limit.
            **kwargs: Other arguments to pass to the ConversableAgent constructor.
        """
        # NOTE: the docstring must be the first statement; the original had a
        # print() before it, which silently discarded the docstring.
        super().__init__(name=name, **kwargs)
        self.rate_limit_calls = rate_limit_calls
        self.rate_limit_period = rate_limit_period
        logger.info(
            f"Rate limiter initialized for agent '{self.name}': "
            f"{self.rate_limit_calls} calls / {self.rate_limit_period} seconds"
        )

    def _get_wait_time(self) -> float:
        """Return the seconds to wait before the next call is allowed."""
        # SharedRateLimiter.get_wait_time prunes the shared queue as a side
        # effect, so it must run under the shared lock.
        with SharedRateLimiter.get_rate_limit_lock():
            return SharedRateLimiter.get_wait_time(
                self.rate_limit_calls, self.rate_limit_period
            )

    def _register_call(self) -> None:
        """Registers the current time as a call timestamp in the SHARED queue."""
        SharedRateLimiter.register_call()

    def _wait_for_rate_limit_sync(self) -> None:
        """Checks shared rate limit and sleeps if necessary (synchronous version)."""
        wait_time = self._get_wait_time()
        logger.debug(
            f"Agent '{self.name}': Calculated wait time (sync): {wait_time:.2f} seconds."
        )
        if wait_time > 0:
            logger.warning(
                f"Agent '{self.name}': Shared rate limit reached in sync call. "
                f"Waiting for {wait_time:.2f} seconds..."
            )
            time.sleep(wait_time)
        # Register the call once a slot is available (immediately when no wait
        # was needed, after the sleep otherwise).
        self._register_call()

    async def _wait_for_rate_limit_async(self) -> None:
        """Checks shared rate limit and sleeps if necessary (asynchronous version)."""
        wait_time = self._get_wait_time()
        logger.debug(
            f"Agent '{self.name}': Calculated wait time (async): {wait_time:.2f} seconds."
        )
        if wait_time > 0:
            logger.warning(
                f"Agent '{self.name}': Shared rate limit reached (async). "
                f"Waiting for {wait_time:.2f} seconds..."
            )
            # Non-blocking sleep so other coroutines keep running.
            await asyncio.sleep(wait_time)
        self._register_call()

    def _generate_oai_reply_from_client(
        self, llm_client: Any, messages: List[Dict[str, Any]], cache
    ) -> Optional[Union[str, Dict[str, Any]]]:
        """Generate a reply using the given client, applying rate limiting first."""
        # Apply rate limiting BEFORE making the actual client.create call.
        # This hook is invoked synchronously within both sync and async flows,
        # so the blocking (sync) wait is appropriate here.
        self._wait_for_rate_limit_sync()
        # Call the original method from the parent class
        return super()._generate_oai_reply_from_client(llm_client, messages, cache)
# CREATE THE FILE shared_rate_limiter
import time
import threading
from collections import deque
class SharedRateLimiter:
    """Process-wide sliding-window rate limiter shared by all agents.

    Timestamps of recent calls live in a single class-level deque so every
    agent instance counts against the same quota.
    """

    # Monotonic timestamps of calls made within the current window.
    _call_timestamps: deque = deque()
    # Guards _call_timestamps. Callers of get_wait_time() must hold it;
    # register_call() acquires it internally.
    _rate_limit_lock = threading.Lock()

    @staticmethod
    def get_wait_time(rate_limit_calls: int, rate_limit_period: float) -> float:
        """Calculates wait time based on shared call history.

        Must be invoked while holding get_rate_limit_lock(): it prunes
        expired timestamps from the shared queue as a side effect.
        """
        now = time.monotonic()
        timestamps = SharedRateLimiter._call_timestamps
        # Drop timestamps that have fallen out of the sliding window.
        while timestamps and timestamps[0] <= now - rate_limit_period:
            timestamps.popleft()
        if len(timestamps) >= rate_limit_calls:
            # Wait until the oldest call exits the window, plus a small margin
            # to avoid re-hitting the limit due to clock granularity.
            time_since_oldest_call = now - timestamps[0]
            wait_time = rate_limit_period - time_since_oldest_call
            return max(0, wait_time) + 0.01
        return 0.0

    @staticmethod
    def register_call() -> None:
        """Registers a call timestamp in the shared queue (thread-safe)."""
        # FIX: the original appended without the lock, racing with the locked
        # prune in get_wait_time(). Callers do not hold the lock here, so
        # acquiring it is deadlock-free.
        with SharedRateLimiter._rate_limit_lock:
            SharedRateLimiter._call_timestamps.append(time.monotonic())

    @staticmethod
    def get_rate_limit_lock():
        """Returns the shared rate limit lock."""
        return SharedRateLimiter._rate_limit_lock
How to use it?
from rate_limiter_agent import RateLimitedConversableAgent
writer = RateLimitedConversableAgent(
name="Writer",
system_message="Your system prompt",
llm_config={"config_list" : config_list_gemini},
human_input_mode="NEVER",
rate_limit_calls= 10, # set your RPM based on your model; in my case I am using gemini-2.0-flash-thinking-exp-01-21, so it's 10 RPM (10 requests per minute)
rate_limit_period= 60.0 # set your limit period in seconds; 60.0 seconds equals 1 minute
)
Closing as stale, and given the response above.