[algo] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting for API-based rewards#4107
Conversation
…esses Added support for passing the number of concurrent processes to the PrimeRewardManager. The `load_reward_manager` function now retrieves `max_concurrent` from the configuration when initializing the prime reward manager, allowing for more flexible resource management.
Included PrimeRewardLoopManager in the reward loop module's exports. This enhances the reward management capabilities within the system.
Updated the `load_reward_manager` function to check if `num_processes` is already specified in `reward_kwargs`. If not, it retrieves the value from the configuration, ensuring proper initialization of the PrimeRewardManager with concurrent processes.
…nitialization Added a new class-level flag `_prime_class_initialized` to manage the initialization state of the `PrimeRewardLoopManager`. This change ensures that the class can properly initialize its semaphore without conflicts with the base class's initialization logic.
…Manager Added an AsyncTokenBucket class for managing rate limits and integrated it into the PrimeRewardLoopManager. This enhancement introduces request rate limiting (RPM) and token rate limiting (TPM) alongside existing concurrency limits, improving the management of API-based reward functions. Updated logging to reflect the new rate limiting configuration.
…ger documentation Expanded the docstrings for AsyncTokenBucket and RateLimitedRewardLoopManager classes to provide detailed explanations of their functionality, parameters, and usage examples. This update improves clarity on the rate limiting mechanisms and thread safety features, aiding developers in understanding and utilizing these components effectively.
Modified test cases in `test_rate_limited_reward_manager_on_cpu.py` to reflect changes in request handling and rate limiting behavior. Adjusted assertions to ensure proper validation of rate limiting under varying request loads. Enhanced the `AsyncTokenBucket` class to allow temporary negative token balances for requests exceeding maximum tokens, improving rate limiting accuracy. Updated documentation to clarify these changes.
There was a problem hiding this comment.
Code Review
This pull request introduces a well-designed RateLimitedRewardLoopManager with a three-layer rate limiting system, which is a great feature for interacting with external APIs. The implementation is thorough, and the addition of comprehensive unit and integration tests is excellent.
My main feedback is a critical issue in the AsyncTokenBucket implementation. The acquire method uses a risky anti-pattern for lock management that can lead to correctness and maintainability problems. I have provided a detailed comment with a suggested refactoring to make the implementation more robust and easier to understand.
Overall, this is a strong contribution, and once the critical issue is addressed, it will be a solid addition to the codebase.
…ests Revised the token consumption logic in the AsyncTokenBucket class to better manage requests exceeding max_tokens. Introduced a separate handling mechanism for large requests, ensuring accurate rate limiting and allowing temporary negative balances. This update enhances the efficiency of token refilling and consumption processes, improving overall performance.
Refactored the registration process for the RateLimitedRewardLoopManager by renaming the import for clarity and ensuring it is registered with the reward manager. This change enhances the modularity of the reward loop components and maintains compatibility with existing functionality.
Updated the reward manager instantiation process to accommodate different subclasses of RewardLoopManagerBase and AbstractRewardManager. This change introduces conditional handling for parameters based on the manager type, improving flexibility and ensuring compatibility with various reward manager implementations.
…Base Refactored the RewardLoopManagerBase class to lazily initialize the event loop. Introduced a property method to handle the event loop retrieval, ensuring compatibility with scenarios where no event loop is currently running. This change enhances the robustness of asynchronous operations within the reward loop manager.
Make RateLimitedRewardLoopManager callable to fix TypeError when used as val_reward_fn in ray_trainer.py validation. Key improvements: - Add __call__ method wrapping async run_single - Use self.loop.run_until_complete to preserve rate limiting - Process validation batches in parallel with asyncio.gather - Maintain compatibility with traditional reward manager interface Fixes: TypeError: 'RateLimitedRewardLoopManager' object is not callable
1. Fix type checker error in verl/trainer/ppo/reward.py - Use TYPE_CHECKING pattern for optional import - Add type: ignore comment for runtime fallback 2. Fix docstring coverage check failure - Move RateLimitedRewardLoopManager to conditional __all__ - Only export if import succeeds - Add noqa: F401 to suppress false positive These fixes address pre-commit and type-coverage-check failures. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
|
Hi @yyDing1, I've addressed all the CI failures. Here's a summary of the status: Previous CI Results:
Fixes Applied:
Note on
This PR does not modify any model-related code that All core checks ( Thank you! 🙏 |
|
LGTM @wuxibin89 |
…imiting for API-based rewards (verl-project#4107) ### What does this PR do? This PR implements a **three-layer rate limiting system** for API-based reward functions in the reward loop manager, specifically designed for LLM-as-judge scenarios. The new `RateLimitedRewardLoopManager` provides: 1. **Concurrency limiting** (`max_concurrent`) - Controls maximum parallel requests 2. **Request rate limiting** (`max_rpm`) - Limits requests per minute (RPM) 3. **Token rate limiting** (`max_tpm`) - Limits tokens per minute (TPM) This is essential for integrating with external API-based reward models (e.g., OpenAI, Anthropic) that have rate limits, preventing request failures and ensuring smooth training workflows. The implementation uses a custom `AsyncTokenBucket` algorithm for smooth rate limiting and includes timeout handling and error recovery mechanisms. **Related context:** This addresses the need for controlled API usage during RL training when using external LLM-as-judge reward models. ### Checklist Before Starting - [x] Search for similar PRs: [GitHub search for "rate limit"](https://github.com/volcengine/verl/search?q=rate+limit&type=pullrequests) - [x] Format the PR title as `[reward] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting` ### Test **Testing approach:** - Unit tests should cover the `AsyncTokenBucket` rate limiting logic - Integration tests should validate rate limiting with mock API reward functions - Manual validation: Configure different rate limits and verify request patterns comply with limits ### API and Usage Example **Configuration:** ```yaml # In config file or via command line overrides reward_model: manager: rate_limited # Use the new rate-limited manager max_concurrent: 10 # Max parallel requests max_rpm: 100 # Max 100 requests per minute max_tpm: 20000 # Max 20,000 tokens per minute estimated_tokens_per_request: 2000 # Token estimate for TPM limiting timeout: 300.0 # Timeout in seconds ``` ### Design & Code Changes **High-level design:** 1. **AsyncTokenBucket class** ([limited.py:30-63](verl/experimental/reward/reward_loop/limited.py#L30-L63)) - Implements token bucket algorithm for smooth rate limiting - Supports variable token consumption (useful for TPM limiting) - Thread-safe with asyncio locks - Auto-refills tokens based on configured rate 2. **RateLimitedRewardLoopManager class** ([limited.py:66-235](verl/experimental/reward/reward_loop/limited.py#L66-L235)) - **Class-level state**: Rate limiters are shared globally across all worker instances to ensure limits apply system-wide - **Three-layer limiting**: - Layer 1 (Concurrency): `asyncio.Semaphore` limits parallel requests - Layer 2 (RPM): `AsyncTokenBucket` limits requests per minute - Layer 3 (TPM): `AsyncTokenBucket` limits tokens per minute - **Initialization guard**: `_class_initialized` flag prevents duplicate initialization - **Error handling**: Timeout and exception handling with fallback rewards - **Logging**: Detailed configuration logging on initialization **Specific changes:** - Added `verl/experimental/reward/reward_loop/limited.py` (235 lines) - Updated `verl/experimental/reward/reward_loop/__init__.py` to export `RateLimitedRewardLoopManager` - Added `CLAUDE.md` to `.gitignore` **Key implementation details:** - Rate limiters acquire in order: RPM → TPM → Concurrency, ensuring smooth throttling - Timeout handling returns reward=0.0 with metadata for debugging - Supports both sync and async reward functions via `inspect.iscoroutinefunction` - Estimated tokens per request used for TPM limiting (configurable) ### Documentation **Added comprehensive docstrings** to [verl/experimental/reward/reward_loop/limited.py](verl/experimental/reward/reward_loop/limited.py): 1. **AsyncTokenBucket class** (lines 30-137) - Detailed algorithm explanation (token bucket rate limiting) - Complete Args and Attributes documentation with types - Thread safety guarantees - Usage examples for RPM and TPM limiting scenarios - Step-by-step algorithm details - Implementation notes about asyncio event loop usage 2. **RateLimitedRewardLoopManager class** (lines 140-219) - Comprehensive overview of three-layer rate limiting architecture - Detailed Rate Limiting Flow explanation - Full configuration parameters with defaults and descriptions - Global class-level state management documentation - Example configuration with concrete values - Thread safety notes for distributed training - Cross-references to related classes and functions **Documentation coverage:** - All public methods have detailed docstrings with Args, Returns, Raises, and Examples - Class-level documentation explains design patterns and use cases - Code examples demonstrate common usage patterns - Algorithm details help developers understand implementation ### Test Coverage **Created comprehensive test suite** with 35+ test cases covering both unit and integration scenarios: #### 1. **Unit Tests for AsyncTokenBucket** ([tests/experimental/reward/test_async_token_bucket_on_cpu.py](tests/experimental/reward/test_async_token_bucket_on_cpu.py)) 19 test cases covering: - ✅ Basic token acquisition and refill mechanism - ✅ Waiting behavior when tokens are insufficient - ✅ Max tokens capacity cap enforcement - ✅ Fractional token consumption - ✅ Concurrent acquires with race condition handling - ✅ High rate limit scenarios (1000 tokens/sec) - ✅ Rate limit accuracy verification (within 20% margin) - ✅ Sequential vs concurrent acquisition patterns - ✅ Large token acquisitions (5x capacity) - ✅ Multiple wait cycles in refill loop - ✅ Thread safety with locks under high concurrency - ✅ Default parameters (max_tokens = rate_limit) - ✅ Zero initial state and first-use behavior - ✅ Rapid small acquisitions (50x 2-token requests) #### 2. **Integration Tests with Mock API Functions** ([tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py](tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py)) 16 test cases covering: - ✅ Basic reward computation (sync and async) - ✅ RPM rate limiting validation (60 RPM = 1 req/sec) - ✅ TPM rate limiting validation (6000 TPM with 2000 tokens/req) - ✅ Concurrency limiting (max 2 concurrent requests) - ✅ Timeout handling for slow APIs (500ms timeout) - ✅ Error handling for failing APIs with exception catching - ✅ Dict vs float return format handling - ✅ Combined multi-layer rate limits (all 3 layers active) - ✅ Correct vs incorrect answer scoring - ✅ High throughput scenarios (50+ concurrent requests) - ✅ Class initialization idempotency - ✅ Extra info propagation through reward pipeline - ✅ Synchronous reward function compatibility - ✅ Global rate limiter sharing across instances **Mock API functions implemented:** - `mock_sync_reward_function` - Synchronous API simulation - `mock_async_reward_function` - Asynchronous API with call tracking - `mock_slow_api_function` - Timeout testing (2s delay) - `mock_failing_api_function` - Error handling testing - `mock_dict_result_function` - Complex result format testing - `MockAPICounter` - Global call tracking and rate measurement **CI Integration:** - Both test files follow `*_on_cpu.py` naming convention - Automatically discovered by [.github/workflows/cpu_unit_tests.yml](.github/workflows/cpu_unit_tests.yml) - No workflow changes needed - tests run automatically on every PR - Tests validated for syntax correctness and async compatibility **Test execution:** ```bash # Run AsyncTokenBucket unit tests pytest tests/experimental/reward/test_async_token_bucket_on_cpu.py -v --asyncio-mode=auto # Run integration tests with mock APIs pytest tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py -v --asyncio-mode=auto # Run all reward loop tests pytest tests/experimental/reward/ -v --asyncio-mode=auto ``` ### Checklist Before Submitting - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [x] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). ✅ **Added comprehensive docstrings to AsyncTokenBucket and RateLimitedRewardLoopManager classes** - [x] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. ✅ **Added 35+ async unit tests and integration tests with mock API functions, automatically run in CPU CI workflow** - [x] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). --- **Notes for reviewers:** - The class-level state pattern ensures rate limits are enforced globally across all workers in distributed training - The token bucket implementation provides smooth rate limiting without bursts - Timeout and error handling ensure training continues even if reward computation fails --------- Co-authored-by: Claude <noreply@anthropic.com>
…imiting for API-based rewards (verl-project#4107) ### What does this PR do? This PR implements a **three-layer rate limiting system** for API-based reward functions in the reward loop manager, specifically designed for LLM-as-judge scenarios. The new `RateLimitedRewardLoopManager` provides: 1. **Concurrency limiting** (`max_concurrent`) - Controls maximum parallel requests 2. **Request rate limiting** (`max_rpm`) - Limits requests per minute (RPM) 3. **Token rate limiting** (`max_tpm`) - Limits tokens per minute (TPM) This is essential for integrating with external API-based reward models (e.g., OpenAI, Anthropic) that have rate limits, preventing request failures and ensuring smooth training workflows. The implementation uses a custom `AsyncTokenBucket` algorithm for smooth rate limiting and includes timeout handling and error recovery mechanisms. **Related context:** This addresses the need for controlled API usage during RL training when using external LLM-as-judge reward models. ### Checklist Before Starting - [x] Search for similar PRs: [GitHub search for "rate limit"](https://github.com/volcengine/verl/search?q=rate+limit&type=pullrequests) - [x] Format the PR title as `[reward] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting` ### Test **Testing approach:** - Unit tests should cover the `AsyncTokenBucket` rate limiting logic - Integration tests should validate rate limiting with mock API reward functions - Manual validation: Configure different rate limits and verify request patterns comply with limits ### API and Usage Example **Configuration:** ```yaml # In config file or via command line overrides reward_model: manager: rate_limited # Use the new rate-limited manager max_concurrent: 10 # Max parallel requests max_rpm: 100 # Max 100 requests per minute max_tpm: 20000 # Max 20,000 tokens per minute estimated_tokens_per_request: 2000 # Token estimate for TPM limiting timeout: 300.0 # Timeout in seconds ``` ### Design & Code Changes **High-level design:** 1. **AsyncTokenBucket class** ([limited.py:30-63](verl/experimental/reward/reward_loop/limited.py#L30-L63)) - Implements token bucket algorithm for smooth rate limiting - Supports variable token consumption (useful for TPM limiting) - Thread-safe with asyncio locks - Auto-refills tokens based on configured rate 2. **RateLimitedRewardLoopManager class** ([limited.py:66-235](verl/experimental/reward/reward_loop/limited.py#L66-L235)) - **Class-level state**: Rate limiters are shared globally across all worker instances to ensure limits apply system-wide - **Three-layer limiting**: - Layer 1 (Concurrency): `asyncio.Semaphore` limits parallel requests - Layer 2 (RPM): `AsyncTokenBucket` limits requests per minute - Layer 3 (TPM): `AsyncTokenBucket` limits tokens per minute - **Initialization guard**: `_class_initialized` flag prevents duplicate initialization - **Error handling**: Timeout and exception handling with fallback rewards - **Logging**: Detailed configuration logging on initialization **Specific changes:** - Added `verl/experimental/reward/reward_loop/limited.py` (235 lines) - Updated `verl/experimental/reward/reward_loop/__init__.py` to export `RateLimitedRewardLoopManager` - Added `CLAUDE.md` to `.gitignore` **Key implementation details:** - Rate limiters acquire in order: RPM → TPM → Concurrency, ensuring smooth throttling - Timeout handling returns reward=0.0 with metadata for debugging - Supports both sync and async reward functions via `inspect.iscoroutinefunction` - Estimated tokens per request used for TPM limiting (configurable) ### Documentation **Added comprehensive docstrings** to [verl/experimental/reward/reward_loop/limited.py](verl/experimental/reward/reward_loop/limited.py): 1. **AsyncTokenBucket class** (lines 30-137) - Detailed algorithm explanation (token bucket rate limiting) - Complete Args and Attributes documentation with types - Thread safety guarantees - Usage examples for RPM and TPM limiting scenarios - Step-by-step algorithm details - Implementation notes about asyncio event loop usage 2. **RateLimitedRewardLoopManager class** (lines 140-219) - Comprehensive overview of three-layer rate limiting architecture - Detailed Rate Limiting Flow explanation - Full configuration parameters with defaults and descriptions - Global class-level state management documentation - Example configuration with concrete values - Thread safety notes for distributed training - Cross-references to related classes and functions **Documentation coverage:** - All public methods have detailed docstrings with Args, Returns, Raises, and Examples - Class-level documentation explains design patterns and use cases - Code examples demonstrate common usage patterns - Algorithm details help developers understand implementation ### Test Coverage **Created comprehensive test suite** with 35+ test cases covering both unit and integration scenarios: #### 1. **Unit Tests for AsyncTokenBucket** ([tests/experimental/reward/test_async_token_bucket_on_cpu.py](tests/experimental/reward/test_async_token_bucket_on_cpu.py)) 19 test cases covering: - ✅ Basic token acquisition and refill mechanism - ✅ Waiting behavior when tokens are insufficient - ✅ Max tokens capacity cap enforcement - ✅ Fractional token consumption - ✅ Concurrent acquires with race condition handling - ✅ High rate limit scenarios (1000 tokens/sec) - ✅ Rate limit accuracy verification (within 20% margin) - ✅ Sequential vs concurrent acquisition patterns - ✅ Large token acquisitions (5x capacity) - ✅ Multiple wait cycles in refill loop - ✅ Thread safety with locks under high concurrency - ✅ Default parameters (max_tokens = rate_limit) - ✅ Zero initial state and first-use behavior - ✅ Rapid small acquisitions (50x 2-token requests) #### 2. **Integration Tests with Mock API Functions** ([tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py](tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py)) 16 test cases covering: - ✅ Basic reward computation (sync and async) - ✅ RPM rate limiting validation (60 RPM = 1 req/sec) - ✅ TPM rate limiting validation (6000 TPM with 2000 tokens/req) - ✅ Concurrency limiting (max 2 concurrent requests) - ✅ Timeout handling for slow APIs (500ms timeout) - ✅ Error handling for failing APIs with exception catching - ✅ Dict vs float return format handling - ✅ Combined multi-layer rate limits (all 3 layers active) - ✅ Correct vs incorrect answer scoring - ✅ High throughput scenarios (50+ concurrent requests) - ✅ Class initialization idempotency - ✅ Extra info propagation through reward pipeline - ✅ Synchronous reward function compatibility - ✅ Global rate limiter sharing across instances **Mock API functions implemented:** - `mock_sync_reward_function` - Synchronous API simulation - `mock_async_reward_function` - Asynchronous API with call tracking - `mock_slow_api_function` - Timeout testing (2s delay) - `mock_failing_api_function` - Error handling testing - `mock_dict_result_function` - Complex result format testing - `MockAPICounter` - Global call tracking and rate measurement **CI Integration:** - Both test files follow `*_on_cpu.py` naming convention - Automatically discovered by [.github/workflows/cpu_unit_tests.yml](.github/workflows/cpu_unit_tests.yml) - No workflow changes needed - tests run automatically on every PR - Tests validated for syntax correctness and async compatibility **Test execution:** ```bash # Run AsyncTokenBucket unit tests pytest tests/experimental/reward/test_async_token_bucket_on_cpu.py -v --asyncio-mode=auto # Run integration tests with mock APIs pytest tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py -v --asyncio-mode=auto # Run all reward loop tests pytest tests/experimental/reward/ -v --asyncio-mode=auto ``` ### Checklist Before Submitting - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [x] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). ✅ **Added comprehensive docstrings to AsyncTokenBucket and RateLimitedRewardLoopManager classes** - [x] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. ✅ **Added 35+ async unit tests and integration tests with mock API functions, automatically run in CPU CI workflow** - [x] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). --- **Notes for reviewers:** - The class-level state pattern ensures rate limits are enforced globally across all workers in distributed training - The token bucket implementation provides smooth rate limiting without bursts - Timeout and error handling ensure training continues even if reward computation fails --------- Co-authored-by: Claude <noreply@anthropic.com>
…imiting for API-based rewards (verl-project#4107) ### What does this PR do? This PR implements a **three-layer rate limiting system** for API-based reward functions in the reward loop manager, specifically designed for LLM-as-judge scenarios. The new `RateLimitedRewardLoopManager` provides: 1. **Concurrency limiting** (`max_concurrent`) - Controls maximum parallel requests 2. **Request rate limiting** (`max_rpm`) - Limits requests per minute (RPM) 3. **Token rate limiting** (`max_tpm`) - Limits tokens per minute (TPM) This is essential for integrating with external API-based reward models (e.g., OpenAI, Anthropic) that have rate limits, preventing request failures and ensuring smooth training workflows. The implementation uses a custom `AsyncTokenBucket` algorithm for smooth rate limiting and includes timeout handling and error recovery mechanisms. **Related context:** This addresses the need for controlled API usage during RL training when using external LLM-as-judge reward models. ### Checklist Before Starting - [x] Search for similar PRs: [GitHub search for "rate limit"](https://github.com/volcengine/verl/search?q=rate+limit&type=pullrequests) - [x] Format the PR title as `[reward] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting` ### Test **Testing approach:** - Unit tests should cover the `AsyncTokenBucket` rate limiting logic - Integration tests should validate rate limiting with mock API reward functions - Manual validation: Configure different rate limits and verify request patterns comply with limits ### API and Usage Example **Configuration:** ```yaml # In config file or via command line overrides reward_model: manager: rate_limited # Use the new rate-limited manager max_concurrent: 10 # Max parallel requests max_rpm: 100 # Max 100 requests per minute max_tpm: 20000 # Max 20,000 tokens per minute estimated_tokens_per_request: 2000 # Token estimate for TPM limiting timeout: 300.0 # Timeout in seconds ``` ### Design & Code Changes **High-level design:** 1. **AsyncTokenBucket class** ([limited.py:30-63](verl/experimental/reward/reward_loop/limited.py#L30-L63)) - Implements token bucket algorithm for smooth rate limiting - Supports variable token consumption (useful for TPM limiting) - Thread-safe with asyncio locks - Auto-refills tokens based on configured rate 2. **RateLimitedRewardLoopManager class** ([limited.py:66-235](verl/experimental/reward/reward_loop/limited.py#L66-L235)) - **Class-level state**: Rate limiters are shared globally across all worker instances to ensure limits apply system-wide - **Three-layer limiting**: - Layer 1 (Concurrency): `asyncio.Semaphore` limits parallel requests - Layer 2 (RPM): `AsyncTokenBucket` limits requests per minute - Layer 3 (TPM): `AsyncTokenBucket` limits tokens per minute - **Initialization guard**: `_class_initialized` flag prevents duplicate initialization - **Error handling**: Timeout and exception handling with fallback rewards - **Logging**: Detailed configuration logging on initialization **Specific changes:** - Added `verl/experimental/reward/reward_loop/limited.py` (235 lines) - Updated `verl/experimental/reward/reward_loop/__init__.py` to export `RateLimitedRewardLoopManager` - Added `CLAUDE.md` to `.gitignore` **Key implementation details:** - Rate limiters acquire in order: RPM → TPM → Concurrency, ensuring smooth throttling - Timeout handling returns reward=0.0 with metadata for debugging - Supports both sync and async reward functions via `inspect.iscoroutinefunction` - Estimated tokens per request used for TPM limiting (configurable) ### Documentation **Added comprehensive docstrings** to [verl/experimental/reward/reward_loop/limited.py](verl/experimental/reward/reward_loop/limited.py): 1. **AsyncTokenBucket class** (lines 30-137) - Detailed algorithm explanation (token bucket rate limiting) - Complete Args and Attributes documentation with types - Thread safety guarantees - Usage examples for RPM and TPM limiting scenarios - Step-by-step algorithm details - Implementation notes about asyncio event loop usage 2. **RateLimitedRewardLoopManager class** (lines 140-219) - Comprehensive overview of three-layer rate limiting architecture - Detailed Rate Limiting Flow explanation - Full configuration parameters with defaults and descriptions - Global class-level state management documentation - Example configuration with concrete values - Thread safety notes for distributed training - Cross-references to related classes and functions **Documentation coverage:** - All public methods have detailed docstrings with Args, Returns, Raises, and Examples - Class-level documentation explains design patterns and use cases - Code examples demonstrate common usage patterns - Algorithm details help developers understand implementation ### Test Coverage **Created comprehensive test suite** with 35+ test cases covering both unit and integration scenarios: #### 1. **Unit Tests for AsyncTokenBucket** ([tests/experimental/reward/test_async_token_bucket_on_cpu.py](tests/experimental/reward/test_async_token_bucket_on_cpu.py)) 19 test cases covering: - ✅ Basic token acquisition and refill mechanism - ✅ Waiting behavior when tokens are insufficient - ✅ Max tokens capacity cap enforcement - ✅ Fractional token consumption - ✅ Concurrent acquires with race condition handling - ✅ High rate limit scenarios (1000 tokens/sec) - ✅ Rate limit accuracy verification (within 20% margin) - ✅ Sequential vs concurrent acquisition patterns - ✅ Large token acquisitions (5x capacity) - ✅ Multiple wait cycles in refill loop - ✅ Thread safety with locks under high concurrency - ✅ Default parameters (max_tokens = rate_limit) - ✅ Zero initial state and first-use behavior - ✅ Rapid small acquisitions (50x 2-token requests) #### 2. **Integration Tests with Mock API Functions** ([tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py](tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py)) 16 test cases covering: - ✅ Basic reward computation (sync and async) - ✅ RPM rate limiting validation (60 RPM = 1 req/sec) - ✅ TPM rate limiting validation (6000 TPM with 2000 tokens/req) - ✅ Concurrency limiting (max 2 concurrent requests) - ✅ Timeout handling for slow APIs (500ms timeout) - ✅ Error handling for failing APIs with exception catching - ✅ Dict vs float return format handling - ✅ Combined multi-layer rate limits (all 3 layers active) - ✅ Correct vs incorrect answer scoring - ✅ High throughput scenarios (50+ concurrent requests) - ✅ Class initialization idempotency - ✅ Extra info propagation through reward pipeline - ✅ Synchronous reward function compatibility - ✅ Global rate limiter sharing across instances **Mock API functions implemented:** - `mock_sync_reward_function` - Synchronous API simulation - `mock_async_reward_function` - Asynchronous API with call tracking - `mock_slow_api_function` - Timeout testing (2s delay) - `mock_failing_api_function` - Error handling testing - `mock_dict_result_function` - Complex result format testing - `MockAPICounter` - Global call tracking and rate measurement **CI Integration:** - Both test files follow `*_on_cpu.py` naming convention - Automatically discovered by [.github/workflows/cpu_unit_tests.yml](.github/workflows/cpu_unit_tests.yml) - No workflow changes needed - tests run automatically on every PR - Tests validated for syntax correctness and async compatibility **Test execution:** ```bash # Run AsyncTokenBucket unit tests pytest tests/experimental/reward/test_async_token_bucket_on_cpu.py -v --asyncio-mode=auto # Run integration tests with mock APIs pytest tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py -v --asyncio-mode=auto # Run all reward loop tests pytest tests/experimental/reward/ -v --asyncio-mode=auto ``` ### Checklist Before Submitting - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [x] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). ✅ **Added comprehensive docstrings to AsyncTokenBucket and RateLimitedRewardLoopManager classes** - [x] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. ✅ **Added 35+ async unit tests and integration tests with mock API functions, automatically run in CPU CI workflow** - [x] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). --- **Notes for reviewers:** - The class-level state pattern ensures rate limits are enforced globally across all workers in distributed training - The token bucket implementation provides smooth rate limiting without bursts - Timeout and error handling ensure training continues even if reward computation fails --------- Co-authored-by: Claude <noreply@anthropic.com>
…imiting for API-based rewards (verl-project#4107) ### What does this PR do? This PR implements a **three-layer rate limiting system** for API-based reward functions in the reward loop manager, specifically designed for LLM-as-judge scenarios. The new `RateLimitedRewardLoopManager` provides: 1. **Concurrency limiting** (`max_concurrent`) - Controls maximum parallel requests 2. **Request rate limiting** (`max_rpm`) - Limits requests per minute (RPM) 3. **Token rate limiting** (`max_tpm`) - Limits tokens per minute (TPM) This is essential for integrating with external API-based reward models (e.g., OpenAI, Anthropic) that have rate limits, preventing request failures and ensuring smooth training workflows. The implementation uses a custom `AsyncTokenBucket` algorithm for smooth rate limiting and includes timeout handling and error recovery mechanisms. **Related context:** This addresses the need for controlled API usage during RL training when using external LLM-as-judge reward models. ### Checklist Before Starting - [x] Search for similar PRs: [GitHub search for "rate limit"](https://github.com/volcengine/verl/search?q=rate+limit&type=pullrequests) - [x] Format the PR title as `[reward] feat: Add RateLimitedRewardLoopManager with three-layer rate limiting` ### Test **Testing approach:** - Unit tests should cover the `AsyncTokenBucket` rate limiting logic - Integration tests should validate rate limiting with mock API reward functions - Manual validation: Configure different rate limits and verify request patterns comply with limits ### API and Usage Example **Configuration:** ```yaml # In config file or via command line overrides reward_model: manager: rate_limited # Use the new rate-limited manager max_concurrent: 10 # Max parallel requests max_rpm: 100 # Max 100 requests per minute max_tpm: 20000 # Max 20,000 tokens per minute estimated_tokens_per_request: 2000 # Token estimate for TPM limiting timeout: 300.0 # Timeout in seconds ``` ### Design & Code Changes **High-level design:** 1. **AsyncTokenBucket class** ([limited.py:30-63](verl/experimental/reward/reward_loop/limited.py#L30-L63)) - Implements token bucket algorithm for smooth rate limiting - Supports variable token consumption (useful for TPM limiting) - Thread-safe with asyncio locks - Auto-refills tokens based on configured rate 2. **RateLimitedRewardLoopManager class** ([limited.py:66-235](verl/experimental/reward/reward_loop/limited.py#L66-L235)) - **Class-level state**: Rate limiters are shared globally across all worker instances to ensure limits apply system-wide - **Three-layer limiting**: - Layer 1 (Concurrency): `asyncio.Semaphore` limits parallel requests - Layer 2 (RPM): `AsyncTokenBucket` limits requests per minute - Layer 3 (TPM): `AsyncTokenBucket` limits tokens per minute - **Initialization guard**: `_class_initialized` flag prevents duplicate initialization - **Error handling**: Timeout and exception handling with fallback rewards - **Logging**: Detailed configuration logging on initialization **Specific changes:** - Added `verl/experimental/reward/reward_loop/limited.py` (235 lines) - Updated `verl/experimental/reward/reward_loop/__init__.py` to export `RateLimitedRewardLoopManager` - Added `CLAUDE.md` to `.gitignore` **Key implementation details:** - Rate limiters acquire in order: RPM → TPM → Concurrency, ensuring smooth throttling - Timeout handling returns reward=0.0 with metadata for debugging - Supports both sync and async reward functions via `inspect.iscoroutinefunction` - Estimated tokens per request used for TPM limiting (configurable) ### Documentation **Added comprehensive docstrings** to [verl/experimental/reward/reward_loop/limited.py](verl/experimental/reward/reward_loop/limited.py): 1. **AsyncTokenBucket class** (lines 30-137) - Detailed algorithm explanation (token bucket rate limiting) - Complete Args and Attributes documentation with types - Thread safety guarantees - Usage examples for RPM and TPM limiting scenarios - Step-by-step algorithm details - Implementation notes about asyncio event loop usage 2. **RateLimitedRewardLoopManager class** (lines 140-219) - Comprehensive overview of three-layer rate limiting architecture - Detailed Rate Limiting Flow explanation - Full configuration parameters with defaults and descriptions - Global class-level state management documentation - Example configuration with concrete values - Thread safety notes for distributed training - Cross-references to related classes and functions **Documentation coverage:** - All public methods have detailed docstrings with Args, Returns, Raises, and Examples - Class-level documentation explains design patterns and use cases - Code examples demonstrate common usage patterns - Algorithm details help developers understand implementation ### Test Coverage **Created comprehensive test suite** with 35+ test cases covering both unit and integration scenarios: #### 1. **Unit Tests for AsyncTokenBucket** ([tests/experimental/reward/test_async_token_bucket_on_cpu.py](tests/experimental/reward/test_async_token_bucket_on_cpu.py)) 19 test cases covering: - ✅ Basic token acquisition and refill mechanism - ✅ Waiting behavior when tokens are insufficient - ✅ Max tokens capacity cap enforcement - ✅ Fractional token consumption - ✅ Concurrent acquires with race condition handling - ✅ High rate limit scenarios (1000 tokens/sec) - ✅ Rate limit accuracy verification (within 20% margin) - ✅ Sequential vs concurrent acquisition patterns - ✅ Large token acquisitions (5x capacity) - ✅ Multiple wait cycles in refill loop - ✅ Thread safety with locks under high concurrency - ✅ Default parameters (max_tokens = rate_limit) - ✅ Zero initial state and first-use behavior - ✅ Rapid small acquisitions (50x 2-token requests) #### 2. **Integration Tests with Mock API Functions** ([tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py](tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py)) 16 test cases covering: - ✅ Basic reward computation (sync and async) - ✅ RPM rate limiting validation (60 RPM = 1 req/sec) - ✅ TPM rate limiting validation (6000 TPM with 2000 tokens/req) - ✅ Concurrency limiting (max 2 concurrent requests) - ✅ Timeout handling for slow APIs (500ms timeout) - ✅ Error handling for failing APIs with exception catching - ✅ Dict vs float return format handling - ✅ Combined multi-layer rate limits (all 3 layers active) - ✅ Correct vs incorrect answer scoring - ✅ High throughput scenarios (50+ concurrent requests) - ✅ Class initialization idempotency - ✅ Extra info propagation through reward pipeline - ✅ Synchronous reward function compatibility - ✅ Global rate limiter sharing across instances **Mock API functions implemented:** - `mock_sync_reward_function` - Synchronous API simulation - `mock_async_reward_function` - Asynchronous API with call tracking - `mock_slow_api_function` - Timeout testing (2s delay) - `mock_failing_api_function` - Error handling testing - `mock_dict_result_function` - Complex result format testing - `MockAPICounter` - Global call tracking and rate measurement **CI Integration:** - Both test files follow `*_on_cpu.py` naming convention - Automatically discovered by [.github/workflows/cpu_unit_tests.yml](.github/workflows/cpu_unit_tests.yml) - No workflow changes needed - tests run automatically on every PR - Tests validated for syntax correctness and async compatibility **Test execution:** ```bash # Run AsyncTokenBucket unit tests pytest tests/experimental/reward/test_async_token_bucket_on_cpu.py -v --asyncio-mode=auto # Run integration tests with mock APIs pytest tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py -v --asyncio-mode=auto # Run all reward loop tests pytest tests/experimental/reward/ -v --asyncio-mode=auto ``` ### Checklist Before Submitting - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [x] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). ✅ **Added comprehensive docstrings to AsyncTokenBucket and RateLimitedRewardLoopManager classes** - [x] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. ✅ **Added 35+ async unit tests and integration tests with mock API functions, automatically run in CPU CI workflow** - [x] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). --- **Notes for reviewers:** - The class-level state pattern ensures rate limits are enforced globally across all workers in distributed training - The token bucket implementation provides smooth rate limiting without bursts - Timeout and error handling ensure training continues even if reward computation fails --------- Co-authored-by: Claude <noreply@anthropic.com>
What does this PR do?
This PR implements a three-layer rate limiting system for API-based reward functions in the reward loop manager, specifically designed for LLM-as-judge scenarios. The new
RateLimitedRewardLoopManagerprovides:max_concurrent) - Controls maximum parallel requestsmax_rpm) - Limits requests per minute (RPM)max_tpm) - Limits tokens per minute (TPM)This is essential for integrating with external API-based reward models (e.g., OpenAI, Anthropic) that have rate limits, preventing request failures and ensuring smooth training workflows.
The implementation uses a custom
AsyncTokenBucketalgorithm for smooth rate limiting and includes timeout handling and error recovery mechanisms.Related context: This addresses the need for controlled API usage during RL training when using external LLM-as-judge reward models.
Checklist Before Starting
[reward] feat: Add RateLimitedRewardLoopManager with three-layer rate limitingTest
Testing approach:
AsyncTokenBucketrate limiting logicAPI and Usage Example
Configuration:
Design & Code Changes
High-level design:
AsyncTokenBucket class (limited.py:30-63)
RateLimitedRewardLoopManager class (limited.py:66-235)
asyncio.Semaphorelimits parallel requestsAsyncTokenBucketlimits requests per minuteAsyncTokenBucketlimits tokens per minute_class_initializedflag prevents duplicate initializationSpecific changes:
verl/experimental/reward/reward_loop/limited.py(235 lines)verl/experimental/reward/reward_loop/__init__.pyto exportRateLimitedRewardLoopManagerCLAUDE.mdto.gitignoreKey implementation details:
inspect.iscoroutinefunctionDocumentation
Added comprehensive docstrings to verl/experimental/reward/reward_loop/limited.py:
AsyncTokenBucket class (lines 30-137)
RateLimitedRewardLoopManager class (lines 140-219)
Documentation coverage:
Test Coverage
Created comprehensive test suite with 35+ test cases covering both unit and integration scenarios:
1. Unit Tests for AsyncTokenBucket (tests/experimental/reward/test_async_token_bucket_on_cpu.py)
19 test cases covering:
2. Integration Tests with Mock API Functions (tests/experimental/reward/test_rate_limited_reward_manager_on_cpu.py)
16 test cases covering:
Mock API functions implemented:
mock_sync_reward_function- Synchronous API simulationmock_async_reward_function- Asynchronous API with call trackingmock_slow_api_function- Timeout testing (2s delay)mock_failing_api_function- Error handling testingmock_dict_result_function- Complex result format testingMockAPICounter- Global call tracking and rate measurementCI Integration:
*_on_cpu.pynaming conventionTest execution:
Checklist Before Submitting
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=alwaysci-requestchannel in theverlSlack workspace.Notes for reviewers: