Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
12d7434
[trainer] feat: Enhance PrimeRewardManager with configurable num_proc…
JoyboyBrian Nov 10, 2025
608adae
[reward] feat: Add PrimeRewardLoopManager to reward loop module
JoyboyBrian Nov 12, 2025
bd74449
[trainer] fix: Ensure num_processes is set for PrimeRewardManager
JoyboyBrian Nov 12, 2025
744310e
[reward] fix: Remove logger level setting from prime.py
JoyboyBrian Nov 12, 2025
7fffa8f
[reward] fix: Introduce class-level flag for PrimeRewardLoopManager i…
JoyboyBrian Nov 12, 2025
1c94047
[reward] feat: Implement multi-layer rate limiting in PrimeRewardLoop…
JoyboyBrian Nov 12, 2025
7a4013a
revert
JoyboyBrian Nov 12, 2025
de39254
revert
JoyboyBrian Nov 12, 2025
9e20630
typo
JoyboyBrian Nov 12, 2025
f6fd9f6
rename
JoyboyBrian Nov 12, 2025
fa72243
[reward] docs: Enhance AsyncTokenBucket and RateLimitedRewardLoopMana…
JoyboyBrian Nov 12, 2025
2176f44
[reward] fix: Update test cases and rate limiting logic
JoyboyBrian Nov 12, 2025
05be129
[reward] fix: Refactor AsyncTokenBucket logic for handling token requ…
JoyboyBrian Nov 12, 2025
1f9bfca
[reward] refactor: Update registration of RateLimitedRewardLoopManager
JoyboyBrian Nov 12, 2025
355e42b
[reward] refactor: Enhance reward manager instantiation logic
JoyboyBrian Nov 12, 2025
61b5d11
[reward] refactor: Improve event loop management in RewardLoopManager…
JoyboyBrian Nov 12, 2025
392d085
fix: Add __call__ method to RateLimitedRewardLoopManager
JoyboyBrian Nov 13, 2025
96b918e
fix: Resolve CI failures for rate limiting PR
JoyboyBrian Nov 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[reward] fix: Refactor AsyncTokenBucket logic for handling token requ…
…ests

Revised the token consumption logic in the AsyncTokenBucket class to better manage requests exceeding max_tokens. Introduced a separate handling mechanism for large requests, ensuring accurate rate limiting and allowing temporary negative balances. This update enhances the efficiency of token refilling and consumption processes, improving overall performance.
  • Loading branch information
JoyboyBrian committed Nov 12, 2025
commit 05be129fe38cd0b0b7a0d040651e474c27945a6e
66 changes: 30 additions & 36 deletions verl/experimental/reward/reward_loop/limited.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,59 +118,53 @@ async def acquire(self, num_tokens: float = 1.0) -> None:
- Tokens are refilled continuously based on elapsed time
- For requests > max_tokens, allows temporary negative balance
"""
while True:
# Handle requests larger than max_tokens separately
if num_tokens > self.max_tokens:
wait_time = 0.0
async with self.lock:
loop = asyncio.get_running_loop()
now = loop.time()

if self.last_update is None:
self.last_update = now

# Refill tokens based on elapsed time
elapsed = now - self.last_update
new_tokens = elapsed * self.rate_limit
self.tokens = min(self.max_tokens, self.tokens + new_tokens)
self.last_update = now

# Check if we have enough tokens
if self.tokens >= num_tokens:
self.tokens -= num_tokens
return

# For requests larger than max_tokens, we need to make progress
# even if we can't accumulate enough tokens at once
if num_tokens > self.max_tokens:
# Wait until bucket is reasonably full, then consume
# This ensures we're rate limiting properly
tokens_needed = num_tokens - self.tokens
tokens_needed = num_tokens - self.tokens
if tokens_needed > 0:
wait_time = tokens_needed / self.rate_limit

# After waiting, we'll have accumulated enough "time credit"
# to justify consuming the tokens (even if it goes negative)
self.lock.release()
try:
await asyncio.sleep(wait_time)
finally:
await self.lock.acquire()

# Update time and consume tokens
now = loop.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate_limit
self.tokens = min(self.max_tokens, self.tokens + new_tokens)
self.tokens -= num_tokens
self.last_update = now

if wait_time > 0:
await asyncio.sleep(wait_time)
return

# Standard case: request <= max_tokens
while True:
wait_time = 0.0
async with self.lock:
loop = asyncio.get_running_loop()
now = loop.time()
if self.last_update is None:
self.last_update = now

# Consume tokens (will go negative)
elapsed = now - self.last_update
new_tokens = elapsed * self.rate_limit
self.tokens = min(self.max_tokens, self.tokens + new_tokens)
self.last_update = now

if self.tokens >= num_tokens:
self.tokens -= num_tokens
return
else:
# Normal case: request <= max_tokens
# Calculate wait time and loop again
tokens_needed = num_tokens - self.tokens
wait_time = tokens_needed / self.rate_limit

# Sleep outside the lock to allow other coroutines to proceed
await asyncio.sleep(wait_time)
tokens_needed = num_tokens - self.tokens
wait_time = tokens_needed / self.rate_limit

if wait_time > 0:
await asyncio.sleep(wait_time)


@register("rate_limited")
Expand Down