
Autogen rate_limit_reached_error - how to set a rate limit?

Open aizhweiwei opened this issue 1 year ago • 2 comments

What happened?

[code]
task = "Who was the Miami Heat player with the highest points in the 2006-2007 season, and what was the percentage change in his total rebounds between the 2007-2008 and 2008-2009 seasons?"

# Use asyncio.run(...) if you are running this in a script.
await Console(team.run_stream(task=task))

[error]
  File "/root/miniconda3/envs/llama-factory/lib/python3.11/site-packages/openai/_base_client.py", line 1666, in _retry_request
    return await self._request(
           ^^^^^^^^^^^^^^^^^^^^
  File "/root/miniconda3/envs/llama-factory/lib/python3.11/site-packages/openai/_base_client.py", line 1634, in _request
    raise self._make_status_error_from_response(err.response) from None
openai.RateLimitError: Error code: 429 - {'error': {'message': 'Your account crobsbebi7sb70r8ack0 request reached max request: 3, please try again after 1 seconds', 'type': 'rate_limit_reached_error'}}

---------- Summary ----------
Number of messages: 4
Finish reason: None
Total prompt tokens: 379
Total completion tokens: 114
Duration: 6.39 seconds

What did you expect to happen?

No error; the run should complete instead of failing with a 429.

How can we reproduce it (as minimally and precisely as possible)?

Use the Kimi free API, whose strict rate limit (max 3 requests, per the error message) is easily exceeded by a multi-agent run.

AutoGen version

0.4.11

Which package was this bug in

AgentChat

Model used

kimi

Python version

No response

Operating system

No response

Any additional info you think would be helpful for fixing this bug

No response

aizhweiwei commented Dec 23 '24

There is currently no built-in functionality for rate limiting in the AutoGen OpenAI model client.

However, it would be easy to add a model client that wraps the OpenAI one and either enforces a rate limit proactively or backs off when rate limit errors occur; a sketch follows the link below.

See the OpenAI cookbook for guidance: https://cookbook.openai.com/examples/how_to_handle_rate_limits
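
A minimal sketch of such a wrapper, assuming the 0.4-series OpenAIChatCompletionClient from autogen_ext.models.openai (the BackoffModelClient name, retry count, and delays here are illustrative, not an AutoGen API; depending on your AutoGen version you may need to subclass ChatCompletionClient instead of delegating):

# rate_limited_client.py -- illustrative sketch, not an official AutoGen API
import asyncio
import random

import openai
from autogen_ext.models.openai import OpenAIChatCompletionClient


class BackoffModelClient:
    """Wraps a model client and retries create() with exponential backoff on 429s."""

    def __init__(self, inner: OpenAIChatCompletionClient, max_retries: int = 5):
        self._inner = inner
        self._max_retries = max_retries

    async def create(self, *args, **kwargs):
        delay = 1.0
        for attempt in range(self._max_retries):
            try:
                return await self._inner.create(*args, **kwargs)
            except openai.RateLimitError:
                if attempt == self._max_retries - 1:
                    raise
                # Exponential backoff with jitter, per the OpenAI cookbook.
                await asyncio.sleep(delay * (1 + random.random()))
                delay *= 2

    def __getattr__(self, name):
        # Delegate everything else (model_info, count_tokens, ...) to the wrapped client.
        return getattr(self._inner, name)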

Would you be interested in contributing a model client like this?

jackgerrits commented Dec 27 '24

@aizhweiwei here is an implementation of a shared rate limiter for AutoGen, using Gemini as the example model.

# CREATE THE FILE: rate_limiter_agent.py
import asyncio
import logging
import time
from typing import Any, Dict, List, Optional, Union

from shared_rate_limiter import SharedRateLimiter

# Assuming the original ConversableAgent code is available in the autogen
# package path. If it lives in a local file instead, adjust the import:
# from .conversable_agent import ConversableAgent
from autogen.agentchat.conversable_agent import ConversableAgent


logger = logging.getLogger(__name__)

class RateLimitedConversableAgent(ConversableAgent):
    """
    A subclass of ConversableAgent that adds rate limiting to OAI (e.g., Gemini) API calls.

    This agent limits the number of calls made through `generate_oai_reply` and
    `a_generate_oai_reply` based on the specified rate.
    """
    DEFAULT_SUMMARY_METHOD = "last_msg"


    def __init__(
        self,
        name: str,
        rate_limit_calls: int = 10,  # Max calls (e.g., 5 for the Gemini free tier)
        rate_limit_period: float = 60.0,  # Period in seconds (e.g., 60 for per minute)
        **kwargs,  # Pass other ConversableAgent args
    ):
        """
        Args:
            name (str): Name of the agent.
            rate_limit_calls (int): Maximum number of API calls allowed within the period.
            rate_limit_period (float): The time window in seconds for the rate limit.
            **kwargs: Other arguments to pass to the ConversableAgent constructor.
        """
        super().__init__(name=name, **kwargs)
        self.rate_limit_calls = rate_limit_calls
        self.rate_limit_period = rate_limit_period

        logger.info(
            f"Rate limiter initialized for agent '{self.name}': "
            f"{self.rate_limit_calls} calls / {self.rate_limit_period} seconds"
        )
    

    def _get_wait_time(self) -> float:
        """
        Calculates the necessary wait time based on the shared call history.
        Acquires the shared rate limit lock internally.
        """
        rate_limit_lock = SharedRateLimiter.get_rate_limit_lock()
        with rate_limit_lock:
            return SharedRateLimiter.get_wait_time(self.rate_limit_calls, self.rate_limit_period)


    def _register_call(self):
        """Registers the current time as a call timestamp in the SHARED queue."""
        SharedRateLimiter.register_call()


    def _wait_for_rate_limit_sync(self):
        """Checks the shared rate limit and sleeps if necessary (synchronous version)."""
        logger.debug(
            f"Agent '{self.name}': Checking shared rate limit (sync). "
            f"Shared timestamps queue size: {len(SharedRateLimiter._call_timestamps)}"
        )

        wait_time = self._get_wait_time()
        logger.debug(f"Agent '{self.name}': Calculated wait time (sync): {wait_time:.2f} seconds.")

        if wait_time > 0:
            logger.warning(
                f"Agent '{self.name}': Shared rate limit reached (sync). "
                f"Waiting for {wait_time:.2f} seconds..."
            )
            time.sleep(wait_time)
            logger.debug(f"Agent '{self.name}': Wait finished (sync).")
        # Register the call once, whether or not we had to wait.
        self._register_call()


    async def _wait_for_rate_limit_async(self):
        """Checks the shared rate limit and sleeps if necessary (asynchronous version)."""
        logger.debug(
            f"Agent '{self.name}': Checking shared rate limit (async). "
            f"Shared timestamps queue size: {len(SharedRateLimiter._call_timestamps)}"
        )

        wait_time = self._get_wait_time()
        logger.debug(f"Agent '{self.name}': Calculated wait time (async): {wait_time:.2f} seconds.")

        if wait_time > 0:
            logger.warning(
                f"Agent '{self.name}': Shared rate limit reached (async). "
                f"Waiting for {wait_time:.2f} seconds..."
            )
            await asyncio.sleep(wait_time)
            logger.debug(f"Agent '{self.name}': Wait finished (async).")
        # Register the call once, whether or not we had to wait.
        self._register_call()


    def _generate_oai_reply_from_client(
        self, llm_client: Any, messages: List[Dict[str, Any]], cache
    ) -> Optional[Union[str, Dict[str, Any]]]:
        """Generate a reply using the given client, applying rate limiting first."""
        # Apply rate limiting BEFORE making the actual client.create call.
        # This method is called synchronously within both sync and async flows,
        # so the synchronous wait is appropriate here.
        self._wait_for_rate_limit_sync()

        # Call the original method from the parent class.
        return super()._generate_oai_reply_from_client(llm_client, messages, cache)

# CREATE THE FILE: shared_rate_limiter.py
import time
import threading
from collections import deque

class SharedRateLimiter:
    _call_timestamps: deque[float] = deque()
    _rate_limit_lock = threading.Lock()

    @staticmethod
    def get_wait_time(rate_limit_calls: int, rate_limit_period: float) -> float:
        """Calculates wait time based on shared call history."""
        now = time.monotonic()
        while SharedRateLimiter._call_timestamps and SharedRateLimiter._call_timestamps[0] <= now - rate_limit_period:
            SharedRateLimiter._call_timestamps.popleft()

        if len(SharedRateLimiter._call_timestamps) >= rate_limit_calls:
            time_since_oldest_call = now - SharedRateLimiter._call_timestamps[0]
            wait_time = rate_limit_period - time_since_oldest_call
            return max(0, wait_time) + 0.01
        return 0.0

    @staticmethod
    def register_call():
        """Registers a call timestamp in the shared queue (thread-safe)."""
        with SharedRateLimiter._rate_limit_lock:
            SharedRateLimiter._call_timestamps.append(time.monotonic())

    @staticmethod
    def get_rate_limit_lock():
        """Returns the shared rate limit lock."""
        return SharedRateLimiter._rate_limit_lock


How to use it?

from rate_limiter_agent import RateLimitedConversableAgent


writer = RateLimitedConversableAgent(
    name="Writer",
    system_message="Your system prompt",
    llm_config={"config_list": config_list_gemini},
    human_input_mode="NEVER",
    rate_limit_calls=10,     # Set the RPM for your model; gemini-2.0-flash-thinking-exp-01-21 allows 10 requests per minute.
    rate_limit_period=60.0,  # The limit period in seconds; 60.0 seconds equals 1 minute.
)
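
To sanity-check the limiter in isolation before wiring it into an agent, here is a minimal sketch (the window of 3 calls per 2 seconds is arbitrary, chosen only to make the wait observable):

# sanity_check.py -- standalone check of SharedRateLimiter (parameters are illustrative)
import time

from shared_rate_limiter import SharedRateLimiter

# Allow 3 calls per 2-second window; the 4th call should be told to wait.
for i in range(4):
    with SharedRateLimiter.get_rate_limit_lock():
        wait = SharedRateLimiter.get_wait_time(rate_limit_calls=3, rate_limit_period=2.0)
    if wait > 0:
        print(f"call {i}: limiter says wait {wait:.2f}s")
        time.sleep(wait)
    SharedRateLimiter.register_call()
    print(f"call {i}: registered")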

thiagobutignon commented Mar 26 '25

Closing as stale, given the workaround in the response above.

victordibia commented Jun 13 '25