Merged
Changes from 1 commit
Commits
36 commits
4e1c05e
[trainer, cfg] feat: Add AlgorithmConfig dataclass for type-safe algo…
openhands-agent Jun 20, 2025
9ed10fa
Complete algorithm config dataclass migration
openhands-agent Jun 21, 2025
646a1e7
Remove temporary test files
openhands-agent Jun 21, 2025
d7aa12b
Revert changes and rename algorithm config to algo config
openhands-agent Jun 21, 2025
109340d
Update compute_advantage type annotations and fix linting
openhands-agent Jun 21, 2025
89e4b34
Update all core_algos.py functions to use AlgoConfig type annotations
openhands-agent Jun 21, 2025
f0f406f
Fix compute_grpo_outcome_advantage function signature to include Algo…
openhands-agent Jun 21, 2025
637a358
Merge main into feat/algorithm-config-dataclass
openhands-agent Jun 22, 2025
9eeab2e
init frozen adaptor
eric-haibin-lin Jun 29, 2025
1b85290
move to profiler folder
eric-haibin-lin Jun 30, 2025
ba93223
backward compat namespace move
eric-haibin-lin Jun 30, 2025
da8d771
fix lint
eric-haibin-lin Jun 30, 2025
0b1cb62
remove omega_conf_to_dataclass type
eric-haibin-lin Jun 30, 2025
2c25c76
Refactor algorithm config classes to use frozen dataclasses and BaseC…
devin-ai-integration[bot] Jun 30, 2025
520b23d
Revert documentation changes and fix omega_conf_to_dataclass call
devin-ai-integration[bot] Jun 30, 2025
80685b4
Fix config.get() call in compute_advantage function
devin-ai-integration[bot] Jun 30, 2025
2df1773
Merge main branch and resolve conflicts
devin-ai-integration[bot] Jun 30, 2025
52c62b3
Fix lint issues after merge
devin-ai-integration[bot] Jun 30, 2025
562a111
Fix type annotation and docstring coverage issues
devin-ai-integration[bot] Jun 30, 2025
81d7edf
Add test_base_config_on_cpu.py to allow list and update omega_conf_to…
devin-ai-integration[bot] Jun 30, 2025
a6df414
fix test
eric-haibin-lin Jun 30, 2025
6e743a5
fix litn
eric-haibin-lin Jun 30, 2025
ffa8d77
convert to dataclass upfront
eric-haibin-lin Jun 30, 2025
12c22b8
Merge branch 'feat/algorithm-config-dataclass' of code.byted.org:data…
eric-haibin-lin Jun 30, 2025
e2fac2c
update import stmt
eric-haibin-lin Jun 30, 2025
969a734
merge with main
eric-haibin-lin Jun 30, 2025
69a1a17
fix lint
eric-haibin-lin Jun 30, 2025
f1f4047
add _target_ to megatron config
eric-haibin-lin Jun 30, 2025
7bcd0fe
fix ranks init
eric-haibin-lin Jun 30, 2025
0eacb9f
adjust line-len
eric-haibin-lin Jul 1, 2025
ac19891
adjust len=120
eric-haibin-lin Jul 1, 2025
c907607
merge with main
eric-haibin-lin Jul 1, 2025
e63bbb0
fix lint
eric-haibin-lin Jul 1, 2025
8bce67d
merge with master
eric-haibin-lin Jul 3, 2025
fb93f20
merge with main
eric-haibin-lin Jul 4, 2025
c195f00
Merge remote-tracking branch 'oss/main' into feat/algorithm-config-da…
eric-haibin-lin Jul 4, 2025
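Taken together, the commits above migrate verl's algorithm settings from a raw OmegaConf node to a typed, frozen AlgoConfig dataclass that functions such as compute_advantage can annotate, with omega_conf_to_dataclass performing the conversion up front. The sketch below only illustrates that pattern; the field names and defaults are hypothetical stand-ins, not the actual class added by this PR.

# Illustrative sketch only: field names and defaults are hypothetical, not the
# AlgoConfig actually defined in this PR.
from dataclasses import dataclass, field

from omegaconf import OmegaConf


@dataclass(frozen=True)
class FilterGroupsConfigSketch:
    enable: bool = False
    metric: str = "seq_reward"
    max_num_gen_batches: int = 0


@dataclass(frozen=True)
class AlgoConfigSketch:
    adv_estimator: str = "grpo"
    use_kl_in_reward: bool = False
    kl_penalty: str = "kl"
    filter_groups: FilterGroupsConfigSketch = field(default_factory=FilterGroupsConfigSketch)


def from_omega_conf(cfg) -> AlgoConfigSketch:
    """Stand-in for verl's omega_conf_to_dataclass: resolve the OmegaConf node
    into plain Python values, then build the frozen dataclass."""
    plain = OmegaConf.to_container(cfg, resolve=True)
    filter_groups = FilterGroupsConfigSketch(**plain.pop("filter_groups", {}))
    return AlgoConfigSketch(filter_groups=filter_groups, **plain)


algo = from_omega_conf(
    OmegaConf.create({"adv_estimator": "grpo", "use_kl_in_reward": True, "filter_groups": {"enable": True}})
)
print(algo.use_kl_in_reward, algo.filter_groups.enable)  # True True

The remainder of this page shows the changes from the single commit below ("adjust line-len"), which mostly wraps long lines to the 120-character limit.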
adjust line-len
eric-haibin-lin committed Jul 1, 2025
commit 0eacb9ff93acfe3a2293c70913ae20e83727b9b5
58 changes: 46 additions & 12 deletions recipe/dapo/dapo_ray_trainer.py
@@ -33,7 +33,13 @@
compute_timing_metrics,
reduce_metrics,
)
from verl.trainer.ppo.ray_trainer import AdvantageEstimator, RayPPOTrainer, apply_kl_penalty, compute_advantage, compute_response_mask
from verl.trainer.ppo.ray_trainer import (
AdvantageEstimator,
RayPPOTrainer,
apply_kl_penalty,
compute_advantage,
compute_response_mask,
)
from verl.utils.profiler import marked_timer


@@ -139,7 +145,9 @@ def fit(self):

del gen_baseline_batch, gen_baseline_output

new_batch.non_tensor_batch["uid"] = np.array([str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object)
new_batch.non_tensor_batch["uid"] = np.array(
[str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
)
# repeat to align with repeated responses in rollout
new_batch = new_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
new_batch = new_batch.union(gen_batch_output)
@@ -167,12 +175,18 @@ def fit(self):
new_batch.batch["token_level_scores"] = reward_tensor

if reward_extra_infos_dict:
new_batch.non_tensor_batch.update({k: np.array(v) for k, v in reward_extra_infos_dict.items()})
new_batch.non_tensor_batch.update(
{k: np.array(v) for k, v in reward_extra_infos_dict.items()}
)

# compute rewards. apply_kl_penalty if available
if self.config.algorithm.use_kl_in_reward:
new_batch, kl_metrics = apply_kl_penalty(new_batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty)
metrics.update(kl_metrics) # TODO: This will be cleared if we use multiple genenration batches
new_batch, kl_metrics = apply_kl_penalty(
new_batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty
)
metrics.update(
kl_metrics
) # TODO: This will be cleared if we use multiple genenration batches
else:
new_batch.batch["token_level_rewards"] = new_batch.batch["token_level_scores"]

@@ -183,20 +197,30 @@ def fit(self):
metric_name = self.config.algorithm.filter_groups.metric
if metric_name == "seq_final_reward":
# Turn to numpy for easier filtering
new_batch.non_tensor_batch["seq_final_reward"] = new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
new_batch.non_tensor_batch["seq_final_reward"] = (
new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
)
elif metric_name == "seq_reward":
new_batch.non_tensor_batch["seq_reward"] = new_batch.batch["token_level_scores"].sum(dim=-1).numpy()
new_batch.non_tensor_batch["seq_reward"] = (
new_batch.batch["token_level_scores"].sum(dim=-1).numpy()
)

# Collect the sequence reward for each trajectory
prompt_uid2metric_vals = defaultdict(list)
for uid, metric_val in zip(new_batch.non_tensor_batch["uid"], new_batch.non_tensor_batch[metric_name]):
for uid, metric_val in zip(
new_batch.non_tensor_batch["uid"], new_batch.non_tensor_batch[metric_name]
):
prompt_uid2metric_vals[uid].append(metric_val)

prompt_uid2metric_std = {}
for prompt_uid, metric_vals in prompt_uid2metric_vals.items():
prompt_uid2metric_std[prompt_uid] = np.std(metric_vals)

kept_prompt_uids = [uid for uid, std in prompt_uid2metric_std.items() if std > 0 or len(prompt_uid2metric_vals[uid]) == 1]
kept_prompt_uids = [
uid
for uid, std in prompt_uid2metric_std.items()
if std > 0 or len(prompt_uid2metric_vals[uid]) == 1
]
num_prompt_in_batch += len(kept_prompt_uids)

kept_traj_idxs = []
@@ -216,7 +240,11 @@ def fit(self):
progress_bar.update(1)
continue
else:
raise ValueError(f"{num_gen_batches=} >= {max_num_gen_batches=}." + " Generated too many. Please check if your data are too difficult." + " You could also try set max_num_gen_batches=0 to enable endless trials.")
raise ValueError(
f"{num_gen_batches=} >= {max_num_gen_batches=}."
+ " Generated too many. Please check if your data are too difficult."
+ " You could also try set max_num_gen_batches=0 to enable endless trials."
)
else:
# Align the batch
traj_bsz = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
@@ -289,14 +317,20 @@ def fit(self):
metrics.update(actor_output_metrics)

# validate
if self.val_reward_fn is not None and self.config.trainer.test_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0):
if (
self.val_reward_fn is not None
and self.config.trainer.test_freq > 0
and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0)
):
with marked_timer("testing", timing_raw, "green"):
val_metrics: dict = self._validate()
if is_last_step:
last_val_metrics = val_metrics
metrics.update(val_metrics)

if self.config.trainer.save_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.save_freq == 0):
if self.config.trainer.save_freq > 0 and (
is_last_step or self.global_steps % self.config.trainer.save_freq == 0
):
with marked_timer("save_checkpoint", timing_raw, "green"):
self._save_checkpoint()

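For readers skimming the wrapped diff above: the filter_groups branch is DAPO's dynamic-sampling step, which groups trajectories by prompt uid and keeps a prompt only when its rewards actually vary (or when it produced a single trajectory). A standalone sketch of that filtering on made-up inputs:

# Standalone illustration of the kept_prompt_uids filtering above; the inputs are made up.
from collections import defaultdict

import numpy as np

uids = ["p0", "p0", "p1", "p1", "p2"]      # prompt uid per trajectory
seq_rewards = [1.0, 0.0, 0.5, 0.5, 1.0]    # one scalar reward per trajectory

# Collect the sequence reward for each prompt group.
prompt_uid2metric_vals = defaultdict(list)
for uid, val in zip(uids, seq_rewards):
    prompt_uid2metric_vals[uid].append(val)

# Keep prompts whose rewards have non-zero std, or that have only one trajectory.
prompt_uid2metric_std = {uid: np.std(vals) for uid, vals in prompt_uid2metric_vals.items()}
kept_prompt_uids = [
    uid
    for uid, std in prompt_uid2metric_std.items()
    if std > 0 or len(prompt_uid2metric_vals[uid]) == 1
]

kept_traj_idxs = [i for i, uid in enumerate(uids) if uid in kept_prompt_uids]
print(kept_prompt_uids)  # ['p0', 'p2'] -- p1 is filtered out (zero reward variance)
print(kept_traj_idxs)    # [0, 1, 4]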
69 changes: 55 additions & 14 deletions recipe/entropy/entropy_ray_trainer.py
@@ -26,8 +26,19 @@
from tqdm import tqdm

from verl import DataProto
from verl.trainer.ppo.metric_utils import compute_data_metrics, compute_throughout_metrics, compute_timing_metrics, reduce_metrics
from verl.trainer.ppo.ray_trainer import AdvantageEstimator, RayPPOTrainer, apply_kl_penalty, compute_advantage, compute_response_mask
from verl.trainer.ppo.metric_utils import (
compute_data_metrics,
compute_throughout_metrics,
compute_timing_metrics,
reduce_metrics,
)
from verl.trainer.ppo.ray_trainer import (
AdvantageEstimator,
RayPPOTrainer,
apply_kl_penalty,
compute_advantage,
compute_response_mask,
)
from verl.utils.profiler import simple_timer


@@ -126,7 +137,9 @@ def fit(self):

del gen_baseline_batch, gen_baseline_output

new_batch.non_tensor_batch["uid"] = np.array([str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object)
new_batch.non_tensor_batch["uid"] = np.array(
[str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
)
# repeat to align with repeated responses in rollout
new_batch = new_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
new_batch = new_batch.union(gen_batch_output)
@@ -155,12 +168,18 @@ def fit(self):

print(f"{list(reward_extra_infos_dict.keys())=}")
if reward_extra_infos_dict:
new_batch.non_tensor_batch.update({k: np.array(v) for k, v in reward_extra_infos_dict.items()})
new_batch.non_tensor_batch.update(
{k: np.array(v) for k, v in reward_extra_infos_dict.items()}
)

# compute rewards. apply_kl_penalty if available
if self.config.algorithm.use_kl_in_reward:
new_batch, kl_metrics = apply_kl_penalty(new_batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty)
metrics.update(kl_metrics) # TODO: This will be cleared if we use multiple genenration batches
new_batch, kl_metrics = apply_kl_penalty(
new_batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty
)
metrics.update(
kl_metrics
) # TODO: This will be cleared if we use multiple genenration batches
else:
new_batch.batch["token_level_rewards"] = new_batch.batch["token_level_scores"]

@@ -171,20 +190,30 @@ def fit(self):
metric_name = self.config.algorithm.filter_groups.metric
if metric_name == "seq_final_reward":
# Turn to numpy for easier filtering
new_batch.non_tensor_batch["seq_final_reward"] = new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
new_batch.non_tensor_batch["seq_final_reward"] = (
new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
)
elif metric_name == "seq_reward":
new_batch.non_tensor_batch["seq_reward"] = new_batch.batch["token_level_scores"].sum(dim=-1).numpy()
new_batch.non_tensor_batch["seq_reward"] = (
new_batch.batch["token_level_scores"].sum(dim=-1).numpy()
)

# Collect the sequence reward for each trajectory
prompt_uid2metric_vals = defaultdict(list)
for uid, metric_val in zip(new_batch.non_tensor_batch["uid"], new_batch.non_tensor_batch[metric_name]):
for uid, metric_val in zip(
new_batch.non_tensor_batch["uid"], new_batch.non_tensor_batch[metric_name]
):
prompt_uid2metric_vals[uid].append(metric_val)

prompt_uid2metric_std = {}
for prompt_uid, metric_vals in prompt_uid2metric_vals.items():
prompt_uid2metric_std[prompt_uid] = np.std(metric_vals)

kept_prompt_uids = [uid for uid, std in prompt_uid2metric_std.items() if std > 0 or len(prompt_uid2metric_vals[uid]) == 1]
kept_prompt_uids = [
uid
for uid, std in prompt_uid2metric_std.items()
if std > 0 or len(prompt_uid2metric_vals[uid]) == 1
]
num_prompt_in_batch += len(kept_prompt_uids)

kept_traj_idxs = []
@@ -203,11 +232,17 @@ def fit(self):
print(f"{num_gen_batches=}. Keep generating...")
continue
else:
raise ValueError(f"{num_gen_batches=} >= {max_num_gen_batches=}." + " Generated too many. Please check if your data are too difficult." + " You could also try set max_num_gen_batches=0 to enable endless trials.")
raise ValueError(
f"{num_gen_batches=} >= {max_num_gen_batches=}."
+ " Generated too many. Please check if your data are too difficult."
+ " You could also try set max_num_gen_batches=0 to enable endless trials."
)
else:
# Align the batch
traj_bsz = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
print(f"Collected {num_prompt_in_batch} / {self.config.data.train_batch_size} prompt. Collecting finished.")
print(
f"Collected {num_prompt_in_batch} / {self.config.data.train_batch_size} prompt. Collecting finished."
)
batch = batch[:traj_bsz]

# === Updating ===
@@ -268,14 +303,20 @@ def fit(self):
metrics.update(actor_output_metrics)

# validate
if self.val_reward_fn is not None and self.config.trainer.test_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0):
if (
self.val_reward_fn is not None
and self.config.trainer.test_freq > 0
and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0)
):
with simple_timer("testing", timing_raw):
val_metrics: dict = self._validate()
if is_last_step:
last_val_metrics = val_metrics
metrics.update(val_metrics)

if self.config.trainer.save_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.save_freq == 0):
if self.config.trainer.save_freq > 0 and (
is_last_step or self.global_steps % self.config.trainer.save_freq == 0
):
with simple_timer("save_checkpoint", timing_raw):
self._save_checkpoint()

16 changes: 13 additions & 3 deletions tests/utils/test_nvtx_profile.py
@@ -78,7 +78,9 @@ def test_annotate_decorator(self):
def test_func(self, *args, **kwargs):
return "result"

with patch("torch.cuda.profiler.start") as mock_start, patch("torch.cuda.profiler.stop") as mock_stop, patch("verl.utils.profiler.nvtx_profile.mark_start_range") as mock_start_range, patch("verl.utils.profiler.nvtx_profile.mark_end_range") as mock_end_range:
with patch("torch.cuda.profiler.start") as mock_start, patch("torch.cuda.profiler.stop") as mock_stop, patch(
"verl.utils.profiler.nvtx_profile.mark_start_range"
) as mock_start_range, patch("verl.utils.profiler.nvtx_profile.mark_end_range") as mock_end_range:
result = test_func(mock_self)
self.assertEqual(result, "result")
mock_start_range.assert_called_once()
@@ -97,7 +99,9 @@ def test_annotate_discrete_mode(self):
def test_func(self, *args, **kwargs):
return "result"

with patch("torch.cuda.profiler.start") as mock_start, patch("torch.cuda.profiler.stop") as mock_stop, patch("verl.utils.profiler.nvtx_profile.mark_start_range") as mock_start_range, patch("verl.utils.profiler.nvtx_profile.mark_end_range") as mock_end_range:
with patch("torch.cuda.profiler.start") as mock_start, patch("torch.cuda.profiler.stop") as mock_stop, patch(
"verl.utils.profiler.nvtx_profile.mark_start_range"
) as mock_start_range, patch("verl.utils.profiler.nvtx_profile.mark_end_range") as mock_end_range:
result = test_func(mock_self)
self.assertEqual(result, "result")
mock_start_range.assert_called_once()
@@ -108,7 +112,13 @@ def test_func(self, *args, **kwargs):
def test_config_init(self):
cfg = OmegaConf.load("verl/trainer/config/ppo_trainer.yaml")
arr = cfg.actor_rollout_ref
for config in [cfg.critic.profiler, arr.actor.profiler, cfg.reward_model.profiler, arr.ref.profiler, arr.rollout.profiler]:
for config in [
cfg.critic.profiler,
arr.actor.profiler,
cfg.reward_model.profiler,
arr.ref.profiler,
arr.rollout.profiler,
]:
profiler_config = omega_conf_to_dataclass(config)
self.assertEqual(profiler_config.discrete, config.discrete)
self.assertEqual(profiler_config.all_ranks, config.all_ranks)
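The reworked test_config_init iterates over every profiler node in ppo_trainer.yaml, converts each one with omega_conf_to_dataclass, and asserts that discrete and all_ranks survive the round trip. A minimal self-contained version of that check, using a stand-in dataclass and converter rather than verl's real ProfilerConfig and omega_conf_to_dataclass, could look like this:

# Stand-in round-trip check; ProfilerConfigSketch and to_dataclass are illustrative,
# not verl's actual ProfilerConfig / omega_conf_to_dataclass.
from dataclasses import dataclass, field

from omegaconf import OmegaConf


@dataclass(frozen=True)
class ProfilerConfigSketch:
    discrete: bool = False
    all_ranks: bool = False
    ranks: list[int] = field(default_factory=list)


def to_dataclass(cfg) -> ProfilerConfigSketch:
    plain = OmegaConf.to_container(cfg, resolve=True)
    return ProfilerConfigSketch(**plain)


node = OmegaConf.create({"discrete": True, "all_ranks": False, "ranks": [0, 1]})
pc = to_dataclass(node)
assert pc.discrete == node.discrete
assert pc.all_ranks == node.all_ranks
assert list(pc.ranks) == list(node.ranks)
print(pc)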