强化学习框架:OpenRLHF源码解读,模型处理

HuangJie 于 2025-04-22 在 changsha 发布 ⏳ 预计阅读 5 分钟 浏览量

本文主要介绍 强化学习框架:OpenRLHF源码解读,模型处理

models框架设计

了解一下 OpenRLHF的模型框架设计范式:

Image

From:https://arxiv.org/pdf/2405.11143

可以知道一个大概的流程:输入Pormpt通过Actor model输出回复 Response,而后将两部分进行拼接再去由其他模型进行处理

1、actor.py

https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/models/actor.py

这部分主要为加载所需要的模型

class Actor(nn.Module):
    def __init__(...):
        if isinstance(pretrain_or_model, str):
            ...
            self.model = model_class.from_pretrained(
                pretrain_or_model,
                trust_remote_code=True,
                attn_implementation=attn_implementation,
                quantization_config=nf4_config,
                torch_dtype=torch.bfloat16 if bf16 else "auto",
                device_map=device_map,
            )
            if lora_rank > 0:
                self.model.enable_input_require_grads()
                lora_config = LoraConfig(
                    task_type=TaskType.CAUSAL_LM,
                    r=lora_rank,
                    lora_alpha=lora_alpha,
                    target_modules=target_modules,
                    lora_dropout=lora_dropout,
                    bias="none",
                )
                self.model = get_peft_model(self.model, lora_config)
                ...
        else:
            self.model = pretrain_or_model
    @torch.no_grad()
    def generate(self, input_ids: torch.Tensor, **kwargs):
        ...
        sequences = self.model.generate(**generate_args)
        eos_token_id = generate_args["eos_token_id"]
        pad_token_id = generate_args["pad_token_id"]
        return self.process_sequences(sequences, input_ids.size(1), eos_token_id, pad_token_id)
    def forward(...):
        ...
        output["logits"] = output["logits"].to(torch.float32) # 得到每一个token概率
        ...
        log_probs = log_probs_from_logits(
                    output["logits"][:, :-1, :], sequences[:, 1:], temperature=self.temperature
                )
        ...
        action_log_probs = log_probs[:, -num_actions:]

这个actor比较简单,首先从huggingface加载需要的模型,并且对模型进行部分设置如:量化/lora微调。或者直接加载自己预训练好的模型。
1、generate:模块则是根据输入的内容(比如说被 tokenizer处理好的文本)input_ids通过模型输出新的内容(根据 **kwargs获取生成文本参数设置比如说:top_k等)
2、forward根据输入的 token 序列(sequences),计算模型在生成最后若干个 token(即 “动作”)时的对数概率(log probs),之所以要这么处理是因为,在强化学习模型中(PPO、DPO等)一般而言模型的输出是一个序列,但优化目标不是“能不能生成这个序列”,而是:这个序列中,哪些 token 是“好”的?模型对这些 token 的概率应该更高!比如说在 DPO中:

\[L(θ) = E[ min(r(θ) * A, clip(r(θ), 1-ε, 1+ε) * A) ]\]

里面的

\[r(\theta)=\pi_{\theta}(a|s)/\pi_{old}(a|s)\]

就是概率比值,上面代码中:

log_probs_from_logits(output["logits"][:, :-1, :], sequences[:, 1:], temperature=self.temperature)

计算的就是:$log(\pi_{\theta}(a|s))$,在具体代码中:

def log_probs_from_logits(logits: torch.Tensor, labels: torch.Tensor, temperature: float = 1.0) -> torch.Tensor:
    if temperature != 1.0:
        logits.div_(temperature)
    if logits.dtype in [torch.float32, torch.float64]:
        batch_dim = logits.shape[:-1]
        last_dim = logits.shape[-1]
        try:
            from flash_attn.ops.triton.cross_entropy import cross_entropy_loss

            output = cross_entropy_loss(logits.reshape(-1, last_dim), labels.reshape(-1))
            log_probs_labels = -output[0].view(*batch_dim)
        except ImportError:
            logits_labels = torch.gather(logits, dim=-1, index=labels.unsqueeze(-1)).squeeze(-1)
            logsumexp_values = _logsumexp_by_chunk(logits.reshape(-1, last_dim))
            logsumexp_values = logsumexp_values.view(*batch_dim)
            log_probs_labels = logits_labels - logsumexp_values  # log_softmax(x_i) = x_i - logsumexp(x)
    else:
        log_probs_labels = []
        for row_logits, row_labels in zip(logits, labels):  # loop to reduce peak mem consumption
            row_log_probs = F.log_softmax(row_logits, dim=-1)
            row_log_probs_labels = row_log_probs.gather(dim=-1, index=row_labels.unsqueeze(-1)).squeeze(-1)
            log_probs_labels.append(row_log_probs_labels)
        log_probs_labels = torch.stack(log_probs_labels)
    return log_probs_labels

补充-1
在使用 AutoModelForCausalLM.from_pretrained使用得到 model之后,其支持输入参数为:

outputs = model(
    input_ids=None,            # 输入的token(batch_size, seq_length)
    attention_mask=None,       # 指示哪些 token 是有效的(非 padding),形状同 input_ids
    position_ids=None,         # 位置编码
    past_key_values=None,
    inputs_embeds=None,
    use_cache=None,            # 是否使用k-v cache
    labels=None,               # 输入标签就直接计算loss
    output_attentions=None,
    output_hidden_states=None,
    return_dict=None,
)

补充-2
在LLM训练过程中遇到过短的语句为了节约显存(如果都将内容补充到相同长度,那么就会有较多的padding造成浪费),因此可以将几个短的拼接起来,但是为了区分那些是一个句子那些不是的,在 OpenRLHF中通过参数:self.packing_samples。如果没有 packing那么直接根据 attention_mask将位置编码在处理一下

if not self.packing_samples:
    position_ids = attention_mask.long().cumsum(-1) - 1
    position_ids.masked_fill_(attention_mask == 0, 1)
else:
    # convert attention_mask to position_ids
    if ring_attn_group is not None:
        labels = sequences
        sequences, attention_mask, position_ids = convert_ring_attn_params(
            sequences, attention_mask, packed_seq_lens, ring_attn_group
        )
    else:
        position_ids = reset_position_ids(attention_mask)
    # explicitly ignore attention_mask for packing_samples
    attention_mask = None

其中 reset_position_ids做的就是重新做位置编码重新处理

2、model.py

https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/models/model.py

Image

主要功能返回所需要的模型,主要返回2个模型:1、CriticModel;2、RewardModel 回顾一下这几类模型的作用:无论是在GRPO还是DPO中都会输出token然后需要去对token进行评分,起评分作用的就是 reward model 对应上面图中 reward model,除此之外都会计算 优势函数($Q(s,a)-V(s)$)来评估策略的好坏优势函数里面计算就是通过 critic model来对某一个策略进行评估对应上面图像中的:value model

def _get_reward_model(base_pretrained_model, base_llm_model, value_head_prefix="score", packing_samples=False):
    class RewardModel(base_pretrained_model):
        def __init__(...):
            ...
            # 加载模型
            setattr(self, self.base_model_prefix, base_llm_model(config))
            self.value_head_prefix = value_head_prefix
            setattr(self, value_head_prefix, nn.Linear(config.hidden_size, 1, bias=False) # 输出评分
            ...
        def forward(self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, return_output=False, ring_attn_group=None,pad_sequence=False, packed_seq_lens=None,):
            ...# 1、处理packing
            outputs = getattr(self, self.base_model_prefix)(
                input_ids, attention_mask=attention_mask, position_ids=position_ids
            )
            last_hidden_states = outputs["last_hidden_state"]
            values = getattr(self, self.value_head_prefix)(last_hidden_states).squeeze(-1)
            ...# 1、处理packing
            else:
                # 输出最后一个有效token的评分代替整个句子评分
                eos_indices = attention_mask.size(1) - 1 - attention_mask.long().fliplr().argmax(dim=1, keepdim=True)
                reward = values.gather(dim=1, index=eos_indices).squeeze(1)
            if not self.training and self.normalize_reward:
                reward = (reward - self.mean) / self.std
            return (reward, outputs) if return_output else reward
    return RewardModel

def _get_critic_model(base_pretrained_model, base_llm_model, value_head_prefix="score", packing_samples=False):
    class CriticModel(base_pretrained_model):
        def __init__(...):
            ...
        def forward(...):
            ...# 1、处理packing
            outputs = getattr(self, self.base_model_prefix)(
                input_ids, attention_mask=attention_mask, position_ids=position_ids
            )
            last_hidden_states = outputs["last_hidden_state"]
            values = getattr(self, self.value_head_prefix)(last_hidden_states).squeeze(-1)
            ...
            if num_actions is None:
                assert return_output
                return outputs
            if not self.packing_samples:
                action_values = values[:, -num_actions:]
            else:
                assert isinstance(num_actions, list) and len(num_actions) == len(packed_seq_lens)
                action_values = []
                offset = 0
                for num_action, seq_len in zip(num_actions, packed_seq_lens):
                    start, end = max(0, offset + seq_len - num_action - 1), offset + seq_len - 1
                    action_values.append(values[:, start:end])
                    offset += seq_len
                action_values = torch.cat(action_values, dim=1)

            if return_output:
                return (action_values, outputs)
            else:
                return action_values

    return CriticModel

1、reward model: 传入一个 base_pretrained_model(比如 PreTrainedModel)、一个 base_llm_model(比如 AutoModel)以及一些控制参数。函数内部返回一个定制化的奖励模型类 RewardModel,它可以在给定输入句子时,输出一个数值(reward 分数),反映输出文本的质量。在forward计算中,直接将输入model使用的几个参数(见上面的补充有具体解释)计算最后取最后一个状态的值,并且将这个值取计算评分。也就是说 reward model:首先计算下一个预测的token而后对这些token进行打分
2、critic model:具体输入参数和 reward model相同。参考之前介绍,上面代码中直接返回action_values = values[:, -num_actions:]num_actions存在条件下)这样就会得到不同的Q(s, a1), Q(s, a2), …

总结上面两组模型,在 LLM 的强化学习场景下,Reward Model 和 Critic Model 都从 last_hidden_state 得到 token-level 表达,再用 Linear 层输出每个 token 的 score。
Reward Model 最后提取的是 EOS token 的 score,表示整句话的奖励。
Critic Model 会进一步提取最后 num_actions 个 token 的 value,这些 token 是 Actor 生成的动作,对应到 PPO 中的:𝐴(𝑠,𝑎)=𝑄(𝑠,𝑎)−𝑉(𝑠)。

理解上面内容,回顾最上面的框架设计,用下面例子进行解释。
Prompt:"The capital of France is"
Actor model:"Paris is beautiful"。那么合并得到:input_ids = ["The", "capital", "of", "France", "is", " Paris", "is", "beautiful"]
Reward model:对上面每个单词进行评分,假设:values = [0.1, 0.2, 0.3, 0.2, 0.4, 0.7, 0.5, 0.8] # 每个 token 的 score 而后输出句子中整体评分 0.8
Critic model:只对最后几个 token 的 action 计算 loss,于是:action_values = values[:, -3:] # 即取出最后 3 个生成 token 的 Q 值这些值也就对应了我们模型的生成

理解完两个模型定义以及处理范式之后,作者直接通过 get_llm_for_sequence_regression 来使用上面所定义的 _get_reward_model_get_critic_model 操作也比较简单:

def get_llm_for_sequence_regression(
    model_name_or_path,
    model_type,
    ...):
    ...
    # 1)通过model_type 选择所使用的模型
    if model_type == "reward":
        cls_class = _get_reward_model(base_pretrained_class, base_class, value_head_prefix, packing_samples)
    else:
        cls_class = _get_critic_model(base_pretrained_class, base_class, value_head_prefix, packing_samples)
    ...
    # 2)加载模型
    model = cls_class.from_pretrained(
        model_name_or_path,
        config=config,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16 if bf16 else "auto",
        quantization_config=nf4_config,
        device_map=device_map,
        **kwargs,
    )
    ...
    return model

3、loss.py

https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/models/loss.py

补充-1:
裁剪使用的是torch.clamp(https://pytorch.org/docs/stable/generated/torch.clamp.html)强制将范围外的数值处理为边界值,范围内数字保持不变

1、PolicyLoss:Policy Loss for PPO

\[\begin{align*} r_t &= \exp(\log \pi(a_t \mid s_t) - \log \pi_{\text{old}}(a_t \mid s_t)) \\ \mathcal{L}_{\text{clip}}(t) &= \min\left(r_t \cdot A_t,\ \text{clip}(r_t,\ 1 - \epsilon,\ 1 + \epsilon) \cdot A_t\right) \\ \mathcal{L}_{\text{policy}} &= -\mathbb{E}_t \left[ \mathcal{L}_{\text{clip}}(t) \right] \end{align*}\]

2、ValueLoss: Value Loss for PPO

\[\mathcal{L}_{\text{value}} = \frac{1}{2} \cdot \mathbb{E}_{t \sim \text{mask}} \left[ \max \left( (V_{\text{clip}, t} - R_t)^2, \, (V_t - R_t)^2 \right) \right]\\ \text{其中:}V_{\text{clip}} = V_{\text{old}} + \text{clip}(V - V_{\text{old}}, -\epsilon, \epsilon)\]

代码测试

修改了代码见链接:https://www.big-yellow-j.top/code/OpenRLHF_model.py

总结

本文主要介绍了在 OpenRLHF中模型框架设计,主要分为3类模型:1、actor model;2、critic model;3、reward model这三类模型中分别起到作用:1、直接更具prompt输出response;2、输出token的评分(action_values = values[:, -3:]);3、返回整句输出评分(找出最后一个有效 token 的索引,然后从 value 向量中提取该位置的值作为 reward。)

Footer Image