Skip to content

Commit 6db61dc

Browse files
JoyboyBrianclaude
authored andcommitted
[algo] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting for API-based rewards (verl-project#4107)
### What does this PR do? This PR implements a **three-layer rate limiting system** for API-based reward functions in the reward loop manager, specifically designed for LLM-as-judge scenarios. The new `RateLimitedRewardLoopManager` provides: 1. **Concurrency limiting** (`max_concurrent`) - Controls maximum parallel requests 2. **Request rate limiting** (`max_rpm`) - Limits requests per minute (RPM) 3. **Token rate limiting** (`max_tpm`) - Limits tokens per minute (TPM) This is essential for integrating with external API-based reward models (e.g., OpenAI, Anthropic) that have rate limits, preventing request failures and ensuring smooth training workflows. The implementation uses a custom `AsyncTokenBucket` algorithm for smooth rate limiting and includes timeout handling and error recovery mechanisms. **Related context:** This addresses the need for controlled API usage during RL training when using external LLM-as-judge reward models. ### Checklist Before Starting - [x] Search for similar PRs: [GitHub search for "rate limit"](https://github.com/volcengine/verl/search?q=rate+limit&type=pullrequests) - [x] Format the PR title as `[reward] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting` ### Test **Testing approach:** - Unit tests should cover the `AsyncTokenBucket` rate limiting logic - Integration tests should validate rate limiting with mock API reward functions - Manual validation: Configure different rate limits and verify request patterns comply with limits ### API and Usage Example **Configuration:** ```yaml # In config file or via command line overrides reward_model: manager: rate_limited # Use the new rate-limited manager max_concurrent: 10 # Max parallel requests max_rpm: 100 # Max 100 requests per minute max_tpm: 20000 # Max 20,000 tokens per minute estimated_tokens_per_request: 2000 # Token estimate for TPM limiting timeout: 300.0 # Timeout in seconds ``` ### Design & Code Changes **High-level design:** 1. **AsyncTokenBucket class** ([limited.py:30-63](verl/experimental/reward/reward_loop/limited.py#L30-L63)) - Implements token bucket algorithm for smooth rate limiting - Supports variable token consumption (useful for TPM limiting) - Thread-safe with asyncio locks - Auto-refills tokens based on configured rate 2. **RateLimitedRewardLoopManager class** ([limited.py:66-235](verl/experimental/reward/reward_loop/limited.py#L66-L235)) - **Class-level state**: Rate limiters are shared globally across all worker instances to ensure limits apply system-wide - **Three-layer limiting**: - Layer 1 (Concurrency): `asyncio.Semaphore` limits parallel requests - Layer 2 (RPM): `AsyncTokenBucket` limits requests per minute - Layer 3 (TPM): `AsyncTokenBucket` limits tokens per minute - **Initialization guard**: `_class_initialized` flag prevents duplicate initialization - **Error handling**: Timeout and exception handling with fallback rewards - **Logging**: Detailed configuration logging on initialization **Specific changes:** - Added `verl/experimental/reward/reward_loop/limited.py` (235 lines) - Updated `verl/experimental/reward/reward_loop/__init__.py` to export `RateLimitedRewardLoopManager` - Added `CLAUDE.md` to `.gitignore` **Key implementation details:** - Rate limiters acquire in order: RPM → TPM → Concurrency, ensuring smooth throttling - Timeout handling returns reward=0.0 with metadata for debugging - Supports both sync and async reward functions via `inspect.iscoroutinefunction` - Estimated tokens per request used for TPM limiting (configurable) ### Documentation **Added comprehensive docstrings** to [verl/experimental/reward/reward_loop/limited.py](verl/experimental/reward/reward_loop/limited.py): 1. **AsyncTokenBucket class** (lines 30-137) - Detailed algorithm explanation (token bucket rate limiting) - Complete Args and Attributes documentation with types - Thread safety guarantees - Usage examples for RPM and TPM limiting scenarios - Step-by-step algorithm details - Implementation notes about asyncio event loop usage 2. **RateLimitedRewardLoopManager class** (lines 140-219) - Comprehensive overview of three-layer rate limiting architecture - Detailed Rate Limiting Flow explanation - Full configuration parameters with defaults and descriptions - Global class-level state management documentation - Example configuration with concrete values - Thread safety notes for distributed training - Cross-references to related classes and functions **Documentation coverage:** - All public methods have detailed docstrings with Args, Returns, Raises, and Examples - Class-level documentation explains design patterns and use cases - Code examples demonstrate common usage patterns - Algorithm details help developers understand implementation ### Test Coverage **Created comprehensive test suite** with 35+ test cases covering both unit and integration scenarios: #### 1. **Unit Tests for AsyncTokenBucket** ([tests/experimental/reward/test_async_token_bucket_on_cpu.py](tests/experimental/reward/test_async_token_bucket_on_cpu.py)) 19 test cases covering: - ✅ Basic token acquisition and refill mechanism - ✅ Waiting behavior when tokens are insufficient - ✅ Max tokens capacity cap enforcement - ✅ Fractional token consumption - ✅ Concurrent acquires with race condition handling - ✅ High rate limit scenarios (1000 tokens/sec) - ✅ Rate limit accuracy verification (within 20% margin) - ✅ Sequential vs concurrent acquisition patterns - ✅ Large token acquisitions (5x capacity) - ✅ Multiple wait cycles in refill loop - ✅ Thread safety with locks under high concurrency - ✅ Default parameters (max_tokens = rate_limit) - ✅ Zero initial state and first-use behavior - ✅ Rapid small acquisitions (50x 2-token requests) #### 2. **Integration Tests with Mock API Functions** ([tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py](tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py)) 16 test cases covering: - ✅ Basic reward computation (sync and async) - ✅ RPM rate limiting validation (60 RPM = 1 req/sec) - ✅ TPM rate limiting validation (6000 TPM with 2000 tokens/req) - ✅ Concurrency limiting (max 2 concurrent requests) - ✅ Timeout handling for slow APIs (500ms timeout) - ✅ Error handling for failing APIs with exception catching - ✅ Dict vs float return format handling - ✅ Combined multi-layer rate limits (all 3 layers active) - ✅ Correct vs incorrect answer scoring - ✅ High throughput scenarios (50+ concurrent requests) - ✅ Class initialization idempotency - ✅ Extra info propagation through reward pipeline - ✅ Synchronous reward function compatibility - ✅ Global rate limiter sharing across instances **Mock API functions implemented:** - `mock_sync_reward_function` - Synchronous API simulation - `mock_async_reward_function` - Asynchronous API with call tracking - `mock_slow_api_function` - Timeout testing (2s delay) - `mock_failing_api_function` - Error handling testing - `mock_dict_result_function` - Complex result format testing - `MockAPICounter` - Global call tracking and rate measurement **CI Integration:** - Both test files follow `*_on_cpu.py` naming convention - Automatically discovered by [.github/workflows/cpu_unit_tests.yml](.github/workflows/cpu_unit_tests.yml) - No workflow changes needed - tests run automatically on every PR - Tests validated for syntax correctness and async compatibility **Test execution:** ```bash # Run AsyncTokenBucket unit tests pytest tests/experimental/reward/test_async_token_bucket_on_cpu.py -v --asyncio-mode=auto # Run integration tests with mock APIs pytest tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py -v --asyncio-mode=auto # Run all reward loop tests pytest tests/experimental/reward/ -v --asyncio-mode=auto ``` ### Checklist Before Submitting - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [x] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). ✅ **Added comprehensive docstrings to AsyncTokenBucket and RateLimitedRewardLoopManager classes** - [x] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. ✅ **Added 35+ async unit tests and integration tests with mock API functions, automatically run in CPU CI workflow** - [x] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). --- **Notes for reviewers:** - The class-level state pattern ensures rate limits are enforced globally across all workers in distributed training - The token bucket implementation provides smooth rate limiting without bursts - Timeout and error handling ensure training continues even if reward computation fails --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent c460541 commit 6db61dc

File tree

8 files changed

+1322
-10
lines changed

8 files changed

+1322
-10
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,4 @@ ENV/
128128
logs
129129
log
130130
outputs
131-
.history
131+
.history
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
# Copyright 2024 Bytedance Ltd. and/or its affiliates
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import time
17+
18+
import pytest
19+
20+
from verl.experimental.reward.reward_loop.limited import AsyncTokenBucket
21+
22+
23+
class TestAsyncTokenBucket:
24+
"""Unit tests for AsyncTokenBucket rate limiter."""
25+
26+
@pytest.mark.asyncio
27+
async def test_basic_acquire(self):
28+
"""Test basic token acquisition."""
29+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
30+
31+
# Should be able to acquire tokens immediately when bucket is full
32+
start = time.time()
33+
await bucket.acquire(5.0)
34+
elapsed = time.time() - start
35+
36+
assert elapsed < 0.1, "Initial acquire should be immediate"
37+
assert bucket.tokens == pytest.approx(5.0, abs=0.1)
38+
39+
@pytest.mark.asyncio
40+
async def test_refill_mechanism(self):
41+
"""Test that tokens refill over time."""
42+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
43+
44+
# Consume all tokens
45+
await bucket.acquire(10.0)
46+
assert bucket.tokens == pytest.approx(0.0, abs=0.1)
47+
48+
# Wait for refill (should get ~5 tokens in 0.5 seconds at 10 tokens/sec)
49+
await asyncio.sleep(0.5)
50+
51+
# Try to acquire 4 tokens (should succeed without waiting)
52+
start = time.time()
53+
await bucket.acquire(4.0)
54+
elapsed = time.time() - start
55+
56+
assert elapsed < 0.1, "Acquire should be quick after refill"
57+
58+
@pytest.mark.asyncio
59+
async def test_waiting_for_tokens(self):
60+
"""Test that acquire waits when insufficient tokens available."""
61+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
62+
63+
# Consume all tokens
64+
await bucket.acquire(10.0)
65+
66+
# Try to acquire more tokens (should wait ~0.5 seconds for 5 tokens)
67+
start = time.time()
68+
await bucket.acquire(5.0)
69+
elapsed = time.time() - start
70+
71+
# Should wait approximately 0.5 seconds (5 tokens / 10 tokens per second)
72+
assert 0.4 < elapsed < 0.7, f"Expected ~0.5s wait, got {elapsed:.3f}s"
73+
74+
@pytest.mark.asyncio
75+
async def test_max_tokens_cap(self):
76+
"""Test that tokens don't exceed max_tokens capacity."""
77+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=5.0)
78+
79+
# Wait for potential overflow
80+
await asyncio.sleep(1.0)
81+
82+
# Tokens should be capped at max_tokens
83+
await bucket.acquire(1.0)
84+
85+
# After 1 second at 10 tokens/sec, should have max_tokens (5.0)
86+
# After acquiring 1, should have 4.0 remaining
87+
assert bucket.tokens <= 5.0, "Tokens should not exceed max_tokens"
88+
89+
@pytest.mark.asyncio
90+
async def test_fractional_tokens(self):
91+
"""Test acquiring fractional tokens."""
92+
bucket = AsyncTokenBucket(rate_limit=100.0, max_tokens=100.0)
93+
94+
# Acquire fractional amounts
95+
await bucket.acquire(0.5)
96+
await bucket.acquire(1.5)
97+
await bucket.acquire(2.3)
98+
99+
assert bucket.tokens == pytest.approx(100.0 - 0.5 - 1.5 - 2.3, abs=0.1)
100+
101+
@pytest.mark.asyncio
102+
async def test_concurrent_acquires(self):
103+
"""Test multiple concurrent acquire operations."""
104+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
105+
106+
async def acquire_task(num_tokens: float, task_id: int):
107+
await bucket.acquire(num_tokens)
108+
return task_id
109+
110+
# Launch 5 concurrent tasks, each acquiring 3 tokens (15 total)
111+
# Bucket only has 10, so some will need to wait
112+
start = time.time()
113+
tasks = [acquire_task(3.0, i) for i in range(5)]
114+
results = await asyncio.gather(*tasks)
115+
elapsed = time.time() - start
116+
117+
# Should take at least 0.5 seconds to refill 5 tokens
118+
# (15 needed - 10 available) / 10 tokens per second = 0.5 seconds
119+
assert elapsed >= 0.4, f"Expected >=0.4s for concurrent acquires, got {elapsed:.3f}s"
120+
assert len(results) == 5, "All tasks should complete"
121+
122+
@pytest.mark.asyncio
123+
async def test_high_rate_limit(self):
124+
"""Test with high rate limit (simulating high-throughput scenarios)."""
125+
bucket = AsyncTokenBucket(rate_limit=1000.0, max_tokens=1000.0)
126+
127+
# Rapidly acquire tokens
128+
start = time.time()
129+
for _ in range(100):
130+
await bucket.acquire(10.0) # 1000 tokens total
131+
elapsed = time.time() - start
132+
133+
# Should complete in approximately 1 second
134+
assert elapsed < 1.5, f"High rate limit test took too long: {elapsed:.3f}s"
135+
136+
@pytest.mark.asyncio
137+
async def test_zero_initial_state(self):
138+
"""Test that bucket starts with full tokens."""
139+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
140+
141+
assert bucket.tokens == 10.0, "Bucket should start full"
142+
assert bucket.last_update is None, "last_update should be None initially"
143+
144+
# After first acquire, last_update should be set
145+
await bucket.acquire(1.0)
146+
assert bucket.last_update is not None, "last_update should be set after acquire"
147+
148+
@pytest.mark.asyncio
149+
async def test_rate_limit_accuracy(self):
150+
"""Test rate limit accuracy over time."""
151+
rate = 50.0 # 50 tokens per second
152+
bucket = AsyncTokenBucket(rate_limit=rate, max_tokens=rate)
153+
154+
# Consume all tokens and measure refill time for 25 tokens
155+
await bucket.acquire(50.0)
156+
157+
start = time.time()
158+
await bucket.acquire(25.0)
159+
elapsed = time.time() - start
160+
161+
expected_time = 25.0 / rate # 0.5 seconds
162+
# Allow 20% margin for timing inaccuracy
163+
assert abs(elapsed - expected_time) < expected_time * 0.2, f"Expected ~{expected_time:.3f}s, got {elapsed:.3f}s"
164+
165+
@pytest.mark.asyncio
166+
async def test_sequential_acquires(self):
167+
"""Test sequential acquire operations."""
168+
bucket = AsyncTokenBucket(rate_limit=20.0, max_tokens=20.0)
169+
170+
# Sequential acquires without waiting
171+
await bucket.acquire(5.0)
172+
await bucket.acquire(5.0)
173+
await bucket.acquire(5.0)
174+
await bucket.acquire(5.0)
175+
176+
# Bucket should be empty
177+
assert bucket.tokens == pytest.approx(0.0, abs=0.1)
178+
179+
# Next acquire should wait
180+
start = time.time()
181+
await bucket.acquire(10.0)
182+
elapsed = time.time() - start
183+
184+
assert elapsed >= 0.4, "Should wait for token refill"
185+
186+
@pytest.mark.asyncio
187+
async def test_default_max_tokens(self):
188+
"""Test that max_tokens defaults to rate_limit."""
189+
bucket = AsyncTokenBucket(rate_limit=15.0)
190+
191+
assert bucket.max_tokens == 15.0, "max_tokens should default to rate_limit"
192+
assert bucket.tokens == 15.0, "Initial tokens should equal max_tokens"
193+
194+
@pytest.mark.asyncio
195+
async def test_single_token_acquire(self):
196+
"""Test default acquire of 1 token."""
197+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
198+
199+
await bucket.acquire() # Default num_tokens=1.0
200+
201+
assert bucket.tokens == pytest.approx(9.0, abs=0.1)
202+
203+
@pytest.mark.asyncio
204+
async def test_large_token_acquire(self):
205+
"""Test acquiring more tokens than bucket capacity."""
206+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
207+
208+
# Try to acquire 50 tokens (5x capacity)
209+
start = time.time()
210+
await bucket.acquire(50.0)
211+
elapsed = time.time() - start
212+
213+
# Should wait for: (50 - 10) / 10 = 4 seconds
214+
assert 3.5 < elapsed < 5.0, f"Expected ~4s wait for large acquire, got {elapsed:.3f}s"
215+
216+
@pytest.mark.asyncio
217+
async def test_thread_safety_with_lock(self):
218+
"""Test that lock prevents race conditions."""
219+
bucket = AsyncTokenBucket(rate_limit=100.0, max_tokens=100.0)
220+
results = []
221+
222+
async def acquire_and_record():
223+
await bucket.acquire(10.0)
224+
results.append(1)
225+
226+
# Launch many concurrent tasks
227+
tasks = [acquire_and_record() for _ in range(10)]
228+
await asyncio.gather(*tasks)
229+
230+
# All tasks should complete
231+
assert len(results) == 10, "All tasks should complete successfully"
232+
233+
# Bucket should have consumed exactly 100 tokens
234+
assert bucket.tokens == pytest.approx(0.0, abs=0.5)
235+
236+
@pytest.mark.asyncio
237+
async def test_multiple_wait_cycles(self):
238+
"""Test multiple wait cycles in the acquire loop."""
239+
bucket = AsyncTokenBucket(rate_limit=10.0, max_tokens=10.0)
240+
241+
# Consume all tokens
242+
await bucket.acquire(10.0)
243+
244+
# Acquire tokens that require multiple refill cycles
245+
start = time.time()
246+
await bucket.acquire(15.0)
247+
elapsed = time.time() - start
248+
249+
# Should wait for 15 tokens / 10 tokens per second = 1.5 seconds
250+
assert 1.3 < elapsed < 1.8, f"Expected ~1.5s for multiple refill cycles, got {elapsed:.3f}s"
251+
252+
@pytest.mark.asyncio
253+
async def test_rapid_small_acquires(self):
254+
"""Test many rapid small acquisitions."""
255+
bucket = AsyncTokenBucket(rate_limit=100.0, max_tokens=100.0)
256+
257+
start = time.time()
258+
for _ in range(50):
259+
await bucket.acquire(2.0) # 100 tokens total
260+
elapsed = time.time() - start
261+
262+
# Should complete quickly since we're within capacity
263+
assert elapsed < 0.5, f"Rapid small acquires took too long: {elapsed:.3f}s"
264+
265+
266+
if __name__ == "__main__":
267+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)