Abstract
This RFC proposes a dynamic load balancing solution for VeRL to address the imbalance problem in multi-turn rollout. The solution introduces capacity-aware request distribution and dynamic VLLM instance capacity scaling based on GPU resource utilization.
Motivation
Problem Statement
The current VeRL implementation suffers from load-imbalance issues that degrade system performance and resource utilization:
- Resource underutilization and slow training: some GPUs sit idle waiting for the longest rollout to finish because rollout lengths vary. In multi-turn scenarios, tasks differ in the number of turns and in decode length per turn, which further exacerbates the problem.
- Static request distribution: rollouts in a batch are statically and evenly distributed across workers, regardless of actual load.
- Concurrency bottlenecks: load-balancing effectiveness is limited by fixed concurrency settings, as also discussed in server based multi-turn training load balance #2520.
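To make the straggler effect concrete, here is a toy simulation (with hypothetical rollout costs, not VeRL measurements) comparing a static even split against a greedy load-aware assignment of the same batch across 4 workers:

```python
# Toy model: each rollout's cost is its total decode time (hypothetical units).
# A synchronous RL step finishes only when the slowest worker finishes.
rollout_costs = [1, 1, 1, 2, 2, 3, 5, 8, 9, 13, 15, 20]
num_workers = 4

# Static even distribution: consecutive equal-sized chunks per worker.
chunk = len(rollout_costs) // num_workers
static_loads = [sum(rollout_costs[i * chunk:(i + 1) * chunk])
                for i in range(num_workers)]

# Greedy load-aware distribution: longest rollout first, to the least-loaded worker.
greedy_loads = [0] * num_workers
for cost in sorted(rollout_costs, reverse=True):
    least = min(range(num_workers), key=greedy_loads.__getitem__)
    greedy_loads[least] += cost

print("static makespan:", max(static_loads))  # prints 48
print("greedy makespan:", max(greedy_loads))  # prints 20
```

In practice rollout costs are not known up front, so the greedy offline assignment is an upper bound on what online, capacity-aware dispatch can approximate.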
Solution Overview
The proposed solution introduces a three-tier architecture for dynamic load balancing:
- AgentLoopManager: Central coordinator for request distribution with capacity awareness
- AgentLoopWorker: Local request handler that processes requests and manages local capacity tracking
- LLMServerManager: Resource monitor and capacity updater that distributes requests to selected VLLM instances, supporting least-request priority and session affinity
Architecture
Detailed Design
1. AgentLoopManager
Responsibilities:
- Central request distribution coordinator with capacity constraints
- Maintains global view of all AgentLoopWorker capacities
- Supports request-level processing with iterative distribution
Key Features:
```python
class AgentLoopManager:
    def __init__(self):
        self.workers = {}      # worker_id -> AgentLoopWorker
        self.capacities = {}   # worker_id -> current capacity
        self.pending_requests = []

    def distribute_requests(self, batch_requests):
        """Distribute requests considering worker capacities."""
        while batch_requests:
            dispatched = False
            for worker_id, worker in self.workers.items():
                if self.capacities[worker_id] > 0 and batch_requests:
                    request = batch_requests.pop(0)
                    worker.submit_request(request)
                    self.capacities[worker_id] -= 1
                    dispatched = True
            if not dispatched:
                # No worker has spare capacity; hold the remainder until
                # LLMServerManager pushes fresh capacity updates, instead
                # of busy-looping.
                self.pending_requests.extend(batch_requests)
                batch_requests.clear()
```
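A minimal usage sketch of the distribution loop, with a stub worker standing in for a real AgentLoopWorker (the stub, its record-keeping, and the chosen capacities are illustrative only):

```python
class StubWorker:
    """Illustrative stand-in for AgentLoopWorker."""
    def __init__(self):
        self.received = []

    def submit_request(self, request):
        self.received.append(request)

class AgentLoopManager:
    """Condensed variant of the manager, just enough for a runnable demo."""
    def __init__(self):
        self.workers = {}
        self.capacities = {}
        self.pending_requests = []

    def distribute_requests(self, batch_requests):
        while batch_requests:
            dispatched = False
            for worker_id, worker in self.workers.items():
                if self.capacities[worker_id] > 0 and batch_requests:
                    worker.submit_request(batch_requests.pop(0))
                    self.capacities[worker_id] -= 1
                    dispatched = True
            if not dispatched:
                # Overflow waits until capacities are refreshed.
                self.pending_requests.extend(batch_requests)
                batch_requests.clear()

manager = AgentLoopManager()
manager.workers = {"w0": StubWorker(), "w1": StubWorker()}
manager.capacities = {"w0": 2, "w1": 1}  # hypothetical capacities

manager.distribute_requests(["r0", "r1", "r2", "r3", "r4"])
# w0 takes r0 and r2, w1 takes r1; r3 and r4 wait in pending_requests.
```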
2. AgentLoopWorker
Responsibilities:
- Local request processing
- Capacity updates
Key Features:
```python
class AgentLoopWorker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.request_queue = []
        self.current_capacity = 0

    def process_request(self, request, target_instance):
        """Process request on the specified VLLM instance."""
        if self.current_capacity > 0:
            self._send_to_vllm(request, target_instance)
            self.current_capacity -= 1
        else:
            # Queue until update_capacity() grants more headroom.
            self.request_queue.append(request)

    def update_capacity(self, new_capacity):
        """Update worker capacity from LLMServerManager."""
        self.current_capacity = new_capacity
```
3. LLMServerManager
Responsibilities:
- Periodic monitoring of VLLM instance GPU block availability
- Concurrent request capacity calculation based on free GPU blocks
- Request routing to available VLLM instances
- Concurrent capacity updates to AgentLoopWorkers
- Resource utilization reporting
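For intuition, the mapping from free KV-cache blocks to concurrent request capacity can be sketched as follows (the block size of 16 tokens matches vLLM's default; the per-request token budget is a hypothetical configuration value):

```python
import math

BLOCK_SIZE_TOKENS = 16          # vLLM's default KV-cache block size
MAX_TOKENS_PER_REQUEST = 4096   # hypothetical per-request token budget

# Each admitted request must be able to grow to its full token budget,
# so it reserves ceil(budget / block_size) KV-cache blocks.
blocks_per_request = math.ceil(MAX_TOKENS_PER_REQUEST / BLOCK_SIZE_TOKENS)

def capacity_from_free_blocks(free_blocks):
    """How many more requests this instance can admit with free_blocks free."""
    return free_blocks // blocks_per_request

print(blocks_per_request)               # prints 256
print(capacity_from_free_blocks(10000)) # prints 39
```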
Key Features:
```python
import time

class LLMServerManager:
    def __init__(self, monitoring_interval=5):
        self.monitoring_interval = monitoring_interval
        self.vllm_instances = {}
        self.workers = {}
        self.instance_capacities = {}
        # GPU KV-cache blocks required per concurrent request.
        self.blocks_per_request = 1

    def start_monitoring(self):
        """Start periodic capacity monitoring."""
        while True:
            self._update_all_capacities()
            time.sleep(self.monitoring_interval)

    def route_request(self, request):
        """Route request to the best available VLLM instance."""
        best_instance = self._select_best_instance()
        if best_instance is not None and self.instance_capacities[best_instance] > 0:
            worker = self._get_worker_for_instance(best_instance)
            worker.process_request(request, best_instance)
            self.instance_capacities[best_instance] -= 1

    def _select_best_instance(self):
        """Select the VLLM instance with the highest available capacity."""
        if not self.instance_capacities:
            return None
        return max(self.instance_capacities, key=self.instance_capacities.get)

    def _get_worker_for_instance(self, instance_id):
        """Get the worker responsible for a specific VLLM instance."""
        return self.workers[instance_id]

    def _update_all_capacities(self):
        """Update concurrent request capacities for all VLLM instances."""
        for instance_id, instance in self.vllm_instances.items():
            free_blocks = self._get_free_gpu_blocks(instance)
            capacity = self._calculate_capacity(free_blocks)
            self.instance_capacities[instance_id] = capacity
            self._notify_workers(instance_id, capacity)

    def _calculate_capacity(self, free_blocks):
        """Calculate maximum concurrent request capacity from free GPU blocks."""
        # Each request needs blocks_per_request KV-cache blocks, so free
        # blocks bound the number of additional concurrent requests.
        # The exact mapping depends on VLLM's block allocation strategy.
        return free_blocks // self.blocks_per_request
```
Extension Features
Request Abort & Migration (Depends on PR #2200)
Once PR #2200 is merged, the following features can be implemented:
- Request Abort: Ability to cancel requests stuck in overloaded instances
- Request Migration: Move requests from overloaded to underutilized instances
- Dynamic Rebalancing: Real-time load redistribution based on current conditions
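As a thought experiment for dynamic rebalancing (all names are hypothetical, and actually carrying out a migration depends on the abort/resume hooks from PR #2200), the planning step could look like:

```python
def plan_migration(instance_loads, threshold=2):
    """Propose (source, target) migrations while the load gap exceeds threshold.

    instance_loads: dict mapping instance_id -> number of in-flight requests.
    Returns a list of (overloaded_id, underloaded_id) migration candidates.
    Hypothetical helper: executing a plan requires request abort/resume
    support from PR #2200.
    """
    plans = []
    loads = dict(instance_loads)  # work on a copy
    while True:
        hot = max(loads, key=loads.get)
        cold = min(loads, key=loads.get)
        if loads[hot] - loads[cold] <= threshold:
            break
        # Moving one request narrows the gap by 2.
        loads[hot] -= 1
        loads[cold] += 1
        plans.append((hot, cold))
    return plans

print(plan_migration({"i0": 9, "i1": 2, "i2": 4}))
# prints [('i0', 'i1'), ('i0', 'i1'), ('i0', 'i1')]
```

The threshold keeps the planner from thrashing: migrations only pay off when the load gap is large enough to outweigh the cost of moving a request's state.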