(feat) implement decay-based rate limiting in AsyncRequestContext
Before submitting this PR, please make sure:
- [x] Your code builds clean without any errors or warnings
- [x] You are using approved title ("feat/", "fix/", "docs/", "refactor/")
A description of the changes proposed in the pull request:
This PR adds support for decay-based rate limits in the AsyncThrottler, addressing issue #4163. These limits differ from fixed window limits as they continuously recover capacity over time instead of resetting at fixed intervals.
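Conceptually, a decay-based limit tracks a running usage total that shrinks linearly over time, so capacity comes back gradually instead of all at once at a window boundary. A minimal sketch of the recovery rule (the helper name and signature below are illustrative only, not the throttler's internals):

def available_capacity(limit: float, usage: float, last_update: float,
                       decay_rate: float, now: float) -> float:
    """Usage decays linearly at `decay_rate` units per second since `last_update`."""
    decayed_usage = max(0.0, usage - decay_rate * (now - last_update))
    return limit - decayed_usage

# A fully spent 10-unit budget recovers 4 units of headroom after 4 seconds at decay_rate=1.0:
print(available_capacity(limit=10.0, usage=10.0, last_update=0.0, decay_rate=1.0, now=4.0))  # 4.0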
Key design principle: this functionality was added without breaking existing code. It is fully backward compatible, so existing connectors need zero changes and can integrate it quickly.
Problem:
Exchanges like Kraken and Coinbase Pro use decay-based rate limiting rather than traditional fixed windows. Adding this functionality allows connectors to more accurately model these API behaviors and optimize request throughput.
Tests performed by the developer:
Tested thoroughly with:
- Unit tests that simulate time progression to verify decay behavior
- Real-world testing against the Binance API
- A local script simulation (included in PR description) showing capacity usage over time
Local script
import asyncio
import logging
import time
from typing import List, Set

from hummingbot.core.api_throttler.async_throttler import AsyncThrottler
from hummingbot.core.api_throttler.data_types import RateLimit, RateLimitType


# Setup colored logging for better readability
class ColoredFormatter(logging.Formatter):
    COLORS = {
        'DEBUG': '\033[94m',            # blue
        'INFO': '\033[92m',             # green
        'WARNING': '\033[93m',          # yellow
        'ERROR': '\033[91m',            # red
        'CRITICAL': '\033[91m\033[1m',  # bold red
        'RESET': '\033[0m'              # reset
    }

    def format(self, record):
        log_message = super().format(record)
        return f"{self.COLORS.get(record.levelname, self.COLORS['RESET'])}{log_message}{self.COLORS['RESET']}"


# Setup logging
logger = logging.getLogger("binance_api")
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Constants for the demonstration
API_LIMIT_ID = "BINANCE_WEIGHT"
API_LIMIT = 3.0       # 3 weight units capacity
API_DECAY_RATE = 1.0  # 1 unit per second recovery rate
REQUEST_WEIGHT = 2.0  # Each request costs 2 weight units


class BinanceSimpleDemo:
    """Simple demonstration of decay-based throttling with a single API limit"""

    def __init__(self):
        # Create a single rate limit
        rate_limits: List[RateLimit] = [
            RateLimit(
                limit_id=API_LIMIT_ID,
                limit=API_LIMIT,
                time_interval=60.0,        # 1 minute (not relevant for decay type)
                weight=REQUEST_WEIGHT,     # Each request costs 2 weight units
                limit_type=RateLimitType.DECAY,
                decay_rate=API_DECAY_RATE  # 1 unit per second recovery
            )
        ]
        # Create throttler
        self.throttler = AsyncThrottler(rate_limits=rate_limits)
        logger.info("Created API throttler with decay-based limit:")
        logger.info(f" - Capacity: {API_LIMIT} weight units")
        logger.info(f" - Request weight: {REQUEST_WEIGHT} units per request")
        logger.info(f" - Recovery rate: {API_DECAY_RATE} units per second")
        # Track active and waiting requests
        self.active_requests: Set[str] = set()
        self.waiting_requests: Set[str] = set()
        self.completed_requests = 0
        self.start_time = 0

    async def log_status(self):
        """Log current status of the system"""
        # Read the throttler's internal decay usage tracker (private attribute, demo only)
        current_usage, _ = self.throttler._decay_usage.get(API_LIMIT_ID, (0.0, 0.0))
        # Calculate available capacity
        available = max(0, API_LIMIT - current_usage)
        # Create visualization
        usage_bar = self._create_progress_bar(current_usage, API_LIMIT)
        elapsed = time.time() - self.start_time
        logger.info(f"STATUS AT {elapsed:.1f}s:")
        logger.info(f" API Usage: {usage_bar} {current_usage:.1f}/{API_LIMIT} units")
        logger.info(f" Available: {available:.1f} units")
        logger.info(f" Active requests: {len(self.active_requests)}")
        logger.info(f" Waiting requests: {len(self.waiting_requests)}")
        logger.info(f" Completed: {self.completed_requests}/13 requests")
        logger.info(f" Task logs count: {len(self.throttler._task_logs)}")
        if self.waiting_requests:
            waiting_list = ", ".join(sorted(self.waiting_requests))
            logger.info(f" Waiting IDs: {waiting_list}")

    def _create_progress_bar(self, current, total, width=20):
        """Create ASCII progress bar"""
        percentage = min(current / total, 1.0)
        filled_width = int(width * percentage)
        bar = '█' * filled_width + '░' * (width - filled_width)
        return bar

    async def execute_request(self, request_id):
        """Execute a request respecting the rate limit"""
        logger.info(f"Request #{request_id}: Wants to start")
        self.waiting_requests.add(request_id)
        start_time = time.time()
        try:
            # This will wait if we're over capacity
            async with self.throttler.execute_task(limit_id=API_LIMIT_ID):
                logger.info(f"Request #{request_id}: Started")
                # Request is now active
                self.waiting_requests.remove(request_id)
                self.active_requests.add(request_id)
                wait_time = time.time() - start_time
                if wait_time > 0.1:
                    logger.warning(f"Request #{request_id}: Waited {wait_time:.2f}s for capacity")
                else:
                    logger.info(f"Request #{request_id}: Processing immediately")
                # Simulate API request execution (3 seconds)
                await asyncio.sleep(3.0)
        except Exception as e:
            logger.error(f"Request #{request_id}: Error - {e}")
            if request_id in self.waiting_requests:
                self.waiting_requests.remove(request_id)
        finally:
            # Request is completed
            if request_id in self.active_requests:
                self.active_requests.remove(request_id)
            execution_time = time.time() - start_time
            self.completed_requests += 1
            logger.info(f"Request #{request_id}: Completed in {execution_time:.2f}s")


async def run_demo():
    """Run the demonstration"""
    logger.info("=" * 80)
    logger.info("DECAY-BASED THROTTLING DEMONSTRATION")
    logger.info("=" * 80)
    logger.info("This demo shows how 13 requests (each with weight 2) are handled")
    logger.info("with a capacity of 3 units and recovery rate of 1 unit/second")
    logger.info("=" * 80)
    demo = BinanceSimpleDemo()
    demo.start_time = time.time()
    # Create a mix of parallel and sequential requests
    # First 8 requests in parallel
    parallel_tasks = []
    for i in [1, 2, 3, 4, 5, 6, 7, 8]:
        parallel_tasks.append(demo.execute_request(f"PARALLEL-{i}"))
    # Start status reporting
    status_task = asyncio.create_task(report_status(demo))
    # Launch first batch of parallel requests
    await asyncio.sleep(1)  # Small delay for better log readability
    logger.info("Launching 8 parallel requests...")
    await asyncio.gather(*parallel_tasks)
    # Then execute 5 sequential requests
    for i in [1, 2, 3, 4, 5]:
        await demo.execute_request(f"SEQ{i}")
    # Final status
    await demo.log_status()
    # Stop status reporting
    status_task.cancel()
    try:
        await status_task
    except asyncio.CancelledError:
        pass
    logger.info("\n" + "=" * 80)
    logger.info("DEMONSTRATION COMPLETED")
    logger.info("=" * 80)


async def report_status(demo):
    """Periodically report system status"""
    try:
        while True:
            await demo.log_status()
            await asyncio.sleep(1.0)  # Update status every second
    except asyncio.CancelledError:
        logger.info("Status reporting stopped")
        raise


if __name__ == "__main__":
    # Run the demonstration
    asyncio.run(run_demo())
Tips for QA testing:
Example use case:
# Define a decay-based rate limit
rate_limits = [
    RateLimit(
        limit_id="BINANCE_WEIGHT",
        limit=50.0,          # Maximum capacity
        time_interval=60.0,  # Not used for DECAY type but required for consistency
        weight=1.0,          # Default weight per request
        limit_type=RateLimitType.DECAY,
        decay_rate=1.0       # Recovery rate of 1 unit per second
    )
]

# Create throttler
throttler = AsyncThrottler(rate_limits=rate_limits)

# Use in a request
async with throttler.execute_task(limit_id="BINANCE_WEIGHT"):
    ...  # perform the rate-limited API call here
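As a quick QA check of the behavior described above, you can exhaust the budget and confirm that the next acquisition is delayed by roughly weight / decay_rate seconds. A minimal sketch, reusing the throttler from the example (the exact wait time is approximate and shown for illustration only, assuming the decay semantics behave as described):

import asyncio
import time

async def qa_check():
    # Spend the full 50-unit budget with 50 weight-1 acquisitions.
    for _ in range(50):
        async with throttler.execute_task(limit_id="BINANCE_WEIGHT"):
            pass
    # The next acquisition should wait until ~1 unit has decayed back (~1s at decay_rate=1.0).
    start = time.time()
    async with throttler.execute_task(limit_id="BINANCE_WEIGHT"):
        waited = time.time() - start
    print(f"51st acquisition waited ~{waited:.2f}s for capacity to recover")

asyncio.run(qa_check())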
Please review the contributions guidelines: https://hummingbot.org/developers/contributions/#6-create-a-pull-request
Thank you for pointing that out, @nikspz.
Improved PR:
- Changed branch to development
- Created proposal https://snapshot.box/#/s:hbot-prp.eth/proposal/0xaf1b812ed7964645cd24f90eb9c8fa0545eb341a05eef2439626e796719b7cbb
- Increased test coverage to pass CI
@nikspz Increased test coverage to 99% to pass CI
| Name | Stmts | Miss | Branch | BrPart | Cover | Missing |
|------|-------|------|--------|--------|-------|---------|
| hummingbot/core/api_throttler/async_throttler.py | 61 | 0 | 22 | 1 | 98.80% | 124->123 |
| TOTAL | 61 | 0 | 22 | 1 | 98.80% | |
Thanks for this contribution! This change significantly improves rate-limit handling in Hummingbot. Have you tested it across multiple exchanges to ensure compatibility?
Hi @bentrnr21, I've tested it with Binance. By design, the decay rate limit is compatible with all exchanges. If the question is how well it works with exchanges that actually use decay-based limiting, you first need to find out each exchange's limits manually and customize limit and decay_rate for them.
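For example, a Kraken-style decay counter could be modeled like this (the numbers below are placeholders for illustration only; the real ceiling and recovery rate have to come from the exchange's documentation and your account tier):

KRAKEN_COUNTER = RateLimit(
    limit_id="KRAKEN_API_COUNTER",  # hypothetical limit_id for illustration
    limit=15.0,                     # placeholder counter ceiling
    time_interval=60.0,             # not used for DECAY, required for consistency
    weight=1.0,                     # default cost per request
    limit_type=RateLimitType.DECAY,
    decay_rate=0.33                 # placeholder recovery rate in units per second
)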
hey @riseandignite thanks very much for this contribution! I will prioritize it for the next release!
hi @riseandignite could you please check / fix branch conflicts?
@nikspz resolved