[reward] fix: preserve input non_tensor_batch in AgentLoopManager when reward_loop_worker_handles is None (#5195)
Conversation
/gemini review
Code Review
The pull request effectively addresses the KeyError: 'data_source' that occurred when using colocate Reward Manager with asynchronous rollout. By passing the input batch's non_tensor_batch as keyword arguments to _postprocess and conditionally merging these arguments into the output non_tensor_batch when reward_loop_worker_handles is None, the necessary input metadata is preserved. This is a correct and targeted fix for the identified problem.
This is indeed a potential bug when using genrm for validation.
I’d prefer to keep the fix in the agent loop rather than in ray_trainer.py. The missing data_source comes from agent_loop’s generate_sequences: its return value doesn’t carry the input’s non_tensor_batch when colocate RM is used, so fixing it in the trainer would mean adding merge logic in several places (validation, training, REMAX), which would be more cumbersome. Fixing it inside the agent loop addresses the cause in one place, and all callers (validation and training) get the correct non_tensor_batch without touching ray_trainer.py.

Referenced code: verl/experimental/agent_loop/agent_loop.py, lines 692 to 710 at 2320603
```diff
     output.extra_fields["reward_extra_info"] = result["reward_extra_info"]

-    def _postprocess(self, inputs: list[_InternalAgentLoopOutput]) -> DataProto:
+    def _postprocess(self, inputs: list[_InternalAgentLoopOutput], **kwargs) -> DataProto:
```
The "kwargs" parameter is a bit odd here; an explicit parameter may be better.
yes, the name "kwargs" can be fixed.
```diff
     non_tensor_batch = {
         "__num_turns__": np.array([input.num_turns for input in inputs], dtype=np.int32),
     }
+    if self.reward_loop_worker_handles is None and kwargs:
+        non_tensor_batch.update(kwargs)
```
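The conditional merge in the diff can be sketched standalone. This is a hypothetical simplification for illustration: plain lists and dicts stand in for `_InternalAgentLoopOutput` and `DataProto`, and `postprocess_sketch` is not the actual verl function.

```python
import numpy as np

def postprocess_sketch(num_turns, reward_loop_worker_handles, **input_non_tensor):
    # Build the output non_tensor_batch from agent-loop outputs only,
    # mirroring the original behavior.
    non_tensor_batch = {
        "__num_turns__": np.array(num_turns, dtype=np.int32),
    }
    # Colocate RM case: no separate reward-loop workers, so the input
    # batch metadata (e.g. data_source) must be forwarded to the output.
    if reward_loop_worker_handles is None and input_non_tensor:
        non_tensor_batch.update(input_non_tensor)
    return non_tensor_batch

out = postprocess_sketch([2, 3], None, data_source=np.array(["gsm8k", "math"]))
print(sorted(out))  # ['__num_turns__', 'data_source']
```

With worker handles present, the input metadata is not merged and the output keeps only the agent-loop keys, matching the `is None` guard in the diff.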
Can we explicitly define what goes into non_tensor_batch? For example, specifying fields like "data_sources" might make things clearer.
> Can we explicitly define what goes into non_tensor_batch? For example, specifying fields like "data_sources" might make things clearer.
We could mirror _compute_score and pass through the whole input non_tensor_batch (all keys) into the output, instead of whitelisting specific fields like "data_sources". That would keep behavior consistent and avoid maintaining an explicit list of keys. Should we do it that way?
The two failing checks (NPU unit tests and pre-commit) don’t appear related to the edits in this PR.
Please run
What does this PR do?
Problem
When the codebase is updated to 2cd9283 (migration to the new asynchronous reward manager), using colocate RM with async rollout (`AgentLoopManager`) causes validation to fail with `KeyError: 'data_source'`.

- The error is raised in `verl/experimental/reward_loop/reward_manager/naive.py`, line 42, in `run_single`, which accesses `data_item.non_tensor_batch["data_source"]`.
- Call chain: `_validate` → `_compute_reward_colocate(test_output_gen_batch_padded)` → `reward_loop_manager.compute_rm_score(batch)` → `RewardLoopWorker.compute_score_batch` → `compute_score` → `run_single`.
- When `reward_loop_worker_handles is None` (e.g. colocate RM), `AgentLoopManager.generate_sequences` returns a `DataProto` whose `non_tensor_batch` is built only from agent outputs (`__num_turns__`, `multi_modal_inputs`, `raw_prompt`). Input metadata such as `data_source` is never forwarded, so the batch passed to the reward manager is missing `data_source` and the naive reward manager raises `KeyError: 'data_source'`.

Solution

- Pass the input batch's `non_tensor_batch` into `_postprocess` as `**kwargs`.
- When `reward_loop_worker_handles is None`, merge these `kwargs` into the output `non_tensor_batch` so `data_source` and other input keys are preserved.
- The batch handed to the reward manager now contains `data_source`, and the `KeyError` is fixed.
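The failure mode and the fix can be reproduced in miniature. Everything below is a toy stand-in (dicts instead of `DataProto`; the function names are illustrative, not the real verl APIs):

```python
import numpy as np

def generate_sequences_old(input_non_tensor_batch):
    # Old behavior: output non_tensor_batch built only from agent
    # outputs; input metadata such as data_source is dropped.
    return {"__num_turns__": np.array([1, 1], dtype=np.int32)}

def generate_sequences_fixed(input_non_tensor_batch):
    # Fixed behavior: input non_tensor_batch merged into the output,
    # preserving data_source and any other input keys.
    out = {"__num_turns__": np.array([1, 1], dtype=np.int32)}
    out.update(input_non_tensor_batch)
    return out

def naive_reward_manager(batch):
    # Mirrors naive.py's run_single, which requires "data_source".
    return batch["data_source"]

inputs = {"data_source": np.array(["gsm8k", "math"])}

try:
    naive_reward_manager(generate_sequences_old(inputs))
except KeyError as e:
    print("old path fails:", e)  # old path fails: 'data_source'

print("fixed path:", naive_reward_manager(generate_sequences_fixed(inputs)))
```

The old path raises the same `KeyError: 'data_source'` described above; after the merge, the reward manager finds the key it needs.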