Merged
Changes from 1 commit
36 commits
4e1c05e
[trainer, cfg] feat: Add AlgorithmConfig dataclass for type-safe algo…
openhands-agent Jun 20, 2025
9ed10fa
Complete algorithm config dataclass migration
openhands-agent Jun 21, 2025
646a1e7
Remove temporary test files
openhands-agent Jun 21, 2025
d7aa12b
Revert changes and rename algorithm config to algo config
openhands-agent Jun 21, 2025
109340d
Update compute_advantage type annotations and fix linting
openhands-agent Jun 21, 2025
89e4b34
Update all core_algos.py functions to use AlgoConfig type annotations
openhands-agent Jun 21, 2025
f0f406f
Fix compute_grpo_outcome_advantage function signature to include Algo…
openhands-agent Jun 21, 2025
637a358
Merge main into feat/algorithm-config-dataclass
openhands-agent Jun 22, 2025
9eeab2e
init frozen adaptor
eric-haibin-lin Jun 29, 2025
1b85290
move to profiler folder
eric-haibin-lin Jun 30, 2025
ba93223
backward compat namespace move
eric-haibin-lin Jun 30, 2025
da8d771
fix lint
eric-haibin-lin Jun 30, 2025
0b1cb62
remove omega_conf_to_dataclass type
eric-haibin-lin Jun 30, 2025
2c25c76
Refactor algorithm config classes to use frozen dataclasses and BaseC…
devin-ai-integration[bot] Jun 30, 2025
520b23d
Revert documentation changes and fix omega_conf_to_dataclass call
devin-ai-integration[bot] Jun 30, 2025
80685b4
Fix config.get() call in compute_advantage function
devin-ai-integration[bot] Jun 30, 2025
2df1773
Merge main branch and resolve conflicts
devin-ai-integration[bot] Jun 30, 2025
52c62b3
Fix lint issues after merge
devin-ai-integration[bot] Jun 30, 2025
562a111
Fix type annotation and docstring coverage issues
devin-ai-integration[bot] Jun 30, 2025
81d7edf
Add test_base_config_on_cpu.py to allow list and update omega_conf_to…
devin-ai-integration[bot] Jun 30, 2025
a6df414
fix test
eric-haibin-lin Jun 30, 2025
6e743a5
fix litn
eric-haibin-lin Jun 30, 2025
ffa8d77
convert to dataclass upfront
eric-haibin-lin Jun 30, 2025
12c22b8
Merge branch 'feat/algorithm-config-dataclass' of code.byted.org:data…
eric-haibin-lin Jun 30, 2025
e2fac2c
update import stmt
eric-haibin-lin Jun 30, 2025
969a734
merge with main
eric-haibin-lin Jun 30, 2025
69a1a17
fix lint
eric-haibin-lin Jun 30, 2025
f1f4047
add _target_ to megatron config
eric-haibin-lin Jun 30, 2025
7bcd0fe
fix ranks init
eric-haibin-lin Jun 30, 2025
0eacb9f
adjust line-len
eric-haibin-lin Jul 1, 2025
ac19891
adjust len=120
eric-haibin-lin Jul 1, 2025
c907607
merge with main
eric-haibin-lin Jul 1, 2025
e63bbb0
fix lint
eric-haibin-lin Jul 1, 2025
8bce67d
merge with master
eric-haibin-lin Jul 3, 2025
fb93f20
merge with main
eric-haibin-lin Jul 4, 2025
c195f00
Merge remote-tracking branch 'oss/main' into feat/algorithm-config-da…
eric-haibin-lin Jul 4, 2025
merge with main
eric-haibin-lin committed Jul 1, 2025
commit c907607ee122936f2007028cb56297db5ac68b59
3 changes: 2 additions & 1 deletion recipe/entropy/entropy_ray_trainer.py
@@ -241,7 +241,8 @@ def fit(self):
# Align the batch
traj_bsz = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
print(
f"Collected {num_prompt_in_batch} / {self.config.data.train_batch_size} prompt. Collecting finished."
f"Collected {num_prompt_in_batch} / {self.config.data.train_batch_size} prompt. "
f"Collecting finished."
)
batch = batch[:traj_bsz]

29 changes: 25 additions & 4 deletions tests/trainer/config/test_algo_config_on_cpu.py
Expand Up @@ -19,7 +19,11 @@
from omegaconf import OmegaConf

from verl.trainer.config import AlgoConfig, KLControlConfig, PFPPOConfig
from verl.trainer.ppo.core_algos import compute_gae_advantage_return, compute_grpo_outcome_advantage, get_adv_estimator_fn
from verl.trainer.ppo.core_algos import (
compute_gae_advantage_return,
compute_grpo_outcome_advantage,
get_adv_estimator_fn,
)
from verl.utils.config import omega_conf_to_dataclass


@@ -37,7 +41,13 @@ def setUp(self):
"norm_adv_by_std_in_grpo": True,
"use_kl_in_reward": True,
"kl_penalty": "kl",
"kl_ctrl": {"_target_": "verl.trainer.config.KLControlConfig", "type": "adaptive", "kl_coef": 0.002, "horizon": 5000, "target_kl": 0.05},
"kl_ctrl": {
"_target_": "verl.trainer.config.KLControlConfig",
"type": "adaptive",
"kl_coef": 0.002,
"horizon": 5000,
"target_kl": 0.05,
},
"use_pf_ppo": True,
"pf_ppo": {"_target_": "verl.trainer.config.PFPPOConfig", "reweight_method": "max_min", "weight_pow": 3.0},
}
@@ -130,7 +140,13 @@ def test_advantage_estimator_with_cfg(self):
values = torch.randn(batch_size, seq_len)
response_mask = torch.ones(batch_size, seq_len)

advantages, returns = compute_gae_advantage_return(token_level_rewards=token_level_rewards, values=values, response_mask=response_mask, gamma=config.gamma, lam=config.lam)
advantages, returns = compute_gae_advantage_return(
token_level_rewards=token_level_rewards,
values=values,
response_mask=response_mask,
gamma=config.gamma,
lam=config.lam,
)

self.assertEqual(advantages.shape, (batch_size, seq_len))
self.assertEqual(returns.shape, (batch_size, seq_len))
@@ -145,7 +161,12 @@ def test_grpo_advantage_estimator_with_cfg(self):
response_mask = torch.ones(batch_size, seq_len)
index = np.array([0, 0, 1, 1]) # Two groups

advantages, returns = compute_grpo_outcome_advantage(token_level_rewards=token_level_rewards, response_mask=response_mask, index=index, norm_adv_by_std_in_grpo=grpo_config.norm_adv_by_std_in_grpo)
advantages, returns = compute_grpo_outcome_advantage(
token_level_rewards=token_level_rewards,
response_mask=response_mask,
index=index,
norm_adv_by_std_in_grpo=grpo_config.norm_adv_by_std_in_grpo,
)

self.assertEqual(advantages.shape, (batch_size, seq_len))
self.assertEqual(returns.shape, (batch_size, seq_len))
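For orientation, a minimal sketch of the conversion this test exercises: an OmegaConf node whose nested entries carry `_target_` keys is turned into a typed AlgoConfig. The keys below are copied from the test's setUp dict; the top-level `_target_` and the illustrative values are assumptions, not verified against the dataclass definitions.

from omegaconf import OmegaConf

from verl.trainer.config import AlgoConfig, KLControlConfig
from verl.utils.config import omega_conf_to_dataclass

# Hypothetical config mirroring the test's setUp dict (values are illustrative).
cfg = OmegaConf.create(
    {
        "_target_": "verl.trainer.config.AlgoConfig",  # assumed top-level target
        "norm_adv_by_std_in_grpo": True,
        "use_kl_in_reward": True,
        "kl_penalty": "kl",
        "kl_ctrl": {
            "_target_": "verl.trainer.config.KLControlConfig",
            "type": "adaptive",
            "kl_coef": 0.002,
            "horizon": 5000,
            "target_kl": 0.05,
        },
    }
)
algo_config = omega_conf_to_dataclass(cfg)
assert isinstance(algo_config, AlgoConfig)
assert isinstance(algo_config.kl_ctrl, KLControlConfig)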
3 changes: 2 additions & 1 deletion verl/trainer/fsdp_sft_trainer.py
@@ -315,7 +315,8 @@ def _build_model_optimizer(self):

if self.device_mesh.get_rank() == 0:
print(
f"Number of steps/epoch {self.steps_per_epoch}, number of epochs {self.config.trainer.total_epochs}, total number of steps {self.total_steps}"
f"Number of steps/epoch {self.steps_per_epoch}, number of epochs "
f"{self.config.trainer.total_epochs}, total number of steps {self.total_steps}"
)

num_warmup_steps = int(self.total_steps * self.config.optim.warmup_steps_ratio)
23 changes: 14 additions & 9 deletions verl/trainer/ppo/ray_trainer.py
@@ -233,8 +233,9 @@ def compute_advantage(
lam (float, optional): Lambda parameter for GAE. Defaults to 1.0.
num_repeat (int, optional): Number of times to repeat the computation. Defaults to 1.
multi_turn (bool, optional): Whether the data is from a multi-turn conversation. Defaults to False.
norm_adv_by_std_in_grpo (bool, optional): Whether to normalize advantages by standard deviation in GRPO. Defaults to True.
config (AlgoConfig, optional): Algorithm configuration containing settings. Defaults to None.
norm_adv_by_std_in_grpo (bool, optional): Whether to normalize advantages by standard deviation in
GRPO. Defaults to True.
config (dict, optional): Configuration dictionary for algorithm settings. Defaults to None.

Returns:
DataProto: The updated data with computed advantages and returns.
@@ -396,7 +397,8 @@ def _validate_config(self):
assert (
n_gpus % (model_parallel_size * config.actor_rollout_ref.actor.megatron.context_parallel_size) == 0
), (
f"n_gpus ({n_gpus}) must be divisible by model_parallel_size ({model_parallel_size}) times context_parallel_size ({config.actor_rollout_ref.actor.megatron.context_parallel_size})"
f"n_gpus ({n_gpus}) must be divisible by model_parallel_size ({model_parallel_size}) times "
f"context_parallel_size ({config.actor_rollout_ref.actor.megatron.context_parallel_size})"
)
megatron_dp = n_gpus // (
model_parallel_size * config.actor_rollout_ref.actor.megatron.context_parallel_size
@@ -408,7 +410,8 @@ def _validate_config(self):
# 1. Check total batch size for data correctness
real_train_batch_size = config.data.train_batch_size * config.actor_rollout_ref.rollout.n
assert real_train_batch_size % minimal_bsz == 0, (
f"real_train_batch_size ({real_train_batch_size}) must be divisible by minimal possible batch size ({minimal_bsz})"
f"real_train_batch_size ({real_train_batch_size}) must be divisible by minimal possible batch size "
f"({minimal_bsz})"
)

# A helper function to check "micro_batch_size" vs "micro_batch_size_per_gpu"
@@ -433,8 +436,8 @@ def check_mutually_exclusive(mbs, mbs_per_gpu, name: str):

if mbs is not None and mbs_per_gpu is not None:
raise ValueError(
f"[{name}] You have set both '{name}.{param}' AND '{name}.{param_per_gpu}'. Please remove '{name}.{param}' because only '*_{param_per_gpu}'"
+ "is supported (the former is deprecated)."
f"[{name}] You have set both '{name}.{param}' AND '{name}.{param_per_gpu}'. Please remove "
f"'{name}.{param}' because only '*_{param_per_gpu}' is supported (the former is deprecated)."
)

if not config.actor_rollout_ref.actor.use_dynamic_bsz:
@@ -540,9 +543,10 @@ def check_mutually_exclusive(mbs, mbs_per_gpu, name: str):
config.actor_rollout_ref.rollout.multi_turn.tool_config_path is not None
or config.actor_rollout_ref.rollout.multi_turn.interaction_config_path is not None
), (
"tool_config_path or interaction_config_path must be set when enabling multi_turn with tool, due to no role-playing support"
"tool_config_path or interaction_config_path must be set when enabling multi_turn with tool, "
"due to no role-playing support"
)
assert self.config.algorithm.adv_estimator in [AdvantageEstimator.GRPO], (
assert config.algorithm.adv_estimator in [AdvantageEstimator.GRPO], (
"only GRPO is tested for multi-turn with tool"
)

@@ -598,7 +602,8 @@ def _create_dataloader(self, train_dataset, val_dataset, collate_fn, train_sampl
assert len(self.val_dataloader) >= 1, "Validation dataloader is empty!"

print(
f"Size of train dataloader: {len(self.train_dataloader)}, Size of val dataloader: {len(self.val_dataloader)}"
f"Size of train dataloader: {len(self.train_dataloader)}, Size of val dataloader: "
f"{len(self.val_dataloader)}"
)

total_training_steps = len(self.train_dataloader) * self.config.trainer.total_epochs
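To make the divisibility checks in _validate_config concrete, here is a worked example with made-up numbers. The derivation of minimal_bsz is not shown in this diff, so equating it with megatron_dp below is an assumption.

# Made-up values illustrating the asserts in _validate_config above.
n_gpus = 16
model_parallel_size = 4              # assumed product of tensor and pipeline parallel sizes
context_parallel_size = 2
assert n_gpus % (model_parallel_size * context_parallel_size) == 0     # 16 % 8 == 0
megatron_dp = n_gpus // (model_parallel_size * context_parallel_size)  # 2

train_batch_size = 64
rollout_n = 4
real_train_batch_size = train_batch_size * rollout_n                   # 256
minimal_bsz = megatron_dp                                               # assumption, see lead-in
assert real_train_batch_size % minimal_bsz == 0                         # 256 % 2 == 0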
4 changes: 3 additions & 1 deletion verl/utils/config.py
@@ -36,7 +36,9 @@ def omega_conf_to_dataclass(config: Union[DictConfig, dict], dataclass_type: Opt
return config

if dataclass_type is None:
assert "_target_" in config, "When dataclass_type is not provided, config must contain _target_. See trainer/config/ppo_trainer.yaml algorithm section for an example."
assert "_target_" in config, (
"When dataclass_type is not provided, config must contain _target_. See trainer/config/ppo_trainer.yaml algorithm section for an example."
)
from hydra.utils import instantiate

return instantiate(config, _convert_="partial")
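A hedged sketch of the two call paths this assertion distinguishes. The KLControlConfig field values are copied from the test earlier in this PR; the exact behavior with an explicit dataclass_type is an assumption based on the function signature.

from verl.trainer.config import KLControlConfig
from verl.utils.config import omega_conf_to_dataclass

# Path 1 (assumed): an explicit dataclass_type is given, so no `_target_` key is required.
kl_ctrl = omega_conf_to_dataclass(
    {"type": "adaptive", "kl_coef": 0.002, "horizon": 5000, "target_kl": 0.05},
    KLControlConfig,
)

# Path 2: no dataclass_type, so the config must carry `_target_`, which
# hydra.utils.instantiate resolves (the branch guarded by the assertion above).
kl_ctrl = omega_conf_to_dataclass(
    {
        "_target_": "verl.trainer.config.KLControlConfig",
        "type": "adaptive",
        "kl_coef": 0.002,
        "horizon": 5000,
        "target_kl": 0.05,
    }
)
assert isinstance(kl_ctrl, KLControlConfig)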
4 changes: 3 additions & 1 deletion verl/utils/profiler/config.py
@@ -46,4 +46,6 @@ def intersect(self, other: "ProfilerConfig") -> "ProfilerConfig":

def __post_init__(self) -> None:
"""config validation logics go here"""
assert isinstance(self.ranks, (set, list, tuple)), f"Profiler ranks must be of type list, got {type(self.ranks)}"
assert isinstance(self.ranks, (set, list, tuple)), (
f"Profiler ranks must be of type list, got {type(self.ranks)}"
)
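A small, assumed usage sketch of the validation above. The import path is taken from the file shown here, and the other ProfilerConfig fields are presumed to have defaults; treat both as assumptions.

from verl.utils.profiler.config import ProfilerConfig  # assumed import path

ProfilerConfig(ranks=[0, 1])       # accepted: a list-like collection of ranks
try:
    ProfilerConfig(ranks=0)        # rejected by __post_init__: not a set/list/tuple
except AssertionError as err:
    print(err)                     # "Profiler ranks must be of type list, got <class 'int'>"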
21 changes: 14 additions & 7 deletions verl/workers/fsdp_workers.py
@@ -172,7 +172,8 @@ def __init__(self, config: DictConfig, role: str):
self.config.actor.ppo_mini_batch_size *= self.config.rollout.n
self.config.actor.ppo_mini_batch_size //= self.device_mesh.size() // self.ulysses_sequence_parallel_size
assert self.config.actor.ppo_mini_batch_size > 0, (
f"ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be larger than 0 after normalization"
f"ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be larger than 0 after "
f"normalization"
)
# micro bsz
if self.config.actor.ppo_micro_batch_size is not None:
@@ -183,10 +184,12 @@ def __init__(self, config: DictConfig, role: str):

if self.config.actor.ppo_micro_batch_size_per_gpu is not None:
assert self.config.actor.ppo_mini_batch_size % self.config.actor.ppo_micro_batch_size_per_gpu == 0, (
f"normalized ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be divisible by ppo_micro_batch_size_per_gpu {self.config.actor.ppo_micro_batch_size_per_gpu}"
f"normalized ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be divisible by "
f"ppo_micro_batch_size_per_gpu {self.config.actor.ppo_micro_batch_size_per_gpu}"
)
assert self.config.actor.ppo_mini_batch_size // self.config.actor.ppo_micro_batch_size_per_gpu > 0, (
f"normalized ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be larger than ppo_micro_batch_size_per_gpu {self.config.actor.ppo_micro_batch_size_per_gpu}"
f"normalized ppo_mini_batch_size {self.config.actor.ppo_mini_batch_size} should be larger than "
f"ppo_micro_batch_size_per_gpu {self.config.actor.ppo_micro_batch_size_per_gpu}"
)

# normalize rollout config
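To see what the normalization and assertions above require, here is worked arithmetic with made-up values (none of these numbers come from the PR):

# Made-up values; mirrors the arithmetic in __init__ above.
ppo_mini_batch_size = 256
rollout_n = 4                        # self.config.rollout.n
world_size = 8                       # self.device_mesh.size()
ulysses_sequence_parallel_size = 2
ppo_micro_batch_size_per_gpu = 8

ppo_mini_batch_size *= rollout_n                                       # 1024
ppo_mini_batch_size //= world_size // ulysses_sequence_parallel_size   # 1024 // 4 = 256
assert ppo_mini_batch_size > 0
assert ppo_mini_batch_size % ppo_micro_batch_size_per_gpu == 0         # 256 % 8 == 0
assert ppo_mini_batch_size // ppo_micro_batch_size_per_gpu > 0         # 32 micro-batches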
@@ -887,7 +890,8 @@ def save_checkpoint(self, local_path, hdfs_path=None, global_step=0, max_ckpt_to
@register(dispatch_mode=Dispatch.ONE_TO_ALL)
def load_checkpoint(self, local_path, hdfs_path=None, del_local_after_load=False):
assert self._is_actor or (not self._is_actor and self._is_rollout), (
f"Checkpoint loading is only supported for Actor or standalone Rollout Workers, but got {self._is_actor} and {self._is_rollout}"
f"Checkpoint loading is only supported for Actor or standalone Rollout Workers, but got "
f"{self._is_actor} and {self._is_rollout}"
)

if self._is_offload_param:
@@ -962,10 +966,12 @@ def __init__(self, config):

if self.config.ppo_micro_batch_size_per_gpu is not None:
assert self.config.ppo_mini_batch_size % self.config.ppo_micro_batch_size_per_gpu == 0, (
f"normalized ppo_mini_batch_size {self.config.ppo_mini_batch_size} should be divisible by ppo_micro_batch_size_per_gpu {self.config.ppo_micro_batch_size_per_gpu}"
f"normalized ppo_mini_batch_size {self.config.ppo_mini_batch_size} should be divisible by "
f"ppo_micro_batch_size_per_gpu {self.config.ppo_micro_batch_size_per_gpu}"
)
assert self.config.ppo_mini_batch_size // self.config.ppo_micro_batch_size_per_gpu > 0, (
f"normalized ppo_mini_batch_size {self.config.ppo_mini_batch_size} should be larger than ppo_micro_batch_size_per_gpu {self.config.ppo_micro_batch_size_per_gpu}"
f"normalized ppo_mini_batch_size {self.config.ppo_mini_batch_size} should be larger than "
f"ppo_micro_batch_size_per_gpu {self.config.ppo_micro_batch_size_per_gpu}"
)
self._is_lora = self.config.model.get("lora_rank", 0) > 0

@@ -1662,7 +1668,8 @@ def execute_method(self, method: Union[str, bytes], *args, **kwargs):
"""Called by ExternalRayDistributedExecutor collective_rpc."""
if self.vllm_tp_rank == 0 and method != "execute_model":
print(
f"[DP={self.vllm_dp_rank},TP={self.vllm_tp_rank}] execute_method: {method if isinstance(method, str) else 'Callable'}"
f"[DP={self.vllm_dp_rank},TP={self.vllm_tp_rank}] execute_method: "
f"{method if isinstance(method, str) else 'Callable'}"
)
return self.rollout.execute_method(method, *args, **kwargs)

6 changes: 4 additions & 2 deletions verl/workers/megatron_workers.py
@@ -163,7 +163,8 @@ def __init__(self, config: DictConfig, role: str):
self.config.ref.log_prob_micro_batch_size_per_gpu = self.config.ref.log_prob_micro_batch_size
else:
assert self.config.ref.get("log_prob_micro_batch_size_per_gpu", None) is not None, (
"Please note that in the ref policy configuration, `log_prob_micro_batch_size_per_gpu` and `log_prob_micro_batch_size` should not be None at the same time."
"Please note that in the ref policy configuration, `log_prob_micro_batch_size_per_gpu` and "
"`log_prob_micro_batch_size` should not be None at the same time."
)
self._ref_is_offload_param = self.config.ref.megatron.get("param_offload", False)

@@ -665,7 +666,8 @@ def execute_method(self, method: Union[str, bytes], *args, **kwargs):
"""Called by ExternalRayDistributedExecutor collective_rpc."""
if self.vllm_tp_rank == 0 and method != "execute_model":
print(
f"[DP={self.vllm_dp_rank},TP={self.vllm_tp_rank}] execute_method: {method if isinstance(method, str) else 'Callable'}"
f"[DP={self.vllm_dp_rank},TP={self.vllm_tp_rank}] execute_method: "
f"{method if isinstance(method, str) else 'Callable'}"
)
return self.rollout.execute_method(method, *args, **kwargs)

You are viewing a condensed version of this merge commit.