
[Core] feat: Implement Priority Scheduling in V1 Engine

Open amitm02 opened this issue 5 months ago • 6 comments

This commit introduces priority scheduling capabilities to the V1 LLM engine.

Key changes include:

  • EngineCoreRequest and Request updates: Added a priority field to the EngineCoreRequest and Request classes to carry priority information.

  • Processor update: Modified Processor.process_inputs to accept the priority and pass it through to EngineCoreRequest.

  • V1 Scheduler modifications: The scheduler now respects the --scheduling-policy argument. When policy="priority", self.waiting is managed as a min-heap, ordering requests by their assigned priority value (lower value means higher priority) and then by arrival time (FCFS). Preemption logic now correctly identifies and preempts the actual lowest-priority running request when space is needed for higher-priority or new requests. FCFS behavior is maintained when policy="fcfs".

  • Documentation: Updated docs/usage/v1_guide.md and docs/serving/openai_compatible_server.md to reflect the V1 engine's support for priority scheduling.

  • Unit Tests: Added a new test suite in tests/v1/core/test_scheduler.py.

This allows you to influence the order of request processing in the V1 engine by assigning priorities, which is particularly useful when requests vary in importance.
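The min-heap ordering described in the scheduler changes can be sketched as follows. This is a minimal illustration with hypothetical helper names, not the PR's actual code: the waiting queue is keyed by (priority, arrival order), so a lower priority value is served first and ties fall back to FCFS.

```python
import heapq
import itertools

# Monotonic counter standing in for arrival time: it breaks priority ties
# in FCFS order, mirroring the (priority, arrival_time) heap key in the PR.
_arrival = itertools.count()

def push_waiting(waiting: list, priority: int, req_id: str) -> None:
    """Enqueue a request keyed by (priority value, arrival order)."""
    heapq.heappush(waiting, (priority, next(_arrival), req_id))

def pop_waiting(waiting: list) -> str:
    """Dequeue the lowest priority value; on ties, the earliest arrival."""
    _priority, _order, req_id = heapq.heappop(waiting)
    return req_id

waiting: list = []
push_waiting(waiting, 5, "req-a")  # arrives first, priority 5
push_waiting(waiting, 0, "req-b")  # arrives later, priority 0 (more urgent)
push_waiting(waiting, 5, "req-c")  # same priority as req-a, arrives later
```

Popping now yields req-b first (lowest priority value), then req-a before req-c (FCFS tie-break).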

FIX https://github.com/vllm-project/vllm/issues/14002
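A minimal sketch of the preemption rule described above, using hypothetical names rather than the PR's actual code: when space is needed, the running request with the largest (priority, arrival_time) key, i.e. the lowest-priority and, on ties, most recently arrived one, is the victim.

```python
from dataclasses import dataclass

@dataclass
class RunningReq:
    req_id: str
    priority: int        # lower value = higher priority
    arrival_time: float  # later arrivals lose ties

def find_preemption_victim(running: list) -> RunningReq:
    """Pick the request to preempt: the worst (priority, arrival_time) key."""
    return max(running, key=lambda r: (r.priority, r.arrival_time))

running = [
    RunningReq("a", priority=0, arrival_time=1.0),
    RunningReq("b", priority=7, arrival_time=2.0),
    RunningReq("c", priority=7, arrival_time=3.0),
]
victim = find_preemption_victim(running)  # "c": lowest priority, latest arrival
```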

amitm02 avatar Jun 03 '25 06:06 amitm02

👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, only the fastcheck CI runs, covering a small, essential subset of tests to quickly catch errors. You can run additional CI tests on top of those by going to your fastcheck build in the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run full CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

github-actions[bot] avatar Jun 03 '25 06:06 github-actions[bot]

This pull request has merge conflicts that must be resolved before it can be merged. Please rebase the PR, @amitm02.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

mergify[bot] avatar Jun 04 '25 22:06 mergify[bot]

This pull request has merge conflicts that must be resolved before it can be merged. Please rebase the PR, @amitm02.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

mergify[bot] avatar Jun 11 '25 07:06 mergify[bot]

@amitm02 Could you please benchmark the performance for the following three cases?

  1. Current main (before this PR)
  2. FIFO scheduling w/ this PR
  3. Priority scheduling w/ this PR (when all reqs have the same priority)

WoosukKwon avatar Jun 18 '25 17:06 WoosukKwon

@amitm02 Could you please benchmark the performance for the following three cases?

  1. Current main (before this PR)
  2. FIFO scheduling w/ this PR
  3. Priority scheduling w/ this PR (when all reqs have the same priority)

Realistic Scheduling Performance Benchmark

| Scenario | Requests | Avg Total Time (ms) | Avg Schedule Calls | Ops/sec |
|---|---|---|---|---|
| Previous FCFS Scheduling | 100 | 5.205 | 56.0 | 19210.6 |
| FCFS Scheduling | 100 | 5.377 | 56.0 | 18596.8 |
| Priority Scheduling (Random) | 100 | 5.841 | 56.0 | 17120.8 |
| Priority Scheduling (Same Priority) | 100 | 5.403 | 56.0 | 18509.5 |
| Previous FCFS Scheduling | 500 | 24.586 | 256.0 | 20336.6 |
| FCFS Scheduling | 500 | 27.143 | 256.0 | 18421.1 |
| Priority Scheduling (Random) | 500 | 27.359 | 256.0 | 18275.7 |
| Priority Scheduling (Same Priority) | 500 | 26.668 | 256.0 | 18749.0 |
| Previous FCFS Scheduling | 1000 | 49.736 | 504.0 | 20106.0 |
| FCFS Scheduling | 1000 | 57.698 | 504.0 | 17331.6 |
| Priority Scheduling (Random) | 1000 | 54.741 | 504.0 | 18267.7 |
| Priority Scheduling (Same Priority) | 1000 | 52.387 | 504.0 | 19088.7 |
| Previous FCFS Scheduling | 5000 | 243.834 | 2504.0 | 20505.7 |
| FCFS Scheduling | 5000 | 264.800 | 2504.0 | 18882.1 |
| Priority Scheduling (Random) | 5000 | 274.427 | 2504.0 | 18219.8 |
| Priority Scheduling (Same Priority) | 5000 | 263.425 | 2504.0 | 18980.7 |
| Previous FCFS Scheduling | 10000 | 487.705 | 5000.0 | 20504.2 |
| FCFS Scheduling | 10000 | 534.170 | 5000.0 | 18720.6 |
| Priority Scheduling (Random) | 10000 | 544.555 | 5000.0 | 18363.6 |
| Priority Scheduling (Same Priority) | 10000 | 538.537 | 5000.0 | 18568.8 |
| Previous FCFS Scheduling | 50000 | 2466.662 | 25000.0 | 20270.3 |
| FCFS Scheduling | 50000 | 2701.605 | 25000.0 | 18507.5 |
| Priority Scheduling (Random) | 50000 | 2785.103 | 25000.0 | 17952.7 |
| Priority Scheduling (Same Priority) | 50000 | 2697.503 | 25000.0 | 18535.7 |
| Previous FCFS Scheduling | 200000 | 9927.364 | 100000.0 | 20146.3 |
| FCFS Scheduling | 200000 | 10988.144 | 100000.0 | 18201.4 |
| Priority Scheduling (Random) | 200000 | 11464.132 | 100000.0 | 17445.7 |
| Priority Scheduling (Same Priority) | 200000 | 10955.162 | 100000.0 | 18256.2 |
| Previous FCFS Scheduling | 500000 | 24660.033 | 250000.0 | 20275.7 |
| FCFS Scheduling | 500000 | 27254.433 | 250000.0 | 18345.6 |
| Priority Scheduling (Random) | 500000 | 28612.106 | 250000.0 | 17475.1 |
| Priority Scheduling (Same Priority) | 500000 | 27255.262 | 250000.0 | 18345.1 |

What We Tested: Realistic Scheduler Performance Under Load

This benchmark simulates a realistic production scenario where:

  • Large batches of requests (100 to 500,000) arrive simultaneously at the scheduler
  • Limited resources: Only 16 requests can run concurrently, with 512 tokens per batch
  • Multiple scheduling cycles: Each request requires ~16 tokens to complete, necessitating many schedule() calls
  • Three scheduling policies: FCFS, Priority with random priorities, and Priority with same priorities

The test measures end-to-end performance from when all requests are queued until all are completed.

Column Meanings:

Scenario

The scheduling algorithm being tested:

  • Previous FCFS: The current FCFS implementation before this PR (baseline for comparison)
  • FCFS: New First-Come-First-Served implementation being introduced in this PR
  • Priority (Random): Requests are assigned random priority values 0-100; lower values mean higher priority and are served first
  • Priority (Same): All requests have identical priority, so scheduling falls back to FCFS behavior

Requests

Total number of requests processed in the test batch (100 to 500,000)

Avg Total Time (ms)

Complete wall-clock time to process ALL requests from start to finish, including:

  • All schedule() method calls to make scheduling decisions
  • All update_from_output() calls to process simulated model results
  • Queue management and memory allocation operations

Avg Schedule Calls

Number of times scheduler.schedule() was invoked to complete all requests

  • Pattern observed: ~0.5 calls per request (e.g., 100 requests → 56 calls)
  • Why less than 1:1: Batching efficiency - each call can schedule multiple requests
  • Key insight: More calls = more scheduling overhead

Ops/sec

Throughput metric: total_requests ÷ (total_time_in_seconds)

  • Higher is better: More requests processed per second
  • Range observed: ~17,000-19,000 requests/second
  • Shows overall efficiency of the scheduling policy under realistic constraints
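For concreteness, the throughput metric can be expressed as a one-line helper (hypothetical name, simply restating the formula above):

```python
def ops_per_sec(total_requests: int, total_time_ms: float) -> float:
    """Throughput: requests completed per second of wall-clock time."""
    return total_requests / (total_time_ms / 1000.0)

# E.g. the 100-request FCFS row: ops_per_sec(100, 5.377) is roughly 18.6k,
# matching the table.
```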

Key Findings from These Results:

  1. Baseline comparison: Previous FCFS (current implementation) consistently outperforms the new implementations by ~2-5%
  2. Small performance differences: Priority scheduling shows only 3-5% overhead vs FCFS
  3. Consistent scaling: Performance remains stable even at 500K requests
  4. Batching efficiency: ~2 requests processed per schedule() call on average
  5. Priority overhead is manageable: The heap operations don't dominate at these scales
  6. Implementation trade-offs: The new FCFS shows slight performance regression compared to the baseline

This benchmark provides realistic insights into scheduler performance under production-like workloads with proper resource constraints.

import random
import time

# NOTE: create_scheduler, create_scheduler_with_priority, create_requests,
# create_requests_with_priority, and EOS_TOKEN_ID are assumed to be test
# helpers from the PR's test suite (tests/v1/core/test_scheduler.py);
# ModelRunnerOutput comes from vllm.v1.outputs.

def benchmark_scheduling_performance():
    """Benchmark the performance of different scheduling scenarios with realistic constraints."""
    
    # Test parameters
    num_requests_list = [100, 500, 1000, 5000, 10000, 50000, 200000, 500000] 
    num_iterations = 3  # Reduced iterations since we're doing more work per test
    
    # Realistic scheduler constraints
    max_concurrent_requests = 16  # Typical batch size
    max_tokens_per_batch = 512   # Typical token budget
    max_tokens_per_request = 16  # Typical output tokens per request
    
    print("\n=== Realistic Scheduling Performance Benchmark ===")
    print(f"Scheduler limits: {max_concurrent_requests} concurrent requests, {max_tokens_per_batch} tokens/batch")
    print(f"{'Scenario':<40} {'Requests':<10} {'Avg Total Time (ms)':<15} {'Avg Schedule Calls':<15} {'Ops/sec':<15}")
    print("-" * 100)
    
    def simulate_model_execution(scheduler_output):
        """Create mock model output to simulate request completion."""
        req_ids = []
        req_id_to_index = {}
        sampled_token_ids = []
        
        # For new requests, generate first output token
        for i, req_data in enumerate(scheduler_output.scheduled_new_reqs):
            req_ids.append(req_data.req_id)
            req_id_to_index[req_data.req_id] = i
            sampled_token_ids.append([100])  # Mock token ID
        
        # For cached requests, generate next token or finish
        for i, req_data in enumerate(scheduler_output.scheduled_cached_reqs):
            req_idx = len(scheduler_output.scheduled_new_reqs) + i
            req_ids.append(req_data.req_id)
            req_id_to_index[req_data.req_id] = req_idx
            # Simulate finishing after max_tokens_per_request tokens
            if req_data.num_computed_tokens >= max_tokens_per_request:
                sampled_token_ids.append([EOS_TOKEN_ID])  # Finish request
            else:
                sampled_token_ids.append([100])  # Continue generating
        
        return ModelRunnerOutput(
            req_ids=req_ids,
            req_id_to_index=req_id_to_index,
            sampled_token_ids=sampled_token_ids,
            spec_token_ids=None,
            logprobs=None,
            prompt_logprobs_dict={},
            pooler_output=[],
        )
    
    for num_requests in num_requests_list:
        print(f"\nTesting with {num_requests:,} requests...")
        
        # Test 1: FCFS Scheduling
        fcfs_times = []
        fcfs_schedule_calls = []
        for iteration in range(num_iterations):
            scheduler = create_scheduler(
                max_num_seqs=max_concurrent_requests,
                max_num_batched_tokens=max_tokens_per_batch
            )
            
            # Create requests for FCFS
            requests = create_requests(num_requests=num_requests, num_tokens=10, max_tokens=max_tokens_per_request)
            
            # Add all requests to scheduler
            for request in requests:
                scheduler.add_request(request)
            
            # Measure time for all scheduling operations
            start_time = time.perf_counter()
            schedule_call_count = 0
            
            while scheduler.get_num_unfinished_requests() > 0:
                output = scheduler.schedule()
                schedule_call_count += 1
                
                if output.scheduled_new_reqs or output.scheduled_cached_reqs:
                    model_output = simulate_model_execution(output)
                    scheduler.update_from_output(output, model_output)
                else:
                    # No requests scheduled, break to avoid infinite loop
                    break
            
            end_time = time.perf_counter()
            fcfs_times.append((end_time - start_time) * 1000)
            fcfs_schedule_calls.append(schedule_call_count)
        
        avg_fcfs_time = sum(fcfs_times) / len(fcfs_times)
        avg_fcfs_calls = sum(fcfs_schedule_calls) / len(fcfs_schedule_calls)
        fcfs_ops_per_sec = num_requests / (avg_fcfs_time / 1000)
        
        print(f"{'FCFS Scheduling':<40} {num_requests:<10} {avg_fcfs_time:<15.3f} {avg_fcfs_calls:<15.1f} {fcfs_ops_per_sec:<15.1f}")
        
        # Test 2: Priority Scheduling with Random Priorities
        priority_random_times = []
        priority_random_calls = []
        for iteration in range(num_iterations):
            scheduler = create_scheduler_with_priority(
                max_num_seqs=max_concurrent_requests,
                max_num_batched_tokens=max_tokens_per_batch
            )
            
            # Create requests with random priorities
            random.seed(42)  # For reproducible results
            priorities = [random.randint(0, 100) for _ in range(num_requests)]
            arrival_times = [float(i) for i in range(num_requests)]
            
            requests = create_requests_with_priority(
                num_requests=num_requests,
                priorities=priorities,
                arrival_times=arrival_times,
                num_tokens=10,
                max_tokens=max_tokens_per_request
            )
            
            # Add all requests to scheduler
            for request in requests:
                scheduler.add_request(request)
            
            # Measure time for all scheduling operations
            start_time = time.perf_counter()
            schedule_call_count = 0
            
            while scheduler.get_num_unfinished_requests() > 0:
                output = scheduler.schedule()
                schedule_call_count += 1
                
                if output.scheduled_new_reqs or output.scheduled_cached_reqs:
                    model_output = simulate_model_execution(output)
                    scheduler.update_from_output(output, model_output)
                else:
                    break
            
            end_time = time.perf_counter()
            priority_random_times.append((end_time - start_time) * 1000)
            priority_random_calls.append(schedule_call_count)
        
        avg_priority_random_time = sum(priority_random_times) / len(priority_random_times)
        avg_priority_random_calls = sum(priority_random_calls) / len(priority_random_calls)
        priority_random_ops_per_sec = num_requests / (avg_priority_random_time / 1000)
        
        print(f"{'Priority Scheduling (Random)':<40} {num_requests:<10} {avg_priority_random_time:<15.3f} {avg_priority_random_calls:<15.1f} {priority_random_ops_per_sec:<15.1f}")
        
        # Test 3: Priority Scheduling with Same Priority (FCFS fallback)
        priority_same_times = []
        priority_same_calls = []
        for iteration in range(num_iterations):
            scheduler = create_scheduler_with_priority(
                max_num_seqs=max_concurrent_requests,
                max_num_batched_tokens=max_tokens_per_batch
            )
            
            # Create requests with same priority
            priorities = [5] * num_requests  # All same priority
            arrival_times = [float(i) for i in range(num_requests)]
            
            requests = create_requests_with_priority(
                num_requests=num_requests,
                priorities=priorities,
                arrival_times=arrival_times,
                num_tokens=10,
                max_tokens=max_tokens_per_request
            )
            
            # Add all requests to scheduler
            for request in requests:
                scheduler.add_request(request)
            
            # Measure time for all scheduling operations
            start_time = time.perf_counter()
            schedule_call_count = 0
            
            while scheduler.get_num_unfinished_requests() > 0:
                output = scheduler.schedule()
                schedule_call_count += 1
                
                if output.scheduled_new_reqs or output.scheduled_cached_reqs:
                    model_output = simulate_model_execution(output)
                    scheduler.update_from_output(output, model_output)
                else:
                    break
            
            end_time = time.perf_counter()
            priority_same_times.append((end_time - start_time) * 1000)
            priority_same_calls.append(schedule_call_count)
        
        avg_priority_same_time = sum(priority_same_times) / len(priority_same_times)
        avg_priority_same_calls = sum(priority_same_calls) / len(priority_same_calls)
        priority_same_ops_per_sec = num_requests / (avg_priority_same_time / 1000)
        
        print(f"{'Priority Scheduling (Same Priority)':<40} {num_requests:<10} {avg_priority_same_time:<15.3f} {avg_priority_same_calls:<15.1f} {priority_same_ops_per_sec:<15.1f}")

amitm02 avatar Jun 19 '25 10:06 amitm02

I think the code looks a lot cleaner. Thanks for this.

But I'm not sure about the degradation in FCFS. 5% is quite a huge margin...

Thanks! Glad the cleanup looks better.

Regarding the FCFS degradation — while it’s around 5%, we’re still seeing ~18k ops/sec, which is far beyond the request throughput the rest of the vLLM stack can actually sustain, so the scheduler is not the bottleneck. I don’t think it’s worth optimizing something that already performs well beyond what the system can realistically handle. It feels like premature optimization at this point.

amitm02 avatar Jun 20 '25 09:06 amitm02

Hi @amitm02, this feature seems to consider priority only when new blocks are insufficient, by preempting blocks from low-priority requests. It cannot guarantee that high-priority requests will get a faster response.

7d1-z avatar Jul 10 '25 02:07 7d1-z

Hi @amitm02, this feature seems to consider priority only when new blocks are insufficient, by preempting blocks from low-priority requests. It cannot guarantee that high-priority requests will get a faster response.

How so? The queue of pending requests is a priority queue.

amitm02 avatar Jul 10 '25 04:07 amitm02

@amitm02 Sorry, you are right. When max_num_seqs is decreased, the change is evident!

7d1-z avatar Jul 10 '25 06:07 7d1-z