Merged
Changes from 1 commit
68 commits
391a1fc
one step off async training recipe
imh966 Jun 27, 2025
338c2a9
simplify trainer
imh966 Jun 30, 2025
071ddc2
fix resource pool config and simplify the trainer yaml file
imh966 Jul 1, 2025
71569f5
separate actor and rollout class
imh966 Jul 3, 2025
78ef6f2
update name of recipe and add license
imh966 Jul 3, 2025
e274747
Merge branch 'volcengine:main' into recipe/async_training
ArronHZG Jul 7, 2025
b9a9618
one_step_off_policy megatron
ArronHZG Jul 7, 2025
8dc0034
use fsdp2 and clear useless code
lalala-2 Jul 7, 2025
5ea1c00
fix config
lalala-2 Jul 7, 2025
69d58c4
fix
lalala-2 Jul 7, 2025
6cdaf2e
one_step_off_policy dapo_7b 2 node
ArronHZG Jul 7, 2025
36ed4f6
recipe/one_step_off_policy
ArronHZG Jul 8, 2025
a1966ef
opt gen_next_batch
lalala-2 Jul 8, 2025
40df88f
Merge branch 'recipe/async_training_megatron' of https://github.com/i…
lalala-2 Jul 8, 2025
5d52efa
4_12_megatron
ArronHZG Jul 8, 2025
59f6be9
4_12_megatron
ArronHZG Jul 8, 2025
dfabe15
megatron config
ArronHZG Jul 8, 2025
40e8816
megatron config
ArronHZG Jul 8, 2025
fc76d4f
fix megatron
lalala-2 Jul 8, 2025
dedc436
Merge branch 'recipe/async_training_megatron' of https://github.com/i…
lalala-2 Jul 8, 2025
344581f
megatron config
ArronHZG Jul 8, 2025
0091f52
megatron config
ArronHZG Jul 9, 2025
283f7fd
megatron config
ArronHZG Jul 9, 2025
6871a29
cross epoch
ArronHZG Jul 9, 2025
1b96322
ruff format
ArronHZG Jul 9, 2025
652f91f
# Copyright 2025 Meituan Ltd. and/or its affiliates
ArronHZG Jul 9, 2025
b36918c
add Copyright
ArronHZG Jul 9, 2025
84b712d
optim sh
ArronHZG Jul 9, 2025
4685463
python3
ArronHZG Jul 9, 2025
7f3d1db
update recipe
ArronHZG Jul 10, 2025
592f393
add doc
ArronHZG Jul 10, 2025
2fb1cd9
Merge branch 'volcengine:main' into recipe/async_training
ArronHZG Jul 10, 2025
dff8f56
update date
ArronHZG Jul 11, 2025
648cb44
update date
ArronHZG Jul 11, 2025
c2395f7
config
ArronHZG Jul 11, 2025
165c1b2
Revert "fix config"
lalala-2 Jul 11, 2025
aaa356e
fix error
lalala-2 Jul 11, 2025
03f1dec
update is_last_step
ArronHZG Jul 11, 2025
e2007ef
one_step_off_policy
ArronHZG Jul 11, 2025
204d624
update readme
ArronHZG Jul 14, 2025
19fac39
e2e_one_step_off_policy
ArronHZG Jul 14, 2025
c1b86ec
add e2e test for one_step_off_policy
ArronHZG Jul 14, 2025
492ff98
add e2e test for one_step_off_policy
ArronHZG Jul 14, 2025
1e7aa47
add e2e test for one_step_off_policy
ArronHZG Jul 14, 2025
8ab0834
add e2e test for one_step_off_policy
ArronHZG Jul 14, 2025
22dc212
format
ArronHZG Jul 14, 2025
dcbfb0c
ruff check
ArronHZG Jul 14, 2025
1e8cee3
add megatron test
ArronHZG Jul 14, 2025
27c9816
Merge pull request #2 from imh966/recipe/async_training_e2e_test
ArronHZG Jul 14, 2025
727320b
Merge branch 'volcengine:main' into recipe/async_training
ArronHZG Jul 14, 2025
8727916
rm spmd
ArronHZG Jul 14, 2025
42ddeed
CI check fix some error
ArronHZG Jul 14, 2025
5ffd8b4
merge main
ArronHZG Jul 15, 2025
1c9b6eb
change author
ArronHZG Jul 15, 2025
8772b14
update e2e_one_step_off_policy CI rule
ArronHZG Jul 15, 2025
c8468e6
update comments
ArronHZG Jul 15, 2025
d8dd8b0
Merge branch 'volcengine:main' into recipe/async_training
ArronHZG Jul 15, 2025
659b108
update ruff
ArronHZG Jul 15, 2025
9b5646a
Fix pre-commit error: sort imports in async_main_ppo.py
openhands-agent Jul 15, 2025
1ed49c7
rollout.nnodes
ArronHZG Jul 16, 2025
754cfae
update code and doc by comments
ArronHZG Jul 16, 2025
8df1c1b
ruff
ArronHZG Jul 16, 2025
1837fc7
update code and doc by comments
ArronHZG Jul 16, 2025
c56467f
update docs
ArronHZG Jul 16, 2025
174d94a
Merge branch 'recipe/async_training' of https://github.com/imh966/ver…
ArronHZG Jul 16, 2025
e3db358
Merge branch 'recipe/async_training' into recipe/async_training_rollo…
ArronHZG Jul 16, 2025
8e5b714
Merge pull request #3 from imh966/recipe/async_training_rollout_nodes
ArronHZG Jul 16, 2025
40b2ebe
Merge branch 'volcengine:main' into recipe/async_training
ArronHZG Jul 16, 2025
fix resource pool config and simplify the trainer yaml file
imh966 committed Jul 1, 2025
commit 071ddc2763aefdac20fba790a9e6c717fb1c6701
116 changes: 62 additions & 54 deletions recipe/async/async_main_ppo.py
@@ -28,7 +28,7 @@
 from .async_ray_trainer import AsyncRayPPOTrainer


-@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
+@hydra.main(config_path="config", config_name="async_ppo_trainer", version_base=None)
 def main(config):
     run_ppo(config)
@@ -42,13 +42,23 @@ def run_ppo(config) -> None:
     # NCCL debug level, VLLM logging level, and allow runtime LoRA updating
     # `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
     ray.init(
-        runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN", "VLLM_ALLOW_RUNTIME_LORA_UPDATING": "true"}},
+        runtime_env={
+            "env_vars": {
+                "TOKENIZERS_PARALLELISM": "true",
+                "NCCL_DEBUG": "WARN",
+                "VLLM_LOGGING_LEVEL": "WARN",
+                "VLLM_ALLOW_RUNTIME_LORA_UPDATING": "true",
+            }
+        },
         num_cpus=config.ray_init.num_cpus,
     )

     # Create a remote instance of the TaskRunner class, and
     # Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
-    if OmegaConf.select(config.trainer, "profile_steps") is not None and len(OmegaConf.select(config.trainer, "profile_steps")) > 0:
+    if (
+        OmegaConf.select(config.trainer, "profile_steps") is not None
+        and len(OmegaConf.select(config.trainer, "profile_steps")) > 0
+    ):
         nsight_options = OmegaConf.to_container(config.trainer.controller_nsight_options)
         runner = TaskRunner.options(runtime_env={"nsight": nsight_options}).remote()
     else:
@@ -80,7 +90,9 @@ def run(self, config):

         # Download the checkpoint from HDFS to the local machine.
         # `use_shm` determines whether to use shared memory, which could lead to faster model loading if turned on
-        local_path = copy_to_local(config.actor_rollout_ref.model.path, use_shm=config.actor_rollout_ref.model.get("use_shm", False))
+        local_path = copy_to_local(
+            config.actor_rollout_ref.model.path, use_shm=config.actor_rollout_ref.model.get("use_shm", False)
+        )

         # Instantiate the tokenizer and processor.
         from verl.utils import hf_processor, hf_tokenizer
@@ -105,7 +117,11 @@ def run(self, config):

             from .async_fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

-            actor_rollout_cls = AsyncActorRolloutRefWorker if config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
+            actor_rollout_cls = (
+                AsyncActorRolloutRefWorker
+                if config.actor_rollout_ref.rollout.mode == "async"
+                else ActorRolloutRefWorker
+            )
             ray_worker_group_cls = RayWorkerGroup

         elif config.actor_rollout_ref.actor.strategy == "megatron":
@@ -114,61 +130,49 @@ def run(self, config):
             from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
             from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

-            actor_rollout_cls = AsyncActorRolloutRefWorker if config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
+            actor_rollout_cls = (
+                AsyncActorRolloutRefWorker
+                if config.actor_rollout_ref.rollout.mode == "async"
+                else ActorRolloutRefWorker
+            )
             ray_worker_group_cls = NVMegatronRayWorkerGroup

         else:
             raise NotImplementedError

         from .async_ray_trainer import ResourcePoolManager, Role

-        if config.actor_rollout_ref.hybrid_engine:
-            global_pool_id = "global_pool"
-            role_worker_mapping = {
-                Role.ActorRollout: ray.remote(actor_rollout_cls),
-                Role.Critic: ray.remote(CriticWorker),
-            }
-            # Define the resource pool specification.
-            # Map roles to the resource pool.
-            resource_pool_spec = {
-                global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
-            }
-            mapping = {
-                Role.ActorRollout: global_pool_id,
-                Role.Critic: global_pool_id,
-            }
+        role_worker_mapping = {
+            Role.Actor: ray.remote(actor_rollout_cls),
+            Role.Rollout: ray.remote(actor_rollout_cls),
+            Role.Critic: ray.remote(CriticWorker),
+        }
+
+        global_pool_id = "actor_pool"
+        n_gpus = config.trainer.n_gpus_per_node * config.trainer.nnodes
+        n_gpus_rollout = config.actor_rollout_ref.rollout.n_gpus
+        assert n_gpus_rollout is not None
+        assert n_gpus_rollout > 0 and n_gpus_rollout < n_gpus
+        n_gpus_actor = n_gpus - n_gpus_rollout
+        if n_gpus_rollout % config.trainer.n_gpus_per_node == 0:
+            nnodes_rollout = n_gpus_rollout // config.trainer.n_gpus_per_node
+            actor_pool = [config.trainer.n_gpus_per_node] * (config.trainer.nnodes - nnodes_rollout)
+            rollout_pool = [config.trainer.n_gpus_per_node] * nnodes_rollout
+        elif n_gpus_rollout % config.trainer.nnodes == 0:
+            actor_pool = [n_gpus_actor // config.trainer.nnodes] * config.trainer.nnodes
+            rollout_pool = [n_gpus_rollout // config.trainer.nnodes] * config.trainer.nnodes
         else:
-            role_worker_mapping = {
-                Role.Actor: ray.remote(actor_rollout_cls),
-                Role.Rollout: ray.remote(actor_rollout_cls),
-                Role.Critic: ray.remote(CriticWorker),
-            }
-
-            global_pool_id = "actor_pool"
-            n_gpus = config.trainer.n_gpus_per_node * config.trainer.nnodes
-            n_gpus_rollout = config.actor_rollout_ref.rollout.n_gpus
-            assert n_gpus_rollout is not None
-            assert n_gpus_rollout > 0 and n_gpus_rollout < n_gpus
-            n_gpus_actor = n_gpus - n_gpus_rollout
-            actor_pool = [n_gpus_actor]
-            for _ in range(config.trainer.nnodes - 1):
-                w = actor_pool[-1]
-                new_w = min(config.trainer.n_gpus_per_node, w)
-                next_w = w - new_w
-                actor_pool[-1] = new_w
-                actor_pool.append(next_w)
-
-            rollout_pool = [config.trainer.n_gpus_per_node - x for x in actor_pool]
-            resource_pool_spec = {
-                "actor_pool": actor_pool,
-                "rollout_pool": rollout_pool,
-            }
-            mapping = {
-                Role.Actor: "actor_pool",
-                Role.Rollout: "rollout_pool",
-                Role.Critic: "actor_pool",
-            }
-
+            raise ValueError("rollout.n_gpus should be divisible by n_gpus_per_node or nnodes")
+        resource_pool_spec = {
+            "actor_pool": actor_pool,
+            "rollout_pool": rollout_pool,
+        }
+        mapping = {
+            Role.Actor: "actor_pool",
+            Role.Rollout: "rollout_pool",
+            Role.Critic: "actor_pool",
+        }
+        print(f"resource_pool_spec: {resource_pool_spec}")
         # We should adopt a multi-source reward function here:
         # - for rule-based rm, we directly call a reward score
         # - for model-based rm, we call a model
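
To make the two supported GPU splits above concrete, here is a small self-contained sketch of the same pool arithmetic; the function name split_pools and the example shapes are illustrative, not part of the recipe:

def split_pools(nnodes: int, n_gpus_per_node: int, n_gpus_rollout: int):
    # Mirrors the partition logic above: returns (actor_pool, rollout_pool),
    # each a list of per-node GPU counts.
    n_gpus = n_gpus_per_node * nnodes
    assert 0 < n_gpus_rollout < n_gpus
    if n_gpus_rollout % n_gpus_per_node == 0:
        # Rollout occupies whole nodes; the actor keeps the remaining nodes.
        nnodes_rollout = n_gpus_rollout // n_gpus_per_node
        return [n_gpus_per_node] * (nnodes - nnodes_rollout), [n_gpus_per_node] * nnodes_rollout
    if n_gpus_rollout % nnodes == 0:
        # Rollout takes an equal slice of every node.
        return [(n_gpus - n_gpus_rollout) // nnodes] * nnodes, [n_gpus_rollout // nnodes] * nnodes
    raise ValueError("rollout.n_gpus should be divisible by n_gpus_per_node or nnodes")

print(split_pools(2, 8, 8))  # ([8], [8]): rollout gets one full node
print(split_pools(2, 8, 4))  # ([6, 6], [2, 2]): 2 rollout GPUs carved out of each node

Note the difference between the branches: in the whole-node split the two pools live on disjoint nodes, while in the per-node split actor and rollout share every node, each taking a fixed slice of its GPUs.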
@@ -191,8 +195,12 @@ def run(self, config):
             mapping[Role.RefPolicy] = global_pool_id

         # Load the reward manager for training and validation.
-        reward_fn = load_reward_manager(config, tokenizer, num_examine=0, **config.reward_model.get("reward_kwargs", {}))
-        val_reward_fn = load_reward_manager(config, tokenizer, num_examine=1, **config.reward_model.get("reward_kwargs", {}))
+        reward_fn = load_reward_manager(
+            config, tokenizer, num_examine=0, **config.reward_model.get("reward_kwargs", {})
+        )
+        val_reward_fn = load_reward_manager(
+            config, tokenizer, num_examine=1, **config.reward_model.get("reward_kwargs", {})
+        )
         resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

         from verl.utils.dataset.rl_dataset import collate_fn
15 changes: 15 additions & 0 deletions recipe/async/config/async_ppo_trainer.yaml
@@ -0,0 +1,15 @@
+hydra:
+  searchpath:
+    - file://verl/trainer/config
+
+defaults:
+  - ppo_trainer
+  - _self_
+
+
+actor_rollout_ref:
+  hybrid_engine: false
+  rollout:
+
+    # the number of gpu for rollout model
+    n_gpus: 4
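
For reference, the defaults list composes this file on top of verl's base ppo_trainer config (located via the hydra.searchpath entry), with _self_ last so the two values set here win. A minimal sketch of that merge behavior with OmegaConf, using illustrative stand-in values rather than verl's real defaults:

from omegaconf import OmegaConf

# Illustrative stand-in for the base ppo_trainer config (not verl's real defaults).
base = OmegaConf.create({"actor_rollout_ref": {"hybrid_engine": True, "rollout": {"mode": "sync"}}})
# The overrides carried by this recipe file.
overrides = OmegaConf.create({"actor_rollout_ref": {"hybrid_engine": False, "rollout": {"n_gpus": 4}}})
cfg = OmegaConf.merge(base, overrides)  # `_self_` last => this file is merged last
print(cfg.actor_rollout_ref.hybrid_engine)  # False
print(cfg.actor_rollout_ref.rollout.n_gpus)  # 4
print(cfg.actor_rollout_ref.rollout.mode)  # sync (untouched keys are inherited from the base)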