
add queue length based autoscaling

Open harshit-anyscale opened this issue 3 weeks ago • 0 comments

This PR adds queue-based autoscaling for Ray Serve TaskConsumer deployments. TaskConsumers are workloads that consume tasks from message queues (Redis, RabbitMQ), so their scaling needs differ fundamentally from those of HTTP-based deployments.

Instead of scaling based on HTTP request load, TaskConsumers should scale based on the number of pending tasks in the message queue.

Key Features

  • Automatic queue depth monitoring via QueueMonitor Ray actor
  • New queue_based_autoscaling_policy that scales replicas based on ceil(queue_length / target_ongoing_requests)
  • Transparent integration: TaskConsumers with autoscaling enabled automatically use queue-based scaling
  • Support for Redis and RabbitMQ brokers
  • Actor recovery mechanism for fault tolerance
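
As a rough illustration of the scaling formula listed above, the following is a minimal sketch, not the code in this PR. The function name `desired_replicas` and its parameters are hypothetical; the clamping to min/max replicas is inferred from the min/max enforcement mentioned in the test plan.

```python
import math


def desired_replicas(
    queue_length: int,
    target_ongoing_requests: float,
    min_replicas: int,
    max_replicas: int,
) -> int:
    """Hypothetical helper: ceil(queue_length / target), clamped to bounds."""
    if target_ongoing_requests <= 0:
        raise ValueError("target_ongoing_requests must be positive")
    # Core formula from the PR description.
    raw = math.ceil(queue_length / target_ongoing_requests)
    # Assumed clamping step: never go below min_replicas or above max_replicas.
    return max(min_replicas, min(max_replicas, raw))
```

For example, with 20 queued tasks and an assumed target of 5 tasks per replica, the sketch asks for 4 replicas; an empty queue falls back to `min_replicas`.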

Architecture


  ┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
  │  Message Queue  │◄─────│  QueueMonitor    │      │ ServeController │
  │  (Redis/RMQ)    │      │  Actor           │◄─────│ Autoscaler      │
  └─────────────────┘      └──────────────────┘      └─────────────────┘
                                   │                         │
                                   │ get_queue_length()      │
                                   └─────────────────────────┘
                                             │
                                             ▼
                                ┌───────────────────────────┐
                                │ queue_based_autoscaling   │
                                │ _policy()                 │
                                │ desired = ceil(len/target)│
                                └───────────────────────────┘

Components

  1. QueueMonitor Actor (queue_monitor.py)
    • Lightweight Ray actor that queries queue length from the message broker
    • Supports Redis (using LLEN command) and RabbitMQ (using passive queue declaration)
    • Named actor format: QUEUE_MONITOR::<deployment_name>
    • Detached lifecycle with automatic cleanup on deployment deletion
  2. Queue-Based Autoscaling Policy (autoscaling_policy.py)
    • New policy function: queue_based_autoscaling_policy
    • Formula: desired_replicas = ceil(queue_length / target_ongoing_requests)
    • Reuses existing smoothing/delay logic to prevent oscillation
    • Stores QueueMonitor config in policy_state for actor recovery
  3. Automatic Policy Switching (application_state.py)
    • Detects TaskConsumer deployments via _is_task_consumer marker
    • Automatically creates QueueMonitor actor and switches to queue-based policy
    • Only applies when user hasn't specified a custom autoscaling policy
  4. Lifecycle Management (autoscaling_state.py)
    • QueueMonitor actors are cleaned up when deployments are deleted
    • Prevents actor leaks and ensures test isolation
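
The QueueMonitor behaviour described above (a named actor per deployment, with fallback to the last known queue length when the broker cannot be reached) might look roughly like the sketch below. It is deliberately broker-agnostic and is not the actor from queue_monitor.py: the class `CachedQueueLength`, the helper `queue_monitor_actor_name`, and the injected `fetch` callable are all hypothetical names introduced for illustration.

```python
from typing import Callable


def queue_monitor_actor_name(deployment_name: str) -> str:
    """Build the actor name in the QUEUE_MONITOR::<deployment_name> format."""
    return f"QUEUE_MONITOR::{deployment_name}"


class CachedQueueLength:
    """Hypothetical wrapper that remembers the last successful queue length.

    `fetch` is an assumed callable returning the current queue length.
    With redis-py it could be `lambda: client.llen(queue_name)`; that
    wiring is an assumption and is not taken from this PR.
    """

    def __init__(self, fetch: Callable[[], int]) -> None:
        self._fetch = fetch
        self._last_known = 0  # value reported before any successful query

    def get_queue_length(self) -> int:
        try:
            self._last_known = int(self._fetch())
        except Exception:
            # Broker unreachable or query failed: keep the cached value.
            pass
        return self._last_known
```

Injecting the fetch function keeps the sketch testable without a running broker; the real actor presumably talks to Redis or RabbitMQ directly.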

Files Changed

File                                                          Changes
python/ray/serve/_private/queue_monitor.py                    New file - QueueMonitor actor for broker queries
python/ray/serve/autoscaling_policy.py                        Add queue_based_autoscaling_policy, refactor smoothing logic
python/ray/serve/_private/application_state.py                Auto-configure queue-based autoscaling for TaskConsumers
python/ray/serve/_private/autoscaling_state.py                Clean up QueueMonitor actors on deployment deletion
python/ray/serve/_private/constants.py                        Add DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY constant
python/ray/serve/task_consumer.py                             Add _is_task_consumer marker and get_queue_monitor_config()
python/ray/serve/tests/test_task_processor.py                 Integration tests for queue-based autoscaling
python/ray/serve/tests/unit/test_queue_autoscaling_policy.py  New file - Unit tests for policy
python/ray/serve/tests/unit/test_queue_monitor.py             New file - Unit tests for QueueMonitor

Test Plan

  • Unit tests for queue_based_autoscaling_policy (19 tests)
    • Basic scaling formula verification
    • Scale from zero handling
    • Min/max replicas enforcement
    • Upscale/downscale delays
    • QueueMonitor unavailability handling
    • Actor recovery from policy_state
  • Unit tests for QueueMonitor (8 tests)
    • Redis and RabbitMQ initialization
    • Queue length queries
    • Error handling and fallback to cached values
  • Integration tests (3 tests)
    • test_task_consumer_scales_up_based_on_queue_depth - Verifies scaling from 1→4 replicas with 20 queued tasks
    • test_task_consumer_autoscaling_respects_max_replicas - Verifies max_replicas cap
    • test_task_consumer_autoscaling_respects_min_replicas - Verifies min_replicas floor
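
To make the upscale/downscale delay cases in the unit tests more concrete, here is a simplified, time-based sketch of delay gating. It is an assumption about the general idea only: the PR reuses Ray Serve's existing smoothing/delay logic, which is not reproduced here, and the class `DelayedScaler` and its fields are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DelayedScaler:
    """Hypothetical gate: apply a change only after it has persisted long enough."""

    upscale_delay_s: float
    downscale_delay_s: float
    _pending_direction: int = 0  # +1 scaling up, -1 scaling down, 0 none
    _pending_since: Optional[float] = None

    def decide(self, current: int, desired: int, now: float) -> int:
        direction = (desired > current) - (desired < current)
        if direction == 0:
            # Nothing to change, so forget any pending decision.
            self._pending_direction, self._pending_since = 0, None
            return current
        if direction != self._pending_direction:
            # A new direction restarts the waiting period.
            self._pending_direction, self._pending_since = direction, now
        delay = self.upscale_delay_s if direction > 0 else self.downscale_delay_s
        if now - self._pending_since >= delay:
            self._pending_direction, self._pending_since = 0, None
            return desired
        return current
```

Under this sketch, a brief spike or dip in queue length does not change the replica count unless the new target persists for the configured delay, which is the oscillation protection the policy relies on.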

P.S. Additional test cases (e.g., edge cases, failure scenarios, multi-deployment tests) will be added in a follow-up PR as this PR is already substantial in scope.

harshit-anyscale, Dec 10 '25 16:12