Add queue-length-based autoscaling
This PR adds queue-based autoscaling for Ray Serve TaskConsumer deployments. TaskConsumers are workloads that consume tasks from message queues (Redis, RabbitMQ), so their scaling needs differ fundamentally from those of HTTP-based deployments.
Instead of scaling based on HTTP request load, TaskConsumers should scale based on the number of pending tasks in the message queue.
Key Features
- Automatic queue depth monitoring via a QueueMonitor Ray actor
- New queue_based_autoscaling_policy that scales replicas based on ceil(queue_length / target_ongoing_requests)
- Transparent integration: TaskConsumers with autoscaling enabled automatically use queue-based scaling
- Support for Redis and RabbitMQ brokers
- Actor recovery mechanism for fault tolerance
Architecture
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│  Message Queue  │◄─────│  QueueMonitor    │      │ ServeController │
│  (Redis/RMQ)    │      │  Actor           │◄─────│ Autoscaler      │
└─────────────────┘      └──────────────────┘      └─────────────────┘
                                  │                         │
                                  │ get_queue_length()      │
                                  └─────────────────────────┘
                                               │
                                               ▼
                                 ┌───────────────────────────┐
                                 │ queue_based_autoscaling   │
                                 │ _policy()                 │
                                 │ desired = ceil(len/target)│
                                 └───────────────────────────┘
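The ServeController-to-QueueMonitor leg of the diagram can be sketched as a plain Python class. This is a minimal sketch under stated assumptions, not the code in queue_monitor.py: SketchQueueMonitor, queue_monitor_actor_name, and flaky_query are hypothetical names, the class is not a Ray actor, and the broker query is injected as a callable so the example runs without Redis or RabbitMQ. On a failed query it returns the last cached length, mirroring the "fallback to cached values" behavior listed in the test plan.

```python
from typing import Callable, Optional


def queue_monitor_actor_name(deployment_name: str) -> str:
    # Hypothetical helper: builds the QUEUE_MONITOR::<deployment_name> name.
    return f"QUEUE_MONITOR::{deployment_name}"


class SketchQueueMonitor:
    """Hypothetical stand-in for the QueueMonitor actor (no Ray, no broker)."""

    def __init__(self, deployment_name: str, query_broker: Callable[[], int]):
        self.name = queue_monitor_actor_name(deployment_name)
        # Injected broker query; in practice this might wrap Redis LLEN or a
        # RabbitMQ passive queue declaration (assumption for this sketch).
        self._query_broker = query_broker
        self._last_known: Optional[int] = None

    def get_queue_length(self) -> int:
        try:
            length = self._query_broker()
        except Exception:
            # Broker query failed: fall back to the last cached value, or 0 if
            # nothing has been cached yet (the 0 default is an assumption).
            return self._last_known if self._last_known is not None else 0
        self._last_known = length
        return length


# Usage: the first query succeeds, the second simulates a broker outage.
responses = [12, ConnectionError("broker unreachable")]


def flaky_query() -> int:
    item = responses.pop(0)
    if isinstance(item, Exception):
        raise item
    return item


monitor = SketchQueueMonitor("my_consumer", flaky_query)
print(monitor.name)                # QUEUE_MONITOR::my_consumer
print(monitor.get_queue_length())  # 12
print(monitor.get_queue_length())  # 12 (cached fallback)
```

Injecting the query keeps the caching and error-handling logic independent of the broker type, which is roughly what lets one monitor front both Redis and RabbitMQ.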
Components
- QueueMonitor Actor (queue_monitor.py)
  - Lightweight Ray actor that queries queue length from the message broker
  - Supports Redis (using the LLEN command) and RabbitMQ (using passive queue declaration)
  - Named actor format: QUEUE_MONITOR::<deployment_name>
  - Detached lifecycle with automatic cleanup on deployment deletion
- Queue-Based Autoscaling Policy (autoscaling_policy.py)
  - New policy function: queue_based_autoscaling_policy
  - Formula: desired_replicas = ceil(queue_length / target_ongoing_requests)
  - Reuses existing smoothing/delay logic to prevent oscillation
  - Stores QueueMonitor config in policy_state for actor recovery
- Automatic Policy Switching (application_state.py)
  - Detects TaskConsumer deployments via the _is_task_consumer marker
  - Automatically creates the QueueMonitor actor and switches to the queue-based policy
  - Only applies when the user hasn't specified a custom autoscaling policy
- Lifecycle Management (autoscaling_state.py)
  - QueueMonitor actors are cleaned up when deployments are deleted
  - Prevents actor leaks and ensures test isolation
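The policy behavior described above (the ceil formula, min/max clamping, delays against oscillation, holding steady when the monitor is unavailable, and keeping the monitor config in policy_state) can be sketched end to end. This is a simplified illustration, not Ray Serve's actual policy signature or the PR's code: PolicyInputs, queue_policy_sketch, the pending_decision/pending_ticks/monitor_config state keys, and the tick-counting delay scheme are all hypothetical. The real smoothing/delay logic the PR reuses may differ in detail.

```python
import math
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional, Tuple


@dataclass
class PolicyInputs:
    """Hypothetical, simplified stand-in for an autoscaling context."""
    current_replicas: int
    target_ongoing_requests: float
    min_replicas: int
    max_replicas: int
    upscale_ticks: int      # consecutive ticks required before scaling up
    downscale_ticks: int    # consecutive ticks required before scaling down
    policy_state: Dict[str, Any] = field(default_factory=dict)


def queue_policy_sketch(
    inputs: PolicyInputs,
    get_queue_length: Callable[[], Optional[int]],
    monitor_config: Dict[str, Any],
) -> Tuple[int, Dict[str, Any]]:
    state = dict(inputs.policy_state)
    # Persist the monitor config so a restarted controller could recreate the
    # QueueMonitor actor from policy_state alone.
    state["monitor_config"] = monitor_config

    queue_length = get_queue_length()
    if queue_length is None:
        # Monitor unavailable: hold steady rather than scale on missing data.
        return inputs.current_replicas, state

    # desired = ceil(queue_length / target), clamped to [min, max].
    proposal = math.ceil(queue_length / inputs.target_ongoing_requests)
    proposal = max(inputs.min_replicas, min(inputs.max_replicas, proposal))

    if proposal == inputs.current_replicas:
        state["pending_decision"], state["pending_ticks"] = None, 0
        return inputs.current_replicas, state

    # Count consecutive ticks that proposed a move in the same direction.
    direction = "up" if proposal > inputs.current_replicas else "down"
    if state.get("pending_decision") == direction:
        state["pending_ticks"] = state.get("pending_ticks", 0) + 1
    else:
        state["pending_decision"], state["pending_ticks"] = direction, 1

    required = inputs.upscale_ticks if direction == "up" else inputs.downscale_ticks
    if state["pending_ticks"] >= required:
        state["pending_decision"], state["pending_ticks"] = None, 0
        return proposal, state
    return inputs.current_replicas, state


# Usage: 20 pending tasks, target of 5 per replica, upscale after 2 ticks.
state: Dict[str, Any] = {}
replicas = 1
for tick in range(3):
    inputs = PolicyInputs(replicas, 5, 1, 10, upscale_ticks=2,
                          downscale_ticks=5, policy_state=state)
    replicas, state = queue_policy_sketch(inputs, lambda: 20, {"broker": "redis"})
    print(tick, replicas)
# 0 1   (first tick proposing 4: delay not yet satisfied)
# 1 4   (second consecutive tick: scale-up applied)
# 2 4   (proposal matches current replicas)
```

Requiring several consecutive agreeing decisions before acting is one common way to damp oscillation; a longer downscale delay than upscale delay lets the deployment react quickly to a growing backlog while shrinking conservatively.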
Files Changed
| File | Changes |
|---|---|
| python/ray/serve/_private/queue_monitor.py | New file - QueueMonitor actor for broker queries |
| python/ray/serve/autoscaling_policy.py | Add queue_based_autoscaling_policy, refactor smoothing logic |
| python/ray/serve/_private/application_state.py | Auto-configure queue-based autoscaling for TaskConsumers |
| python/ray/serve/_private/autoscaling_state.py | Clean up QueueMonitor actors on deployment deletion |
| python/ray/serve/_private/constants.py | Add DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY constant |
| python/ray/serve/task_consumer.py | Add _is_task_consumer marker and get_queue_monitor_config() |
| python/ray/serve/tests/test_task_processor.py | Integration tests for queue-based autoscaling |
| python/ray/serve/tests/unit/test_queue_autoscaling_policy.py | New file - Unit tests for policy |
| python/ray/serve/tests/unit/test_queue_monitor.py | New file - Unit tests for QueueMonitor |
Test Plan
- Unit tests for queue_based_autoscaling_policy (19 tests)
  - Basic scaling formula verification
  - Scale-from-zero handling
  - Min/max replicas enforcement
  - Upscale/downscale delays
  - QueueMonitor unavailability handling
  - Actor recovery from policy_state
- Unit tests for QueueMonitor (8 tests)
  - Redis and RabbitMQ initialization
  - Queue length queries
  - Error handling and fallback to cached values
- Integration tests (3 tests)
  - test_task_consumer_scales_up_based_on_queue_depth: verifies scaling from 1→4 replicas with 20 queued tasks
  - test_task_consumer_autoscaling_respects_max_replicas: verifies the max_replicas cap
  - test_task_consumer_autoscaling_respects_min_replicas: verifies the min_replicas floor
P.S. Additional test cases (e.g., edge cases, failure scenarios, multi-deployment tests) will be added in a follow-up PR as this PR is already substantial in scope.