diff --git a/recipe/dapo/config/GRM_template.txt b/recipe/dapo/config/GRM_template.txt new file mode 100644 index 00000000000..d5783c40a6b --- /dev/null +++ b/recipe/dapo/config/GRM_template.txt @@ -0,0 +1 @@ +The above is a Q&A dialogue between a user and an assistant. It is now known that the standard answer to the user's question is {ground_truth}. Please determine whether the assistant has answered the user's question clearly, precisely, and accurately. Present your reasoning and judgment in the following format:\n\nThink: Content of Thinking\nJudgment: Correct / Incorrect \ No newline at end of file diff --git a/recipe/dapo/dapo_ray_trainer.py b/recipe/dapo/dapo_ray_trainer.py index cb3d3365be3..62394213fd7 100644 --- a/recipe/dapo/dapo_ray_trainer.py +++ b/recipe/dapo/dapo_ray_trainer.py @@ -16,6 +16,7 @@ This trainer supports model-agonistic model initialization with huggingface """ +import random import uuid from collections import defaultdict from copy import deepcopy @@ -41,6 +42,175 @@ class RayDAPOTrainer(RayPPOTrainer): Note that this trainer runs on the driver process on a single CPU/GPU node. """ + def _validate(self): + """Override the parent validation method to add GRM support""" + from verl.protocol import pad_dataproto_to_divisor, unpad_dataproto + + data_source_lst = [] + reward_extra_infos_dict: dict[str, list] = defaultdict(list) + + # Lists to collect samples for the table + sample_inputs = [] + sample_outputs = [] + sample_scores = [] + + # Collect all test batches for logging + all_test_batches = None + + for test_data in self.val_dataloader: + test_batch = DataProto.from_single_dict(test_data) + + # repeat test batch + test_batch = test_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.val_kwargs.n, interleave=True) + + # we only do validation on rule-based rm + if self.config.reward_model.enable and test_batch[0].non_tensor_batch["reward_model"]["style"] == "model": + return {} + + # Store original inputs + input_ids = test_batch.batch["input_ids"] + # TODO: Can we keep special tokens except for padding tokens? 
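+            # Decoded prompts are kept so they can be dumped alongside the generations and used to group validation metrics by input.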
+ input_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in input_ids] + sample_inputs.extend(input_texts) + + batch_keys_to_pop = ["input_ids", "attention_mask", "position_ids"] + non_tensor_batch_keys_to_pop = ["raw_prompt_ids"] + if "multi_modal_inputs" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.extend(["multi_modal_data", "multi_modal_inputs"]) + if "raw_prompt" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("raw_prompt") + if "tools_kwargs" in test_batch.non_tensor_batch: + non_tensor_batch_keys_to_pop.append("tools_kwargs") + test_gen_batch = test_batch.pop( + batch_keys=batch_keys_to_pop, + non_tensor_batch_keys=non_tensor_batch_keys_to_pop, + ) + + test_gen_batch.meta_info = { + "eos_token_id": self.tokenizer.eos_token_id, + "pad_token_id": self.tokenizer.pad_token_id, + "recompute_log_prob": False, + "do_sample": self.config.actor_rollout_ref.rollout.val_kwargs.do_sample, + "validate": True, + } + print(f"test_gen_batch meta info: {test_gen_batch.meta_info}") + + # pad to be divisible by dp_size + test_gen_batch_padded, pad_size = pad_dataproto_to_divisor(test_gen_batch, self.actor_rollout_wg.world_size) + test_output_gen_batch_padded = self.actor_rollout_wg.generate_sequences(test_gen_batch_padded) + + # unpad + test_output_gen_batch = unpad_dataproto(test_output_gen_batch_padded, pad_size=pad_size) + print("validation generation end") + + # Store generated outputs + output_ids = test_output_gen_batch.batch["responses"] + output_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in output_ids] + sample_outputs.extend(output_texts) + + test_batch = test_batch.union(test_output_gen_batch) + + # Add GRM support similar to training phase + if self.use_grm: + # pad to be divisible by grm worker group size before GRM processing + test_batch_grm_padded, grm_pad_size = pad_dataproto_to_divisor(test_batch, self.grm_wg.world_size) + grm_output_padded = self.grm_wg.generate_sequences_as_grm(test_batch_grm_padded) + + # unpad GRM output + grm_output = unpad_dataproto(grm_output_padded, pad_size=grm_pad_size) + + # Add "_grm" suffix to all keys from output and union into test_batch + grm_reward_tensor = {} + if hasattr(grm_output, "batch") and grm_output.batch is not None: + for key, value in grm_output.batch.items(): + grm_reward_tensor[key + "_grm"] = value + + # Create DataProto object for union + if grm_reward_tensor: + grm_reward_data_proto = DataProto.from_dict(grm_reward_tensor) + test_batch = test_batch.union(grm_reward_data_proto) + else: + print("Validation: No grm_reward_tensor from GRM output") + + # evaluate using reward_function + result = self.val_reward_fn(test_batch, return_dict=True) + reward_tensor = result["reward_tensor"] + scores = reward_tensor.sum(-1).cpu().tolist() + sample_scores.extend(scores) + + reward_extra_infos_dict["reward"].extend(scores) + if reward_extra_infos_dict: + test_batch.non_tensor_batch.update({k: np.array(v) for k, v in reward_extra_infos_dict.items()}) + if "reward_extra_info" in result: + for key, lst in result["reward_extra_info"].items(): + reward_extra_infos_dict[key].extend(lst) + + # Add reward scores to non_tensor_batch for logging + current_scores = reward_tensor.sum(-1).cpu().tolist() + test_batch.non_tensor_batch["reward"] = np.array(current_scores) + if "reward_extra_info" in result: + for key, lst in result["reward_extra_info"].items(): + if key not in test_batch.non_tensor_batch: + test_batch.non_tensor_batch[key] = np.array(lst) + + # Ensure 
test_batch has prompts field for logging + if "prompts" not in test_batch.batch: + # Reconstruct prompts from the original input_ids we saved + original_input_ids = test_gen_batch.batch["input_ids"] + test_batch.batch["prompts"] = original_input_ids + + # Collect test_batch for logging + if all_test_batches is None: + all_test_batches = test_batch + else: + all_test_batches = DataProto.concat([all_test_batches, test_batch]) + + data_source_lst.append(test_batch.non_tensor_batch.get("data_source", ["unknown"] * reward_tensor.shape[0])) + + # Log generations from all collected batches + if all_test_batches is not None: + self._maybe_log_val_generations(all_test_batches) + + # dump generations + val_data_dir = self.config.trainer.get("validation_data_dir", None) + if val_data_dir: + self._dump_generations( + inputs=sample_inputs, + outputs=sample_outputs, + scores=sample_scores, + reward_extra_infos_dict=reward_extra_infos_dict, + dump_path=val_data_dir, + ) + + for key_info, lst in reward_extra_infos_dict.items(): + assert len(lst) == 0 or len(lst) == len(sample_scores), f"{key_info}: {len(lst)=}, {len(sample_scores)=}" + + data_sources = np.concatenate(data_source_lst, axis=0) + + from verl.trainer.ppo.metric_utils import process_validation_metrics + + data_src2var2metric2val = process_validation_metrics(data_sources, sample_inputs, reward_extra_infos_dict) + metric_dict = {} + for data_source, var2metric2val in data_src2var2metric2val.items(): + core_var = "acc" if "acc" in var2metric2val else "reward" + for var_name, metric2val in var2metric2val.items(): + n_max = max([int(name.split("@")[-1].split("/")[0]) for name in metric2val.keys()]) + for metric_name, metric_val in metric2val.items(): + if (var_name == core_var) and any(metric_name.startswith(pfx) for pfx in ["mean", "maj", "best"]) and (f"@{n_max}" in metric_name): + metric_sec = "val-core" + else: + metric_sec = "val-aux" + pfx = f"{metric_sec}/{data_source}/{var_name}/{metric_name}" + metric_dict[pfx] = metric_val + + # statistics val response len + response_lens = [len(text) for text in sample_outputs] + metric_dict["val/response_len/mean"] = np.mean(response_lens) + metric_dict["val/response_len/max"] = np.max(response_lens) + metric_dict["val/response_len/min"] = np.min(response_lens) + + return metric_dict + def fit(self): """ The training loop of PPO. @@ -137,11 +307,26 @@ def fit(self): # compute scores. Support both model and function-based. # We first compute the scores using reward model. Then, we call reward_fn to combine # the results from reward model and rule-based results. 
- if self.use_rm: + if self.use_rm and not self.use_grm: # we first compute reward model score reward_tensor = self.rm_wg.compute_rm_score(new_batch) new_batch = new_batch.union(reward_tensor) + if self.use_grm: + output = self.grm_wg.generate_sequences_as_grm(new_batch) + # Add "_grm" suffix to all keys from output and union into new_batch + reward_tensor = {} + if hasattr(output, "batch") and output.batch is not None: + for key, value in output.batch.items(): + reward_tensor[key + "_grm"] = value + + # Create DataProto object for union + if reward_tensor: + reward_data_proto = DataProto.from_dict(reward_tensor) + new_batch = new_batch.union(reward_data_proto) + else: + print("No reward_tensor from GRM output") + # we combine with rule-based rm reward_extra_infos_dict: dict[str, list] try: @@ -312,3 +497,63 @@ def fit(self): progress_bar.update(1) self.global_steps += 1 + + def _maybe_log_val_generations(self, batch: DataProto): + """Log a table of validation samples to the configured logger (wandb or swanlab)""" + generations_to_log = self.config.trainer.get("log_val_generations", 0) + + if generations_to_log == 0: + return + + prompts, response = batch.batch["prompts"], batch.batch["responses"] + prompts = self.tokenizer.batch_decode(prompts, skip_special_tokens=True) + response = self.tokenizer.batch_decode(response, skip_special_tokens=True) + + if batch.batch.get("prompts_grm", None) is not None: + prompts_grm = batch.batch["prompts_grm"] + prompts_grm = self.tokenizer.batch_decode(prompts_grm, skip_special_tokens=True) + if batch.batch.get("responses_grm", None) is not None: + response_grm = batch.batch["responses_grm"] + response_grm = self.tokenizer.batch_decode(response_grm, skip_special_tokens=True) + + res_ids = list(range(len(prompts))) + sample_ids = random.sample(res_ids, min(generations_to_log, len(res_ids))) + + sample_inputs = [] + sample_outputs = [] + sample_scores = [] + sample_inputs_grm = [] + sample_outputs_grm = [] + for idx in sample_ids: + sample_inputs.append(prompts[idx]) + sample_outputs.append(response[idx]) + # Use reward score from non_tensor_batch, fallback to acc if available + if "reward" in batch.non_tensor_batch: + sample_scores.append(f"{batch.non_tensor_batch['reward'][idx]:.2f}") + elif "acc" in batch.non_tensor_batch: + sample_scores.append(f"{batch.non_tensor_batch['acc'][idx]:.2f}") + else: + sample_scores.append("N/A") + + if batch.batch.get("prompts_grm", None) is not None: + sample_inputs_grm.append(prompts_grm[idx]) + sample_outputs_grm.append(response_grm[idx]) + + # Create samples as dict[dict] format + samples = {} + for i, idx in enumerate(sample_ids): + sample_key = f"sample_{i + 1}" + score_val = "N/A" + if "reward" in batch.non_tensor_batch: + score_val = f"{batch.non_tensor_batch['reward'][idx]:.2f}" + elif "acc" in batch.non_tensor_batch: + score_val = f"{batch.non_tensor_batch['acc'][idx]:.2f}" + + sample_data = {"input": prompts[idx], "output": response[idx], "score": score_val} + if batch.batch.get("prompts_grm", None) is not None: + sample_data["input_grm"] = prompts_grm[idx] + sample_data["output_grm"] = response_grm[idx] + samples[sample_key] = sample_data + + # Log to each configured logger + self.validation_generations_logger.log(self.config.trainer.logger, samples, self.global_steps) diff --git a/recipe/dapo/main_dapo.py b/recipe/dapo/main_dapo.py index d2d7b6190f9..04f6b5e7b55 100644 --- a/recipe/dapo/main_dapo.py +++ b/recipe/dapo/main_dapo.py @@ -15,7 +15,6 @@ Note that we don't combine the main with ray_trainer as 
ray_trainer is used by other main. """ - import hydra import ray @@ -89,8 +88,10 @@ def run(self, config): } global_pool_id = "global_pool" + grm_pool_id = "grm_pool" resource_pool_spec = { global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes, + grm_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.get("grm_nnodes", 0), } mapping = { Role.ActorRollout: global_pool_id, @@ -113,6 +114,10 @@ def run(self, config): role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker) mapping[Role.RewardModel] = global_pool_id + if config.reward_model.grm.enable: + role_worker_mapping[Role.GenerativeRewardModel] = ray.remote(ActorRolloutRefWorker) + mapping[Role.GenerativeRewardModel] = grm_pool_id + # reference model if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss: role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker) diff --git a/recipe/dapo/run_dapo_qwen2.5_llm_judge.sh b/recipe/dapo/run_dapo_qwen2.5_llm_judge.sh new file mode 100644 index 00000000000..3fb25fea7e5 --- /dev/null +++ b/recipe/dapo/run_dapo_qwen2.5_llm_judge.sh @@ -0,0 +1,165 @@ +#!/usr/bin/env bash +set -xeuo pipefail + +project_name='DAPO-GRM' +date_str=$(date +%Y%m%d%H%M%S) +model_name="Qwen2.5-7B-Instruct" +exp_name="DAPO-GRM-${model_name}-${date_str}" + +adv_estimator=grpo + +use_kl_in_reward=False +kl_coef=0.0 +use_kl_loss=False +kl_loss_coef=0.0 + +clip_ratio_low=0.2 +clip_ratio_high=0.28 + +max_prompt_length=$((1024 * 2)) +max_response_length=$((1024 * 20)) +enable_overlong_buffer=True +overlong_buffer_len=$((1024 * 4)) +overlong_penalty_factor=1.0 + +loss_agg_mode="token-mean" + +enable_filter_groups=True +filter_groups_metric=acc +max_num_gen_batches=10 +train_prompt_bsz=32 +gen_prompt_bsz=$((train_prompt_bsz * 2)) +n_resp_per_prompt=8 +train_prompt_mini_bsz=8 + +# Ray +PWD=. 
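+# NOTE: main_dapo.py builds two Ray resource pools from the settings below:
+#   global_pool (actor rollout / ref / reward model) -> trainer.nnodes (MAIN_NNODES) nodes x trainer.n_gpus_per_node GPUs
+#   grm_pool (generative reward model)               -> trainer.grm_nnodes (GRM_NNODES) nodes x trainer.n_gpus_per_node GPUs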
+RAY_ADDRESS=${RAY_ADDRESS:-""} +WORKING_DIR=${WORKING_DIR:-"${PWD}"} +RUNTIME_ENV=${RUNTIME_ENV:-"${WORKING_DIR}/verl/trainer/runtime_env.yaml"} +MAIN_NNODES=${MAIN_NNODES:-8} +# Paths +RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"} +MODEL_PATH=${MODEL_PATH:-"models/${model_name}"} +CKPTS_DIR=${CKPTS_DIR:-"ckpts/${project_name}/${exp_name}"} + +TRAIN_FILE=${TRAIN_FILE:-"data/dapo-math-17k.parquet"} +TEST_FILE=${TEST_FILE:-"data/aime-2024.parquet"} + +# GRM +enable_grm=True +grm_name="Qwen2.5-7B-Instruct" +GRM_PATH=${GRM_PATH:-"models/${grm_name}"} +GRM_template_file=${GRM_template_file:-"${WORKING_DIR}/recipe/dapo/config/GRM_template.txt"} +GRM_NNODES=${GRM_NNODES:-4} +GRM_max_response_length=$((1024 * 2)) +GRM_sp_size=4 +GRM_tp=4 + +# Sample Write +enable_sample_write=False +sample_write_dir=${sample_write_dir:-"${CKPTS_DIR}/sample"} +sample_write_max_rows=1000 + +# Algorithm +temperature=1.0 +top_p=1.0 +top_k=-1 # 0 for HF rollout, -1 for vLLM rollout + +# Performance Related Parameter +sp_size=4 +use_dynamic_bsz=True +actor_ppo_max_token_len=$((max_prompt_length + max_response_length)) +infer_ppo_max_token_len=$((max_prompt_length + max_response_length)) +offload=True +gen_tp=4 + +ray job submit --no-wait --runtime-env="${RUNTIME_ENV}" \ + --working-dir "${WORKING_DIR}" \ + --address "${RAY_ADDRESS}" \ + -- python3 -m recipe.dapo.main_dapo \ + data.train_files="${TRAIN_FILE}" \ + data.val_files="${TEST_FILE}" \ + data.prompt_key=prompt \ + data.truncation='left' \ + data.max_prompt_length=${max_prompt_length} \ + data.max_response_length=${max_response_length} \ + data.gen_batch_size=${gen_prompt_bsz} \ + data.train_batch_size=${train_prompt_bsz} \ + actor_rollout_ref.rollout.n=${n_resp_per_prompt} \ + algorithm.adv_estimator=${adv_estimator} \ + algorithm.use_kl_in_reward=${use_kl_in_reward} \ + algorithm.kl_ctrl.kl_coef=${kl_coef} \ + actor_rollout_ref.actor.use_kl_loss=${use_kl_loss} \ + actor_rollout_ref.actor.kl_loss_coef=${kl_loss_coef} \ + actor_rollout_ref.actor.clip_ratio_low=${clip_ratio_low} \ + actor_rollout_ref.actor.clip_ratio_high=${clip_ratio_high} \ + actor_rollout_ref.actor.clip_ratio_c=10.0 \ + algorithm.filter_groups.enable=${enable_filter_groups} \ + algorithm.filter_groups.max_num_gen_batches=${max_num_gen_batches} \ + algorithm.filter_groups.metric=${filter_groups_metric} \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=${actor_ppo_max_token_len} \ + actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \ + actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \ + actor_rollout_ref.model.path="${MODEL_PATH}" \ + +actor_rollout_ref.model.override_config.attention_dropout=0. \ + +actor_rollout_ref.model.override_config.embd_pdrop=0. \ + +actor_rollout_ref.model.override_config.resid_pdrop=0. 
\ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.optim.lr=1e-6 \ + actor_rollout_ref.actor.optim.lr_warmup_steps=10 \ + actor_rollout_ref.actor.optim.weight_decay=0.1 \ + actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \ + actor_rollout_ref.actor.fsdp_config.param_offload=${offload} \ + actor_rollout_ref.actor.fsdp_config.optimizer_offload=${offload} \ + actor_rollout_ref.actor.entropy_coeff=0 \ + actor_rollout_ref.actor.grad_clip=1.0 \ + actor_rollout_ref.actor.loss_agg_mode=${loss_agg_mode} \ + actor_rollout_ref.actor.ulysses_sequence_parallel_size=${sp_size} \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.80 \ + actor_rollout_ref.rollout.tensor_model_parallel_size=${gen_tp} \ + actor_rollout_ref.rollout.enable_chunked_prefill=True \ + actor_rollout_ref.rollout.max_num_batched_tokens=$((max_prompt_length + max_response_length)) \ + actor_rollout_ref.rollout.temperature=${temperature} \ + actor_rollout_ref.rollout.top_p=${top_p} \ + actor_rollout_ref.rollout.top_k="${top_k}" \ + actor_rollout_ref.rollout.val_kwargs.temperature=${temperature} \ + actor_rollout_ref.rollout.val_kwargs.top_p=${top_p} \ + actor_rollout_ref.rollout.val_kwargs.top_k=${top_k} \ + actor_rollout_ref.rollout.val_kwargs.do_sample=True \ + actor_rollout_ref.rollout.val_kwargs.n=1 \ + actor_rollout_ref.ref.fsdp_config.param_offload=${offload} \ + actor_rollout_ref.ref.ulysses_sequence_parallel_size=${sp_size} \ + actor_rollout_ref.actor.fsdp_config.fsdp_size=-1 \ + reward_model.reward_manager=dapo \ + reward_model.overlong_buffer.enable=${enable_overlong_buffer} \ + reward_model.overlong_buffer.len=${overlong_buffer_len} \ + reward_model.overlong_buffer.penalty_factor=${overlong_penalty_factor} \ + +reward_model.grm.enable=${enable_grm} \ + +reward_model.grm.model.path="${GRM_PATH}" \ + +reward_model.grm.rollout.prompt_length=$((actor_ppo_max_token_len - GRM_max_response_length)) \ + +reward_model.grm.rollout.response_length=${GRM_max_response_length} \ + +reward_model.grm.rollout.max_num_batched_tokens=${actor_ppo_max_token_len} \ + +reward_model.grm.rollout.n=1 \ + +reward_model.grm.rollout.ulysses_sequence_parallel_size=${GRM_sp_size} \ + +reward_model.grm.rollout.tensor_model_parallel_size=${GRM_tp} \ + +reward_model.grm.rollout.fsdp_config.fsdp_size=-1 \ + +reward_model.grm.rollout.template_file="${GRM_template_file}" \ + trainer.logger=['console','wandb'] \ + trainer.project_name="${project_name}" \ + trainer.experiment_name="${exp_name}" \ + trainer.n_gpus_per_node=8 \ + trainer.nnodes="${MAIN_NNODES}" \ + +trainer.grm_nnodes="${GRM_NNODES}" \ + trainer.val_before_train=False \ + trainer.test_freq=5 \ + trainer.save_freq=50 \ + trainer.total_epochs=1 \ + trainer.log_val_generations=10 \ + +trainer.log_train_generations=10 \ + trainer.default_local_dir="${CKPTS_DIR}" \ + trainer.resume_mode=auto \ No newline at end of file diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py index f805798b806..9a6829eeb89 100644 --- a/verl/trainer/ppo/ray_trainer.py +++ b/verl/trainer/ppo/ray_trainer.py @@ -20,6 +20,7 @@ import json import os +import random import uuid from collections import defaultdict from copy import deepcopy @@ -74,6 +75,7 @@ class Role(Enum): RefPolicy = 4 RewardModel = 5 ActorRolloutRef = 6 + GenerativeRewardModel = 7 @dataclass @@ -257,14 +259,15 @@ def compute_advantage(data: DataProto, adv_estimator, gamma=1.0, lam=1.0, num_re else: # handle all other adv estimator type other than GAE and GRPO 
adv_estimator_fn = core_algos.get_adv_estimator_fn(adv_estimator) - adv_kwargs = {"token_level_rewards": data.batch["token_level_rewards"], - "response_mask": data.batch["response_mask"], - "config": config, + adv_kwargs = { + "token_level_rewards": data.batch["token_level_rewards"], + "response_mask": data.batch["response_mask"], + "config": config, } - if "uid" in data.non_tensor_batch: # optional - adv_kwargs['index'] = data.non_tensor_batch["uid"] - if "reward_baselines" in data.batch:# optional - adv_kwargs['reward_baselines'] = data.batch["reward_baselines"] + if "uid" in data.non_tensor_batch: # optional + adv_kwargs["index"] = data.non_tensor_batch["uid"] + if "reward_baselines" in data.batch: # optional + adv_kwargs["reward_baselines"] = data.batch["reward_baselines"] # calculate advantage estimator advantages, returns = adv_estimator_fn(**adv_kwargs) @@ -314,6 +317,7 @@ def __init__( self.resource_pool_manager = resource_pool_manager self.use_reference_policy = Role.RefPolicy in role_worker_mapping self.use_rm = Role.RewardModel in role_worker_mapping + self.use_grm = Role.GenerativeRewardModel in role_worker_mapping self.ray_worker_group_cls = ray_worker_group_cls self.device_name = device_name self.validation_generations_logger = ValidationGenerationsLogger() @@ -553,29 +557,96 @@ def _dump_generations(self, inputs, outputs, scores, reward_extra_infos_dict, du print(f"Dumped generations to {filename}") - def _maybe_log_val_generations(self, inputs, outputs, scores): + def _maybe_log_val_generations(self, batch: DataProto): """Log a table of validation samples to the configured logger (wandb or swanlab)""" - generations_to_log = self.config.trainer.log_val_generations if generations_to_log == 0: return + prompts, response = batch.batch["prompts"], batch.batch["responses"] + prompts = self.tokenizer.batch_decode(prompts, skip_special_tokens=True) + response = self.tokenizer.batch_decode(response, skip_special_tokens=True) - import numpy as np + if batch.batch.get("prompts_grm", None) is not None: + prompts_grm = batch.batch["prompts_grm"] + prompts_grm = self.tokenizer.batch_decode(prompts_grm, skip_special_tokens=True) + if batch.batch.get("responses_grm", None) is not None: + response_grm = batch.batch["responses_grm"] + response_grm = self.tokenizer.batch_decode(response_grm, skip_special_tokens=True) - # Create tuples of (input, output, score) and sort by input text - samples = list(zip(inputs, outputs, scores)) - samples.sort(key=lambda x: x[0]) # Sort by input text + res_ids = list(range(len(prompts))) + sample_ids = random.sample(res_ids, generations_to_log) - # Use fixed random seed for deterministic shuffling - rng = np.random.RandomState(42) - rng.shuffle(samples) + sample_inputs = [] + sample_outputs = [] + sample_scores = [] + sample_inputs_grm = [] + sample_outputs_grm = [] + for idx in sample_ids: + sample_inputs.append(prompts[idx]) + sample_outputs.append(response[idx]) + sample_scores.append(f"{batch.non_tensor_batch['acc'][idx]:.2f}") + if batch.batch.get("prompts_grm", None) is not None: + sample_inputs_grm.append(prompts_grm[idx]) + sample_outputs_grm.append(response_grm[idx]) + + # Create samples as dict[dict] format + samples = {} + for i, idx in enumerate(sample_ids): + sample_key = f"sample_{i + 1}" + sample_data = {"input": prompts[idx], "output": response[idx], "score": f"{batch.non_tensor_batch['acc'][idx]:.2f}"} + if batch.batch.get("prompts_grm", None) is not None: + sample_data["input_grm"] = prompts_grm[idx] + sample_data["output_grm"] = 
response_grm[idx] + samples[sample_key] = sample_data + # Log to each configured logger + self.validation_generations_logger.log(self.config.trainer.logger, samples, self.global_steps) - # Take first N samples after shuffling - samples = samples[:generations_to_log] + def _maybe_log_train_generations(self, batch: DataProto): + """Log a table of training samples to the configured logger (wandb or swanlab)""" + generations_to_log = self.config.trainer.log_train_generations + + if generations_to_log == 0: + return + prompts, response = batch.batch["prompts"], batch.batch["responses"] + prompts = self.tokenizer.batch_decode(prompts, skip_special_tokens=True) + response = self.tokenizer.batch_decode(response, skip_special_tokens=True) + + if batch.batch.get("prompts_grm", None) is not None: + prompts_grm = batch.batch["prompts_grm"] + prompts_grm = self.tokenizer.batch_decode(prompts_grm, skip_special_tokens=True) + if batch.batch.get("responses_grm", None) is not None: + response_grm = batch.batch["responses_grm"] + response_grm = self.tokenizer.batch_decode(response_grm, skip_special_tokens=True) + + res_ids = list(range(len(prompts))) + sample_ids = random.sample(res_ids, min(generations_to_log, len(res_ids))) + + sample_inputs = [] + sample_outputs = [] + sample_scores = [] + sample_inputs_grm = [] + sample_outputs_grm = [] + for idx in sample_ids: + sample_inputs.append(prompts[idx]) + sample_outputs.append(response[idx]) + sample_scores.append(f"{batch.non_tensor_batch['acc'][idx]:.2f}") + if batch.batch.get("prompts_grm", None) is not None: + sample_inputs_grm.append(prompts_grm[idx]) + sample_outputs_grm.append(response_grm[idx]) + + # Create samples as dict[dict] format + samples = {} + for i, idx in enumerate(sample_ids): + sample_key = f"sample_{i + 1}" + sample_data = {"input": prompts[idx], "output": response[idx], "score": f"{batch.non_tensor_batch['acc'][idx]:.2f}"} + if batch.batch.get("prompts_grm", None) is not None: + sample_data["input_grm"] = prompts_grm[idx] + sample_data["output_grm"] = response_grm[idx] + samples[sample_key] = sample_data # Log to each configured logger - self.validation_generations_logger.log(self.config.trainer.logger, samples, self.global_steps) + self.training_generations_logger.log(self.config.trainer.logger, samples, self.global_steps) def _validate(self): data_source_lst = [] @@ -733,6 +804,37 @@ def init_workers(self): rm_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.RewardModel], config=self.config.reward_model) self.resource_pool_to_cls[resource_pool]["rm"] = rm_cls + if self.use_grm: + + def copy_missing_params_to_grm(config): + """Copy only the parameters that are missing from the grm rollout config""" + with open_dict(config): + # Make sure the grm config section exists + if "grm" not in config.reward_model: + config.reward_model.grm = {} + + rollout_config = config.actor_rollout_ref.rollout + grm_config = config.reward_model.grm.rollout + + # Iterate over the actor rollout config and copy only the fields that grm does not already define + for key, value in rollout_config.items(): + if key not in grm_config: + grm_config[key] = value + print(f"Copied {key} to grm: {value}") + else: + print(f"Skipped {key} (already exists in grm): {grm_config[key]}") + config.reward_model.grm.rollout = grm_config + + return config + + # Apply the merge + self.config = copy_missing_params_to_grm(self.config) + print(f"GRM config: {self.config.reward_model.grm}") + print(f"Actor rollout ref config: {self.config.actor_rollout_ref}") + resource_pool = self.resource_pool_manager.get_resource_pool(Role.GenerativeRewardModel) + grm_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.GenerativeRewardModel],
config=self.config.reward_model.grm, role="grm") + self.resource_pool_to_cls[resource_pool]["grm"] = grm_cls + # initialize WorkerGroup # NOTE: if you want to use a different resource pool for each role, which can support different parallel size, # you should not use `create_colocated_worker_cls`. @@ -743,6 +845,8 @@ def init_workers(self): if OmegaConf.select(self.config.trainer, "ray_wait_register_center_timeout") is not None: wg_kwargs["ray_wait_register_center_timeout"] = self.config.trainer.ray_wait_register_center_timeout + print(f"resource_pool_to_cls: {self.resource_pool_to_cls}") + for resource_pool, class_dict in self.resource_pool_to_cls.items(): worker_dict_cls = create_colocated_worker_cls(class_dict=class_dict) wg_dict = self.ray_worker_group_cls(resource_pool=resource_pool, ray_cls_with_init=worker_dict_cls, device_name=self.device_name, **wg_kwargs) @@ -761,6 +865,10 @@ def init_workers(self): self.rm_wg = all_wg["rm"] self.rm_wg.init_model() + if self.use_grm: + self.grm_wg = all_wg["grm"] + self.grm_wg.init_model() + # we should create rollout at the end so that vllm can have a better estimation of kv cache memory self.actor_rollout_wg = all_wg["actor_rollout"] self.actor_rollout_wg.init_model() @@ -1071,7 +1179,7 @@ def fit(self): num_repeat=self.config.actor_rollout_ref.rollout.n, norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo, multi_turn=self.config.actor_rollout_ref.rollout.multi_turn.enable, - config=self.config.algorithm + config=self.config.algorithm, ) # update critic diff --git a/verl/trainer/runtime_env.yaml b/verl/trainer/runtime_env.yaml index 5aa693cd71a..0630b33c862 100644 --- a/verl/trainer/runtime_env.yaml +++ b/verl/trainer/runtime_env.yaml @@ -3,4 +3,6 @@ excludes: ["/.git/"] env_vars: TORCH_NCCL_AVOID_RECORD_STREAMS: "1" # If you are using vllm<=0.6.3, you might need to set the following environment variable to avoid bugs: - # VLLM_ATTENTION_BACKEND: "XFORMERS" \ No newline at end of file + # VLLM_ATTENTION_BACKEND: "XFORMERS" +pip: + - "peft" \ No newline at end of file diff --git a/verl/utils/tracking.py b/verl/utils/tracking.py index a2d658bea1a..116086b013e 100644 --- a/verl/utils/tracking.py +++ b/verl/utils/tracking.py @@ -16,11 +16,12 @@ """ import dataclasses +import os from enum import Enum from functools import partial from pathlib import Path from typing import Any, Dict, List, Union -import os + class Tracking: """A unified tracking interface for logging experiment data to multiple backends. @@ -257,6 +258,35 @@ def _flatten_dict(raw: Dict[str, Any], *, sep: str) -> Dict[str, Any]: return ans +def _generate_table_columns(samples): + """Generate table columns based on samples format. + + Args: + samples: dict[dict] format where each key is sample_id and value is a dict of fields + + Returns: + list: Column names for the table + """ + if not samples: + return ["step"] + + # Get all unique field names from all samples + all_fields = set() + for sample_data in samples.values(): + all_fields.update(sample_data.keys()) + + # Sort fields for consistent ordering + sorted_fields = sorted(all_fields) + + # Create columns: step + sample_1_field1, sample_1_field2, ..., sample_2_field1, sample_2_field2, ... 
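+    # Example: two samples, each with fields {"input", "output", "score"}, produce
+    #   ["step", "sample_1_input", "sample_1_output", "sample_1_score",
+    #    "sample_2_input", "sample_2_output", "sample_2_score"]
+    # (fields are sorted alphabetically, samples by their numeric suffix)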
+ columns = ["step"] + for sample_key in sorted(samples.keys(), key=lambda x: int(x.split("_")[1])): # Sort by sample number + for field in sorted_fields: + columns.append(f"{sample_key}_{field}") + + return columns + + @dataclasses.dataclass class ValidationGenerationsLogger: def log(self, loggers, samples, step): @@ -271,13 +301,13 @@ def log(self, loggers, samples, step): self.log_generations_to_clearml(samples, step) if "tensorboard" in loggers: self.log_generations_to_tensorboard(samples, step) - + def log_generations_to_wandb(self, samples, step): """Log samples to wandb as a table""" import wandb # Create column names for all samples - columns = ["step"] + sum([[f"input_{i + 1}", f"output_{i + 1}", f"score_{i + 1}"] for i in range(len(samples))], []) + columns = _generate_table_columns(samples) if not hasattr(self, "validation_table"): # Initialize the table on first call @@ -290,8 +320,11 @@ def log_generations_to_wandb(self, samples, step): # Add new row with all data row_data = [] row_data.append(step) - for sample in samples: - row_data.extend(sample) + for sample_key in sorted(samples.keys(), key=lambda x: int(x.split("_")[1])): # Sort by sample number + sample_data = samples[sample_key] + sorted_fields = sorted(sample_data.keys()) + for field in sorted_fields: + row_data.append(sample_data[field]) new_table.add_data(*row_data) @@ -304,17 +337,17 @@ def log_generations_to_swanlab(self, samples, step): import swanlab swanlab_text_list = [] - for i, sample in enumerate(samples): + for i, sample in enumerate(samples.values()): row_text = f""" - input: {sample[0]} + input: {sample["input"]} --- - output: {sample[1]} + output: {sample["output"]} --- - score: {sample[2]} + score: {sample["score"]} """ swanlab_text_list.append(swanlab.Text(row_text, caption=f"sample {i + 1}")) @@ -334,8 +367,8 @@ def log_generations_to_mlflow(self, samples, step): with tempfile.TemporaryDirectory() as tmp_dir: validation_gen_step_file = Path(tmp_dir, f"val_step{step}.json") row_data = [] - for sample in samples: - data = {"input": sample[0], "output": sample[1], "score": sample[2]} + for sample in samples.values(): + data = {"input": sample["input"], "output": sample["output"], "score": sample["score"]} row_data.append(data) with open(validation_gen_step_file, "w") as file: json.dump(row_data, file) @@ -370,36 +403,37 @@ def log_generations_to_clearml(self, samples, step): table_plot=pd.DataFrame.from_records(table), iteration=step, ) - + def log_generations_to_tensorboard(self, samples, step): """Log samples to tensorboard as text""" # Initialize tensorboard writer if not exists if not hasattr(self, "writer"): from torch.utils.tensorboard import SummaryWriter + tensorboard_dir = os.environ.get("TENSORBOARD_DIR", "tensorboard_log") os.makedirs(tensorboard_dir, exist_ok=True) self.writer = SummaryWriter(log_dir=tensorboard_dir) - + # Format the samples data into readable text text_content = f"**Generation Results - Step {step}**\n\n" - + for i, sample in enumerate(samples): text_content += f"### Sample {i + 1}\n" - + # Assuming sample contains [input, output, score] if len(sample) >= 3: input_text, output_text, score = sample[0], sample[1], sample[2] - + text_content += f"**Input:** {input_text}\n\n" text_content += f"**Output:** {output_text}\n\n" text_content += f"**Score:** {score}\n\n" else: # Handle cases where sample format might be different text_content += f"**Data:** {sample}\n\n" - + text_content += "---\n\n" - + # Log to tensorboard as text - self.writer.add_text('val/generations', 
text_content, step) + self.writer.add_text("val/generations", text_content, step) # Flush to ensure data is written - self.writer.flush() \ No newline at end of file + self.writer.flush() diff --git a/verl/workers/fsdp_workers.py b/verl/workers/fsdp_workers.py index 797124f9d5c..de0227b831c 100644 --- a/verl/workers/fsdp_workers.py +++ b/verl/workers/fsdp_workers.py @@ -111,11 +111,25 @@ def __init__(self, config: DictConfig, role: str): # build device mesh for FSDP world_size = torch.distributed.get_world_size() # TODO(sgm): support FSDP hybrid shard for larger model - self.device_mesh = create_device_mesh(world_size=world_size, fsdp_size=self.config.actor.fsdp_config.fsdp_size) - # build device mesh for Ulysses Sequence Parallel + self.role = role + assert self.role in ["actor", "rollout", "ref", "actor_rollout", "actor_rollout_ref", "grm"] + + self._is_actor = self.role in ["actor", "actor_rollout", "actor_rollout_ref"] + self._is_rollout = self.role in ["rollout", "actor_rollout", "actor_rollout_ref", "grm"] + self._is_ref = self.role in ["ref", "actor_rollout_ref"] + + # build device mesh for Tensor Model Parallel and Ulysses Sequence Parallel + if self._is_actor or self._is_ref: + self.device_mesh = create_device_mesh(world_size=world_size, fsdp_size=self.config.actor.fsdp_config.fsdp_size) + self.ulysses_sequence_parallel_size = self.config.actor.get("ulysses_sequence_parallel_size", 1) + elif self._is_rollout: + self.device_mesh = create_device_mesh(world_size=world_size, fsdp_size=self.config.rollout.fsdp_config.fsdp_size) + self.ulysses_sequence_parallel_size = self.config.rollout.get("ulysses_sequence_parallel_size", 1) + else: + raise ValueError(f"Invalid role: {self.role}") + self.ulysses_device_mesh = None - self.ulysses_sequence_parallel_size = self.config.actor.get("ulysses_sequence_parallel_size", 1) dp = world_size // self.ulysses_sequence_parallel_size if self.ulysses_sequence_parallel_size > 1: self.ulysses_device_mesh = init_device_mesh(device_name, mesh_shape=(dp, self.ulysses_sequence_parallel_size), mesh_dim_names=["dp", "sp"]) @@ -124,13 +138,6 @@ def __init__(self, config: DictConfig, role: str): self._lora_rank = self.config.model.get("lora_rank", 0) self._is_lora = self._lora_rank > 0 - self.role = role - assert self.role in ["actor", "rollout", "ref", "actor_rollout", "actor_rollout_ref"] - - self._is_actor = self.role in ["actor", "actor_rollout", "actor_rollout_ref"] - self._is_rollout = self.role in ["rollout", "actor_rollout", "actor_rollout_ref"] - self._is_ref = self.role in ["ref", "actor_rollout_ref"] - self._is_offload_param = False self._is_offload_optimizer = False if self._is_actor: @@ -184,7 +191,7 @@ def _build_model_optimizer( from verl.utils.model import get_generation_config, print_model_size, update_model_config from verl.utils.torch_dtypes import PrecisionType - assert role in ["actor", "ref"] + assert role in ["actor", "ref", "grm"], f"role {role} is not supported" log_gpu_memory_usage(f"Before init {role} from HF AutoModel", logger=logger) local_path = model_path @@ -296,7 +303,7 @@ def _build_model_optimizer( # We force reference policy to use CPUOffload to save memory. 
# We force turn off CPUOffload for actor because it causes incorrect results when using grad accumulation cpu_offload = None if role == "actor" else CPUOffload(offload_params=True) - fsdp_strategy = self.config.actor.strategy + fsdp_strategy = self.config.actor.strategy if self.config.get("actor", None) is not None else "fsdp" if fsdp_strategy == "fsdp": actor_module_fsdp = FSDP( actor_module, @@ -638,6 +645,134 @@ def generate_sequences(self, prompts: DataProto): # to make sure meta_info["timing"] is the same timing_generate = reduce_timing(timing_generate) output.meta_info["timing"] = timing_generate + + output = output.to("cpu") + + # clear kv cache + torch.cuda.empty_cache() + return output + + def _switch_to_grm_template(self, data: DataProto): + # prompts: [batch_size, 2048] with left padding + # responses: [batch_size, 20480] with right padding + # target: remove right padding from responses, then concatenate to form proper prompt with only left padding + + prompts = data.batch["prompts"] # [bs, 2048] + responses = data.batch["responses"] # [bs, 20480] + full_attention_mask = data.batch["attention_mask"] # [bs, 2048+20480] + + batch_size = prompts.shape[0] + prompt_len = prompts.shape[1] + response_len = responses.shape[1] + + # Extract attention masks for prompts and responses + prompt_attention_mask = full_attention_mask[:, :prompt_len] # [bs, 2048] + response_attention_mask = full_attention_mask[:, prompt_len:] # [bs, 20480] + + rm_input_ids_list = [] + rm_attention_mask_list = [] + + for i in range(batch_size): + # Get valid response length by finding the last non-zero attention mask position + # Traverse from right to left to handle internal zeros in attention mask + response_mask = response_attention_mask[i] + valid_response_length = 0 + for j in range(response_len - 1, -1, -1): + if response_mask[j] != 0: + valid_response_length = j + 1 + break + + valid_responses = responses[i][:valid_response_length] # Remove right padding + + # Add GRM judgment prompt at the end + ground_truth = data.non_tensor_batch["reward_model"][i]["ground_truth"] + assert self.config.rollout.template_file is not None, "template_file is not set" + with open(self.config.rollout.template_file) as f: + grm_template = f.read() + grm_content = grm_template.format(ground_truth=ground_truth) + + # Format as user message manually to avoid unwanted system messages + grm_prompt = f"<|im_start|>user\n{grm_content}<|im_end|>\n<|im_start|>assistant\n" + grm_tokens = self.tokenizer.encode(grm_prompt, add_special_tokens=False) + grm_tensor = torch.tensor(grm_tokens, dtype=prompts.dtype, device=prompts.device) + + # Concatenate prompt + valid response + grm prompt + concatenated = torch.cat([prompts[i], valid_responses, grm_tensor], dim=0) + concatenated_attention = torch.cat([prompt_attention_mask[i], response_attention_mask[i][:valid_response_length], torch.ones(len(grm_tokens), dtype=response_attention_mask.dtype, device=response_attention_mask.device)], dim=0) + + rm_input_ids_list.append(concatenated) + rm_attention_mask_list.append(concatenated_attention) + + # Pad all sequences to the same length (left padding for input_ids, corresponding padding for attention_mask) + max_len = full_attention_mask.shape[1] # 2048+20480 = 22528 + pad_token_id = self.tokenizer.pad_token_id + + rm_input_ids = [] + rm_attention_mask = [] + + for i in range(batch_size): + seq_len = rm_input_ids_list[i].shape[0] + + if seq_len > max_len: + # If sequence is too long, truncate from the left (keep the end part with GRM) + if self.rank 
== 0: + print(f"Warning: sequence {i} length {seq_len} > max_len {max_len}, truncating from left") + rm_input_ids_list[i] = rm_input_ids_list[i][-max_len:] + rm_attention_mask_list[i] = rm_attention_mask_list[i][-max_len:] + seq_len = max_len + + pad_len = max_len - seq_len + + # Left padding for input_ids + padded_input_ids = torch.cat([torch.full((pad_len,), pad_token_id, dtype=rm_input_ids_list[i].dtype, device=rm_input_ids_list[i].device), rm_input_ids_list[i]], dim=0) + + # Corresponding padding for attention_mask (0 for padding positions) + padded_attention_mask = torch.cat([torch.zeros(pad_len, dtype=rm_attention_mask_list[i].dtype, device=rm_attention_mask_list[i].device), rm_attention_mask_list[i]], dim=0) + + rm_input_ids.append(padded_input_ids) + rm_attention_mask.append(padded_attention_mask) + + rm_input_ids = torch.stack(rm_input_ids) # [batch_size, max_len] + rm_attention_mask = torch.stack(rm_attention_mask) # [batch_size, max_len] + + # Compute position ids + rm_position_ids = compute_position_id_with_mask(rm_attention_mask) + + rm_inputs = {"input_ids": rm_input_ids, "attention_mask": rm_attention_mask, "position_ids": rm_position_ids} + + return DataProto.from_dict(rm_inputs) + + @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO) + def generate_sequences_as_grm(self, data: DataProto, **kwargs): + """ + This function is used to generate sequences as a generative reward model. + """ + # Support all hardwares + + data = data.to(torch.cuda.current_device()) + prompts = self._switch_to_grm_template(data) + + assert self._is_rollout + + meta_info = { + "eos_token_id": self.generation_config.eos_token_id if self.generation_config is not None else self.tokenizer.eos_token_id, + "pad_token_id": self.generation_config.pad_token_id if self.generation_config is not None else self.tokenizer.pad_token_id, + } + prompts.meta_info.update(meta_info) + with self.rollout_sharding_manager: + log_gpu_memory_usage("After entering rollout sharding manager", logger=logger) + + prompts = self.rollout_sharding_manager.preprocess_data(prompts) + + if self.config.rollout.name == "sglang_async": + raise ValueError("AsyncSGLangRollout is not supported for GRM currently") + else: + output = self.rollout.generate_sequences(prompts=prompts, **self.config.rollout) + + log_gpu_memory_usage("After rollout generation", logger=logger) + + output = self.rollout_sharding_manager.postprocess_data(output) + output = output.to("cpu") # clear kv cache @@ -850,7 +985,7 @@ def _build_critic_model_optimizer(self, config): torch_dtype = self.config.model.fsdp_config.get("model_dtype", "fp32") torch_dtype = PrecisionType.to_dtype(torch_dtype) - from transformers import AutoConfig, AutoModelForTokenClassification + from transformers import AutoConfig critic_model_config = AutoConfig.from_pretrained(local_path, attn_implementation="flash_attention_2", trust_remote_code=config.model.get("trust_remote_code", False)) critic_model_config.num_labels = 1 diff --git a/verl/workers/reward_manager/dapo.py b/verl/workers/reward_manager/dapo.py index 2787361d950..d2b78bb6055 100644 --- a/verl/workers/reward_manager/dapo.py +++ b/verl/workers/reward_manager/dapo.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
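For reference, a string-level sketch of the judge prompt that the _switch_to_grm_template change above assembles for each sample. The worker itself operates on token ids and additionally handles left-padding and truncation; build_grm_prompt is a hypothetical helper used only for illustration and assumes it runs from the repo root so the template added in this patch is reachable:

def build_grm_prompt(policy_prompt: str, policy_response: str, ground_truth: str) -> str:
    # Read the judge instruction shipped with this recipe and fill in the reference answer.
    with open("recipe/dapo/config/GRM_template.txt") as f:
        judge_turn = f.read().format(ground_truth=ground_truth)
    # Append the instruction as a fresh user turn; the Qwen-style chat markers below are
    # hardcoded in _switch_to_grm_template, so they are reproduced verbatim here.
    return f"{policy_prompt}{policy_response}<|im_start|>user\n{judge_turn}<|im_end|>\n<|im_start|>assistant\n"

print(build_grm_prompt("What is 1 + 1?", "The answer is 2.", "2"))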
+import re from collections import defaultdict import torch @@ -41,6 +42,14 @@ def __init__( self.overlong_buffer_cfg = overlong_buffer_cfg self.max_resp_len = max_resp_len + # Initialize rank attribute + import torch.distributed + + if torch.distributed.is_initialized(): + self.rank = torch.distributed.get_rank() + else: + self.rank = 0 + if self.overlong_buffer_cfg is not None: assert self.max_resp_len is not None, f"max_resp_len must be provided if {overlong_buffer_cfg=}, but got None" @@ -61,6 +70,8 @@ def __call__(self, data: DataProto, return_dict: bool = False): for i in range(len(data)): data_item = data[i] # DataProtoItem + if self.rank == 0 and i == 0: + print(f"data_item keys: {data_item.batch.keys()}") prompt_ids = data_item.batch["prompts"] @@ -86,12 +97,34 @@ def __call__(self, data: DataProto, return_dict: bool = False): extra_info = data_item.non_tensor_batch.get("extra_info", None) - result = self.compute_score( - data_source=data_source, - solution_str=response_str, - ground_truth=ground_truth, - extra_info=extra_info, - ) + responses_grm = data_item.batch.get("responses_grm", []) + result = None + if responses_grm is not None and len(responses_grm) > 0: + if not isinstance(responses_grm[0], str): + responses_grm = self.tokenizer.decode(responses_grm, skip_special_tokens=True) + # Judgment: Correct / Incorrect + parsed_res = re.findall(r"Judgment: (Correct|Incorrect)", responses_grm) + if len(parsed_res) > 0: + correct = 1.0 if parsed_res[-1] == "Correct" else -1.0 + else: + correct = -1.0 + + result = { + "score": correct, + "acc": 1.0 if correct > 0 else 0.0, # acc should be 1.0 for correct, 0.0 for incorrect + "pred": responses_grm, + } + else: + if self.rank == 0: + print("No valid responses_grm found, falling back to compute_score") + + if result is None: + result = self.compute_score( + data_source=data_source, + solution_str=response_str, + ground_truth=ground_truth, + extra_info=extra_info, + ) score: float if isinstance(result, dict):
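For reference, a minimal sketch of how the DAPO reward manager change above turns the GRM's free-text verdict into a scalar reward. parse_grm_verdict is a hypothetical helper; the real __call__ additionally decodes token ids and falls back to compute_score when no GRM response is present:

import re

def parse_grm_verdict(grm_response: str) -> dict:
    # The last "Judgment: Correct/Incorrect" occurrence wins; anything unparsable counts as incorrect.
    matches = re.findall(r"Judgment: (Correct|Incorrect)", grm_response)
    correct = 1.0 if matches and matches[-1] == "Correct" else -1.0
    return {"score": correct, "acc": 1.0 if correct > 0 else 0.0, "pred": grm_response}

print(parse_grm_verdict("Think: The solution matches the standard answer.\nJudgment: Correct"))  # score 1.0, acc 1.0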