
[recipe] feat: asynchronous reward agent with mini-batch pipeline and one-step off-policy training

Open haolinyan opened this issue 5 months ago • 10 comments

What does this PR do?

This PR introduces an asynchronous reward agent that schedules reward requests and mitigates communication bottlenecks in RL training scenarios that rely on remote reward services (e.g., LLM-as-a-Judge, RAG, hybrid rule-based scoring). By leveraging a "mini-batch pipeline + one-step off-policy" strategy, it overlaps communication latency with GPU computation, significantly improving training efficiency.

Checklist Before Starting

  • [x] Search for similar PRs. Paste at least one query link here: #2231, #980
  • [x] Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)

Test

To validate this solution, we utilize the GSM8K dataset and introduce randomized artificial delays ranging from 1 to 40 seconds during reward computation for each sample, simulating the latency typically incurred when calling remote reward services.

This delay range is empirically determined based on an analysis of the communication-to-computation latency ratio observed in real-world industrial training processes, thereby ensuring an accurate simulation of practical communication bottlenecks.
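For reference, a minimal sketch of how such a delay can be injected (delayed_compute_score and its scoring logic are illustrative placeholders, not the exact test code used in this PR):

import random
import time


def delayed_compute_score(data_source, solution_str, ground_truth, extra_info=None):
    """Wrap a rule-based GSM8K-style check with a randomized artificial delay,
    emulating a call to a slow remote reward service."""
    time.sleep(random.uniform(1, 40))  # simulated remote-call latency, 1-40 s
    # Placeholder scoring logic for illustration only.
    return 1.0 if ground_truth in solution_str else 0.0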

The experimental results show that:

  • The proposed solution achieves comparable training accuracy to existing open-source results in the community.
  • By incorporating the mini-batch pipeline and one-step off-policy strategies, we observe a reduction of up to 30.85% in total training time relative to the baseline.
| Backend | Strategy | Model | Training Time | Accuracy (last/max) | Log |
|---|---|---|---|---|---|
| Megatron | baseline (from community) | Qwen2-7B-Instruct | - | 89.61 / 89.61 | Log |
| Megatron | baseline | Qwen2-7B-Instruct | 17h53m | 89.08 / 89.92 | Log |
| FSDP | baseline | Qwen2-7B-Instruct | 18h24m | 89.54 / 89.92 | Log |
| Megatron | mini-batch pipeline + one-step off-policy | Qwen2-7B-Instruct | 12h22m (-30.85%) | 89.61 / 90.04 | Log |
| FSDP | mini-batch pipeline + one-step off-policy | Qwen2-7B-Instruct | 13h10m (-28.44%) | 88.86 / 89.99 | Log |
| FSDP | baseline | Qwen2.5-3B-Instruct | 17h23m | 87.87 / 88.10 | Log |
| Megatron | baseline | Qwen2.5-3B-Instruct | 17h07m | 88.02 / 88.02 | Log |
| FSDP | mini-batch pipeline + one-step off-policy | Qwen2.5-3B-Instruct | 13h15m (-23.08%) | 88.93 / 88.93 | Log |
| Megatron | mini-batch pipeline + one-step off-policy | Qwen2.5-3B-Instruct | 13h10m (-23.08%) | 87.19 / 88.40 | Log |

API and Usage Example

1. Reward Function Configuration:

Users can flexibly integrate a remote reward service (such as LLM-as-a-Judge, RAG-enhanced scoring, hybrid rule-based + model scoring, etc.) in two ways:

  1. Stateless function – for one-shot, context-free scoring.
  2. Stateful class – when you need caching, token management, session context, or batch post-processing.
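For the stateless option, a minimal sketch (the function name and scoring logic are illustrative) is a plain function with the custom reward function signature:

def compute_score(data_source, solution_str, ground_truth, extra_info=None):
    """Stateless, context-free scoring: no client, cache, or session state is kept."""
    return 1.0 if ground_truth.strip() in solution_str else 0.0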

For example, users can implement a reward class that calls the OpenAI-style API to score individual responses, then performs group-wise post-processing of the results.

from typing import Any, Optional

import numpy as np
from openai import OpenAI


class RewardAgent:
    """
    This example shows:
    - Initializing the OpenAI-style client once
    - Re-using it in compute_score
    - Optional post_process_scores for smoothing or outlier handling
    """

    def __init__(self):
        # Initialize any remote client
        self.client = OpenAI(
            api_key="your API key",
            base_url="custom base url")
        self.system_prompt = ...

    def compute_score(
        self,
        data_source: Any,
        solution_str: str,
        ground_truth: str,
        extra_info: Optional[dict] = None,
    ) -> tuple[float, str, str]:
        """
        Stateful scoring function.

        Parameters:
            data_source: Data source object
            solution_str: Solution string to be scored
            ground_truth: Standard answer
            extra_info: Extra information dictionary (optional)

        Returns:
            A tuple containing three elements:
            - Score value (float)
            - Original solution string (str)
            - Score explanation string (str)
        """
        prompt = ...

        try:
            resp = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": prompt},
                ],
            )
            score_str = resp.choices[0].message.content.strip()
            score = float(score_str)
        except Exception as e:
            # Fallback: rule-based score or -1.0
            score = -1.0
            explanation = f"LLM judge failed: {e}"
        else:
            explanation = f"LLM judge returned {score}"

        return score, prompt, explanation

    def post_process_scores(self, rewards: list[float]) -> list[float]:
        """
        Post-process an entire group of scores, e.g.:
        - Replace NaN / -1 outliers with the group mean

        Parameters:
            rewards: A list of scores to be processed

        Returns:
            A list of processed scores

        Note:
            This is an optional processing step that is automatically invoked by
            RayAsyncRewardAgent when a group of scores is ready for processing.
        """
        arr = np.array(rewards, dtype=float)
        mean_score = np.nanmean(arr)
        processed = np.where(np.isnan(arr) | (arr < 0), mean_score, arr)
        return processed.tolist()
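For illustration, the class can be exercised standalone before wiring it into training. The snippet below is only a local smoke test: it assumes the ... placeholders above are replaced with real prompts and a valid API key, and in actual training these calls are driven by the recipe's async reward agent rather than invoked manually.

agent = RewardAgent()
score, prompt, explanation = agent.compute_score(
    data_source="gsm8k",
    solution_str="... model response ...",
    ground_truth="72",
)
# Group-wise smoothing: NaN / negative fallback scores are replaced by the group mean.
print(agent.post_process_scores([score, float("nan"), -1.0]))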

Then point the training configuration at the file that defines the class and at its name:

custom_reward_function.path=${reward_file} \
custom_reward_function.name=RewardAgent

2. Training Configuration

When launching a training process, the parameters below should be configured:

# make sure you set the correct path of the config folder;
# +mini_batch_pipeline=True enables the mini-batch pipeline strategy
python3 -m recipe.async_reward_agent.main_ppo \
    --config-path="${HOME}/verl/trainer/config" \
    custom_reward_function.path=${reward_file} \
    custom_reward_function.name=${reward_function_name} \
    reward_model.reward_manager=batch \
    reward_model.launch_reward_fn_async=True \
    +mini_batch_pipeline=True

Design & Code Changes

We designed an asynchronous reward agent that handles concurrent requests and manages their lifecycle. On top of it, we leverage one-step off-policy training and a mini-batch pipeline strategy to overlap communication latency with computation:

  1. One-step off-policy: Unlike the "One Step Off Async Trainer" implemented by Meituan, we reverted to the colocated design. This approach overlaps the computation time of the next-step rollout with the waiting time for current reward requests, thereby improving training efficiency.
  2. Mini-batch pipeline: The existing method necessitates waiting for all rewards in the global batch to be collected before performing model updates, resulting in prolonged GPU idle time. To overcome this inefficiency, we implement a pipelined execution strategy that divides the global batch into mini-batches. This approach enables concurrent processing of asynchronous reward collection and model updates, effectively overlapping communication latency with computation.
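To make the overlap concrete, below is a schematic, runnable sketch of the mini-batch pipeline half of the design (illustrative only: remote_reward, update_policy, and train_step are placeholder names, not the recipe's actual APIs).

import time
from concurrent.futures import ThreadPoolExecutor


def remote_reward(sample):
    """Stand-in for a remote reward-service call (e.g. LLM-as-a-Judge)."""
    time.sleep(0.1)  # simulated communication latency
    return 1.0


def update_policy(mini_batch, rewards):
    """Stand-in for one mini-batch policy update on the GPU."""
    time.sleep(0.05)


def train_step(global_batch, num_mini_batches=4):
    mini_batches = [global_batch[i::num_mini_batches] for i in range(num_mini_batches)]
    with ThreadPoolExecutor(max_workers=32) as pool:
        # Launch reward requests for every mini-batch up front ...
        pending = [[pool.submit(remote_reward, s) for s in mb] for mb in mini_batches]
        # ... and update on each mini-batch as soon as its rewards arrive, so the
        # remaining reward requests overlap with the current model update.
        for mb, futures in zip(mini_batches, pending):
            rewards = [f.result() for f in futures]
            update_policy(mb, rewards)


train_step(list(range(64)))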

For detailed design and code changes, please refer to the design docs (English / Chinese versions).

Checklist Before Submitting

[!IMPORTANT] Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

haolinyan avatar Aug 01 '25 02:08 haolinyan

thanks! could u remove the tensorboard artifacts from this PR?

eric-haibin-lin avatar Aug 03 '25 16:08 eric-haibin-lin

Got it, I've removed the tensorboard artifacts in the new commit. Please check if everything looks good now. Let me know if there's anything else needed for this PR.

haolinyan avatar Aug 03 '25 16:08 haolinyan

@eric-haibin-lin hi, the latest commit (dcd8dc3) has passed all CI checks. Could you please review the changes when you have time? I've removed the TensorBoard artifacts as requested, and everything should be ready for your final check.

If you're unavailable, I'd also appreciate it if you could suggest or assign another appropriate reviewer.

Thanks for your time!

haolinyan avatar Aug 05 '25 06:08 haolinyan

@haolinyan Thanks for your great work. Batch rollout mode has some drawbacks:

  • batch mode is inefficient for multi_turn rollout, since we have to wait for all samples to complete before doing tool calling.
  • batch mode is inefficient for reward calculation. What's even worse is that in some cases the reward comes from interaction with an environment; for example, in SWE-agent the reward is obtained by submitting a git patch to a Docker environment and running unit test cases.
  • batch mode is inefficient for the long-tail problem, since we can't abort request_ids in batch mode.

Due to the above reasons, we're going to deprecate it and switch to server rollout mode: Agent Loop.

class AgentLoopBase(ABC):
    @abstractmethod
    async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput:
        while True:
            ...
        
        # calculate reward score: rule based/reward service/environment interaction, ...
        return AgentLoopOutput(..., score=score)

wuxibin89 avatar Aug 08 '25 14:08 wuxibin89

@wuxibin89 Thank you for your comments!

It is true that computing reward scores inside AgentLoop can overlap computation with communication. However, the efficiency gain from that overlap is less stable and smaller than with the scheme we propose: in industrial settings, not all tasks require tool calling, so AgentLoop does not necessarily hide communication latency. For example, in short-text-generation scenarios the output lengths of an SFT'd model are similar, so AgentLoop or batch mode finishes the rollout very quickly and there is almost no computation to overlap the communication latency with.

In addition, we would like to emphasize that the main difficulty we currently face in applying RL to many industrial tasks is how to define effective reward signals. We therefore rely on remote rewards, prompting LLMs (such as GPT-4) to score responses, which enables rapid exploration and experience accumulation in the early stage (we believe we are not alone in this). In this context, we propose this scheme to improve the training efficiency of verl, hoping it can be applied more efficiently to a wide range of tasks.

Finally, to avoid duplicated effort, we would like to ask whether there is already a plan or a code prototype for implementing reward calculation in AgentLoop. If not, we are willing to adapt our scheme to AgentLoop and contribute it.

haolinyan avatar Aug 08 '25 16:08 haolinyan

@haolinyan Good job! But I hit an error when running your recipe: "omegaconf.errors.ConfigAttributeError: Key 'ray_init' is not in struct" at recipe/async_reward_agent/main_ppo.py line 226, num_cpus=config.ray_kwargs.ray_init.num_cpus. Can you help me?

edc3000 avatar Oct 10 '25 07:10 edc3000

@edc3000 Thanks for using our recipe! The error occurs because ray_init is only defined in older versions of verl, for example in this commit: https://github.com/volcengine/verl/blob/3e2bceb1afcaa77ebc40106a64f7b440509b67e1/verl/trainer/config/ppo_megatron_trainer.yaml#L132

We recommend applying our PR on top of this commit: https://github.com/volcengine/verl/commit/3e2bceb1afcaa77ebc40106a64f7b440509b67e1 and trying the training again. Let us know if you run into any further issues!
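Alternatively, if you want the recipe to tolerate both config layouts, a possible workaround (just a sketch, not part of this PR) is to read the key defensively with OmegaConf:

from omegaconf import OmegaConf

# Example config missing ray_kwargs.ray_init (the newer verl layout).
config = OmegaConf.create({"ray_kwargs": {}})

# Read the key defensively instead of config.ray_kwargs.ray_init.num_cpus.
ray_init_cfg = OmegaConf.select(config, "ray_kwargs.ray_init", default=None)
num_cpus = ray_init_cfg.num_cpus if ray_init_cfg is not None else None
print(num_cpus)  # None -> let Ray auto-detect the CPU count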

haolinyan avatar Oct 19 '25 13:10 haolinyan

@haolinyan Your work is very helpful to me, and I am using your recipe to train my RL model. But now I find that the actor entropy is unusual, as shown in the screenshots below (the reward and response length, however, are normal and rising). I really need your help.

[screenshots: actor entropy curves]

edc3000 avatar Nov 26 '25 11:11 edc3000

@edc3000 I suspect this might be tied to your training parameter settings, specifically PPO-related coefficients like kl_loss_coef and entropy_coeff. Could you double-check whether these values are configured appropriately (e.g., set too high or too low, or not adjusted as intended during training)?
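For reference, these coefficients are controlled by the usual actor overrides; the values below are purely illustrative placeholders, not tuned recommendations:

actor_rollout_ref.actor.use_kl_loss=True \
actor_rollout_ref.actor.kl_loss_coef=0.001 \
actor_rollout_ref.actor.entropy_coeff=0.001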

haolinyan avatar Nov 28 '25 07:11 haolinyan

@haolinyan I think this might be caused by the code in async_reward_agent/main_ppo.py. In the function run(), fsdp_workers is imported from verl, not from your code. I changed it to from .fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker and it works. Can you confirm this?

edc3000 avatar Nov 28 '25 08:11 edc3000