
(feat) implement decay-based rate limiting in AsyncRequestContext

Open riseandignite opened this pull request 9 months ago • 8 comments

Before submitting this PR, please make sure:

  • [x] Your code builds clean without any errors or warnings
  • [x] You are using approved title ("feat/", "fix/", "docs/", "refactor/")

A description of the changes proposed in the pull request:

This PR adds support for decay-based rate limits in the AsyncThrottler, addressing issue #4163. These limits differ from fixed-window limits in that they continuously recover capacity over time instead of resetting at fixed intervals.
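For intuition, a decay-based limit can be pictured as a usage counter that drains linearly at decay_rate units per second. Below is a minimal sketch of that idea, assuming simple (usage, timestamp) bookkeeping; it is illustrative only and not the actual AsyncThrottler internals:

import time


def remaining_capacity(limit: float, used: float, last_update_ts: float, decay_rate: float) -> float:
    """Capacity available right now, after linear recovery since the last update."""
    recovered = decay_rate * (time.time() - last_update_ts)
    current_usage = max(0.0, used - recovered)
    return limit - current_usage

A request of weight w can proceed as soon as remaining_capacity(...) >= w, instead of waiting for the next window boundary as a fixed-interval limit would.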

Key design principle: this functionality was added without breaking existing code. It is fully backward compatible, so existing connectors need zero changes and can integrate it quickly.

Problem:

Exchanges like Kraken and Coinbase Pro use decay-based rate limiting rather than traditional fixed windows. Adding this functionality allows connectors to more accurately model these API behaviors and optimize request throughput.
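As an illustration, a connector could describe such a counter with the new RateLimit fields. The limit_id and numeric values below are placeholders, not verified exchange limits:

from hummingbot.core.api_throttler.data_types import RateLimit, RateLimitType

# Hypothetical decay-style counter for an exchange whose usage drains over time.
EXCHANGE_COUNTER = RateLimit(
    limit_id="EXCHANGE_REST_COUNTER",  # placeholder ID
    limit=15.0,            # counter ceiling before requests are rejected
    time_interval=60.0,    # unused by DECAY limits, kept for API consistency
    limit_type=RateLimitType.DECAY,
    decay_rate=0.5,        # counter drains by 0.5 units per second
)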

Tests performed by the developer:

Tested thoroughly with:

  1. Unit tests that simulate time progression to verify decay behavior
  2. Real-world testing against Binance API
  3. A local script simulation (included in PR description) showing capacity usage over time
Local script
import asyncio
import logging
import time
from typing import List, Set

from hummingbot.core.api_throttler.async_throttler import AsyncThrottler
from hummingbot.core.api_throttler.data_types import RateLimit, RateLimitType

# Setup colored logging for better readability


class ColoredFormatter(logging.Formatter):
    COLORS = {
        'DEBUG': '\033[94m',  # blue
        'INFO': '\033[92m',   # green
        'WARNING': '\033[93m',  # yellow
        'ERROR': '\033[91m',  # red
        'CRITICAL': '\033[91m\033[1m',  # bold red
        'RESET': '\033[0m'    # reset
    }

    def format(self, record):
        log_message = super().format(record)
        return f"{self.COLORS.get(record.levelname, self.COLORS['RESET'])}{log_message}{self.COLORS['RESET']}"


# Setup logging
logger = logging.getLogger("binance_api")
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Constants for the demonstration
API_LIMIT_ID = "BINANCE_WEIGHT"
API_LIMIT = 3.0         # 3 weight units capacity
API_DECAY_RATE = 1.0    # 1 unit per second recovery rate
REQUEST_WEIGHT = 2.0    # Each request costs 2 weight units


class BinanceSimpleDemo:
    """Simple demonstration of decay-based throttling with a single API limit"""

    def __init__(self):
        # Create a single rate limit
        rate_limits: List[RateLimit] = [
            RateLimit(
                limit_id=API_LIMIT_ID,
                limit=API_LIMIT,
                time_interval=60.0,  # 1 minute (not relevant for decay type)
                weight=REQUEST_WEIGHT,  # Each request costs 2 weight units
                limit_type=RateLimitType.DECAY,
                decay_rate=API_DECAY_RATE  # 1 unit per second recovery
            )
        ]

        # Create throttler
        self.throttler = AsyncThrottler(rate_limits=rate_limits)
        logger.info(f"Created API throttler with decay-based limit:")
        logger.info(f"  - Capacity: {API_LIMIT} weight units")
        logger.info(f"  - Request weight: {REQUEST_WEIGHT} units per request")
        logger.info(f"  - Recovery rate: {API_DECAY_RATE} units per second")

        # Track active and waiting requests
        self.active_requests: Set[str] = set()
        self.waiting_requests: Set[str] = set()
        self.completed_requests = 0
        self.start_time = 0

    async def log_status(self):
        """Log current status of the system"""
        current_usage, _ = self.throttler._decay_usage.get(API_LIMIT_ID, (0.0, 0.0))
        # Calculate available capacity
        available = max(0, API_LIMIT - current_usage)

        # Create visualization
        usage_bar = self._create_progress_bar(current_usage, API_LIMIT)
        elapsed = time.time() - self.start_time

        logger.info(f"STATUS AT {elapsed:.1f}s:")
        logger.info(f"  API Usage: {usage_bar} {current_usage:.1f}/{API_LIMIT} units")
        logger.info(f"  Available: {available:.1f} units")
        logger.info(f"  Active requests: {len(self.active_requests)}")
        logger.info(f"  Waiting requests: {len(self.waiting_requests)}")
        logger.info(f"  Completed: {self.completed_requests}/20 requests")
        logger.info(f"  Task logs count: {len(self.throttler._task_logs)}")

        if self.waiting_requests:
            waiting_list = ", ".join(sorted(self.waiting_requests))
            logger.info(f"  Waiting IDs: {waiting_list}")

    def _create_progress_bar(self, current, total, width=20):
        """Create ASCII progress bar"""
        percentage = min(current / total, 1.0)
        filled_width = int(width * percentage)
        bar = '█' * filled_width + '░' * (width - filled_width)
        return bar

    async def execute_request(self, request_id):
        """Execute a request respecting the rate limit"""
        logger.info(f"Request #{request_id}: Wants to start")
        self.waiting_requests.add(request_id)

        start_time = time.time()

        try:
            # This will wait if we're over capacity
            async with self.throttler.execute_task(limit_id=API_LIMIT_ID):
                logger.info(f"Request #{request_id}: Started")
                # Request is now active
                self.waiting_requests.remove(request_id)
                self.active_requests.add(request_id)

                wait_time = time.time() - start_time
                if wait_time > 0.1:
                    logger.warning(f"Request #{request_id}: Waited {wait_time:.2f}s for capacity")
                else:
                    logger.info(f"Request #{request_id}: Processing immediately")

                # Simulate API request execution (3 seconds)
                await asyncio.sleep(3.0)
        except Exception as e:
            logger.error(f"Request #{request_id}: Error - {e}")
            if request_id in self.waiting_requests:
                self.waiting_requests.remove(request_id)
        finally:
            # Request is completed
            if request_id in self.active_requests:
                self.active_requests.remove(request_id)

            execution_time = time.time() - start_time
            self.completed_requests += 1
            logger.info(f"Request #{request_id}: Completed in {execution_time:.2f}s")


async def run_demo():
    """Run the demonstration"""
    logger.info("=" * 80)
    logger.info("DECAY-BASED THROTTLING DEMONSTRATION")
    logger.info("=" * 80)
    logger.info("This demo shows how 20 requests (each with weight 2) are handled")
    logger.info("with a capacity of 10 units and recovery rate of 1 unit/second")
    logger.info("=" * 80)

    demo = BinanceSimpleDemo()
    demo.start_time = time.time()

    # Create a mix of parallel and sequential requests
    # First 8 requests in parallel
    parallel_tasks = []
    for i in [1, 2, 3, 4, 5, 6, 7, 8]:
        parallel_tasks.append(demo.execute_request(f"PARALLEL-{i}"))

    # Start status reporting
    status_task = asyncio.create_task(report_status(demo))

    # Launch first batch of parallel requests
    await asyncio.sleep(1)  # Small delay for better log readability
    logger.info("Launching 5 parallel requests...")
    await asyncio.gather(*parallel_tasks)

    # Then execute 5 sequential requests
    for i in [1, 2, 3, 4, 5]:
        await demo.execute_request(f"SEQ{i}")

    # Final status
    await demo.log_status()

    # Stop status reporting
    status_task.cancel()
    try:
        await status_task
    except asyncio.CancelledError:
        pass

    logger.info("\n" + "=" * 80)
    logger.info("DEMONSTRATION COMPLETED")
    logger.info("=" * 80)


async def report_status(demo):
    """Periodically report system status"""
    try:
        while True:
            await demo.log_status()
            await asyncio.sleep(1.0)  # Update status every second
    except asyncio.CancelledError:
        logger.info("Status reporting stopped")
        raise


if __name__ == "__main__":
    # Run the demonstration
    asyncio.run(run_demo())
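To complement the demo script, here is a minimal sketch of the decay check described in point 1 of the test list above. It waits on real wall-clock time rather than simulating time progression, is not taken from the PR's actual test suite, and uses limit values chosen only to keep the wait short:

import time
import unittest

from hummingbot.core.api_throttler.async_throttler import AsyncThrottler
from hummingbot.core.api_throttler.data_types import RateLimit, RateLimitType


class DecayThrottlerSketchTest(unittest.IsolatedAsyncioTestCase):
    async def test_second_request_waits_for_recovery(self):
        limits = [
            RateLimit(
                limit_id="TEST",
                limit=2.0,
                time_interval=60.0,
                weight=2.0,
                limit_type=RateLimitType.DECAY,
                decay_rate=2.0,  # 2 units/second, so a 2-unit request recovers in ~1s
            )
        ]
        throttler = AsyncThrottler(rate_limits=limits)
        start = time.time()
        async with throttler.execute_task(limit_id="TEST"):
            pass  # fills the 2-unit capacity
        async with throttler.execute_task(limit_id="TEST"):
            pass  # should block until enough capacity has drained back
        self.assertGreaterEqual(time.time() - start, 0.9)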

Tips for QA testing:

Example use case:

# Define a decay-based rate limit
rate_limits = [
    RateLimit(
        limit_id="BINANCE_WEIGHT",
        limit=50.0,           # Maximum capacity
        time_interval=60.0,   # Not used for DECAY type but required for consistency
        weight=1.0,           # Default weight per request
        limit_type=RateLimitType.DECAY,
        decay_rate=1.0        # Recovery rate of 1 unit per second
    )
]

# Create throttler
throttler = AsyncThrottler(rate_limits=rate_limits)

# Use in request
async with throttler.execute_task(limit_id="BINANCE_WEIGHT"):
    ...  # perform the rate-limited API call here
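Putting it together, a hypothetical request helper could look like the following; fetch_price, the session handling, and the ticker endpoint are illustrative and not part of this PR:

import aiohttp

async def fetch_price(throttler: AsyncThrottler, symbol: str) -> dict:
    # Acquire decay-based capacity before hitting the exchange.
    async with throttler.execute_task(limit_id="BINANCE_WEIGHT"):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://api.binance.com/api/v3/ticker/price",
                params={"symbol": symbol},
            ) as resp:
                return await resp.json()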

riseandignite · Mar 04 '25 13:03

Please review the contribution guidelines: https://hummingbot.org/developers/contributions/#6-create-a-pull-request

nikspz · Mar 05 '25 08:03

Thank you for pointing that out, @nikspz.

Improved PR:

  1. Changed branch to development
  2. Created proposal https://snapshot.box/#/s:hbot-prp.eth/proposal/0xaf1b812ed7964645cd24f90eb9c8fa0545eb341a05eef2439626e796719b7cbb
  3. Increased test coverage to pass CI

riseandignite · Mar 05 '25 14:03

@nikspz Increased test coverage to ~99% to pass CI:

Name                                               Stmts   Miss Branch BrPart   Cover   Missing
-----------------------------------------------------------------------------------------------
hummingbot/core/api_throttler/async_throttler.py      61      0     22      1  98.80%   124->123
-----------------------------------------------------------------------------------------------
TOTAL                                                 61      0     22      1  98.80%

riseandignite · Mar 05 '25 23:03

Thanks for this contribution! This change significantly improves order management in Hummingbot. Have you tested it across multiple exchanges to ensure compatibility?

bentrnr21 · Mar 06 '25 10:03

Hi @bentrnr21, I've tested it with Binance. By design, the decay rate limit is compatible with all exchanges. To make it work well with exchanges that actually use decay-based limiting, though, you first need to find out each exchange's limits manually and customize limit and decay_rate for them.

riseandignite · Mar 07 '25 03:03

hey @riseandignite thanks very much for this contribution! I will prioritize it for the next release!

cardosofede · Apr 01 '25 15:04

Hi @riseandignite, could you please check/fix the branch conflicts?

nikspz · Jun 30 '25 12:06

@nikspz resolved

riseandignite · Jun 30 '25 14:06