本文主要介绍 强化学习框架:OpenRLHF源码解读,模型处理
了解一下 OpenRLHF的模型框架设计范式:
From:https://arxiv.org/pdf/2405.11143
可以知道一个大概的流程:输入Pormpt通过Actor model输出回复 Response,而后将两部分进行拼接再去由其他模型进行处理
https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/models/actor.py
这部分主要为加载所需要的模型
class Actor(nn.Module):
def __init__(...):
if isinstance(pretrain_or_model, str):
...
self.model = model_class.from_pretrained(
pretrain_or_model,
trust_remote_code=True,
attn_implementation=attn_implementation,
quantization_config=nf4_config,
torch_dtype=torch.bfloat16 if bf16 else "auto",
device_map=device_map,
)
if lora_rank > 0:
self.model.enable_input_require_grads()
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=lora_rank,
lora_alpha=lora_alpha,
target_modules=target_modules,
lora_dropout=lora_dropout,
bias="none",
)
self.model = get_peft_model(self.model, lora_config)
...
else:
self.model = pretrain_or_model
@torch.no_grad()
def generate(self, input_ids: torch.Tensor, **kwargs):
...
sequences = self.model.generate(**generate_args)
eos_token_id = generate_args["eos_token_id"]
pad_token_id = generate_args["pad_token_id"]
return self.process_sequences(sequences, input_ids.size(1), eos_token_id, pad_token_id)
def forward(...):
...
output["logits"] = output["logits"].to(torch.float32) # 得到每一个token概率
...
log_probs = log_probs_from_logits(
output["logits"][:, :-1, :], sequences[:, 1:], temperature=self.temperature
)
...
action_log_probs = log_probs[:, -num_actions:]
这个actor比较简单,首先从huggingface加载需要的模型,并且对模型进行部分设置如:量化/lora微调。或者直接加载自己预训练好的模型。
1、generate
:模块则是根据输入的内容(比如说被 tokenizer处理好的文本)input_ids通过模型输出新的内容(根据 **kwargs
获取生成文本参数设置比如说:top_k
等)
2、forward
:根据输入的 token 序列(sequences),计算模型在生成最后若干个 token(即 “动作”)时的对数概率(log probs),之所以要这么处理是因为,在强化学习模型中(PPO、DPO等)一般而言模型的输出是一个序列,但优化目标不是“能不能生成这个序列”,而是:这个序列中,哪些 token 是“好”的?模型对这些 token 的概率应该更高!比如说在 DPO中:
里面的
\[r(\theta)=\pi_{\theta}(a|s)/\pi_{old}(a|s)\]就是概率比值,上面代码中:
log_probs_from_logits(output["logits"][:, :-1, :], sequences[:, 1:], temperature=self.temperature)
计算的就是:$log(\pi_{\theta}(a|s))$,在具体代码中:
def log_probs_from_logits(logits: torch.Tensor, labels: torch.Tensor, temperature: float = 1.0) -> torch.Tensor:
if temperature != 1.0:
logits.div_(temperature)
if logits.dtype in [torch.float32, torch.float64]:
batch_dim = logits.shape[:-1]
last_dim = logits.shape[-1]
try:
from flash_attn.ops.triton.cross_entropy import cross_entropy_loss
output = cross_entropy_loss(logits.reshape(-1, last_dim), labels.reshape(-1))
log_probs_labels = -output[0].view(*batch_dim)
except ImportError:
logits_labels = torch.gather(logits, dim=-1, index=labels.unsqueeze(-1)).squeeze(-1)
logsumexp_values = _logsumexp_by_chunk(logits.reshape(-1, last_dim))
logsumexp_values = logsumexp_values.view(*batch_dim)
log_probs_labels = logits_labels - logsumexp_values # log_softmax(x_i) = x_i - logsumexp(x)
else:
log_probs_labels = []
for row_logits, row_labels in zip(logits, labels): # loop to reduce peak mem consumption
row_log_probs = F.log_softmax(row_logits, dim=-1)
row_log_probs_labels = row_log_probs.gather(dim=-1, index=row_labels.unsqueeze(-1)).squeeze(-1)
log_probs_labels.append(row_log_probs_labels)
log_probs_labels = torch.stack(log_probs_labels)
return log_probs_labels
补充-1:
在使用AutoModelForCausalLM.from_pretrained
使用得到model
之后,其支持输入参数为:outputs = model( input_ids=None, # 输入的token(batch_size, seq_length) attention_mask=None, # 指示哪些 token 是有效的(非 padding),形状同 input_ids position_ids=None, # 位置编码 past_key_values=None, inputs_embeds=None, use_cache=None, # 是否使用k-v cache labels=None, # 输入标签就直接计算loss output_attentions=None, output_hidden_states=None, return_dict=None, )
补充-2:
在LLM训练过程中遇到过短的语句为了节约显存(如果都将内容补充到相同长度,那么就会有较多的padding造成浪费),因此可以将几个短的拼接起来,但是为了区分那些是一个句子那些不是的,在 OpenRLHF中通过参数:self.packing_samples
。如果没有packing
那么直接根据attention_mask
将位置编码在处理一下if not self.packing_samples: position_ids = attention_mask.long().cumsum(-1) - 1 position_ids.masked_fill_(attention_mask == 0, 1) else: # convert attention_mask to position_ids if ring_attn_group is not None: labels = sequences sequences, attention_mask, position_ids = convert_ring_attn_params( sequences, attention_mask, packed_seq_lens, ring_attn_group ) else: position_ids = reset_position_ids(attention_mask) # explicitly ignore attention_mask for packing_samples attention_mask = None
其中
reset_position_ids
做的就是重新做位置编码重新处理
https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/models/model.py
主要功能返回所需要的模型,主要返回2个模型:1、CriticModel
;2、RewardModel
回顾一下这几类模型的作用:无论是在GRPO还是DPO中都会输出token然后需要去对token进行评分,起评分作用的就是 reward model
对应上面图中 reward model
,除此之外都会计算 优势函数($Q(s,a)-V(s)$)来评估策略的好坏优势函数里面计算就是通过 critic model
来对某一个策略进行评估对应上面图像中的:value model
def _get_reward_model(base_pretrained_model, base_llm_model, value_head_prefix="score", packing_samples=False):
class RewardModel(base_pretrained_model):
def __init__(...):
...
# 加载模型
setattr(self, self.base_model_prefix, base_llm_model(config))
self.value_head_prefix = value_head_prefix
setattr(self, value_head_prefix, nn.Linear(config.hidden_size, 1, bias=False) # 输出评分
...
def forward(self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, return_output=False, ring_attn_group=None,pad_sequence=False, packed_seq_lens=None,):
...# 1、处理packing
outputs = getattr(self, self.base_model_prefix)(
input_ids, attention_mask=attention_mask, position_ids=position_ids
)
last_hidden_states = outputs["last_hidden_state"]
values = getattr(self, self.value_head_prefix)(last_hidden_states).squeeze(-1)
...# 1、处理packing
else:
# 输出最后一个有效token的评分代替整个句子评分
eos_indices = attention_mask.size(1) - 1 - attention_mask.long().fliplr().argmax(dim=1, keepdim=True)
reward = values.gather(dim=1, index=eos_indices).squeeze(1)
if not self.training and self.normalize_reward:
reward = (reward - self.mean) / self.std
return (reward, outputs) if return_output else reward
return RewardModel
def _get_critic_model(base_pretrained_model, base_llm_model, value_head_prefix="score", packing_samples=False):
class CriticModel(base_pretrained_model):
def __init__(...):
...
def forward(...):
...# 1、处理packing
outputs = getattr(self, self.base_model_prefix)(
input_ids, attention_mask=attention_mask, position_ids=position_ids
)
last_hidden_states = outputs["last_hidden_state"]
values = getattr(self, self.value_head_prefix)(last_hidden_states).squeeze(-1)
...
if num_actions is None:
assert return_output
return outputs
if not self.packing_samples:
action_values = values[:, -num_actions:]
else:
assert isinstance(num_actions, list) and len(num_actions) == len(packed_seq_lens)
action_values = []
offset = 0
for num_action, seq_len in zip(num_actions, packed_seq_lens):
start, end = max(0, offset + seq_len - num_action - 1), offset + seq_len - 1
action_values.append(values[:, start:end])
offset += seq_len
action_values = torch.cat(action_values, dim=1)
if return_output:
return (action_values, outputs)
else:
return action_values
return CriticModel
1、reward model
: 传入一个 base_pretrained_model(比如 PreTrainedModel)、一个 base_llm_model(比如 AutoModel)以及一些控制参数。函数内部返回一个定制化的奖励模型类 RewardModel,它可以在给定输入句子时,输出一个数值(reward 分数),反映输出文本的质量。在forward计算中,直接将输入model使用的几个参数(见上面的补充有具体解释)计算最后取最后一个状态的值,并且将这个值取计算评分。也就是说 reward model:首先计算下一个预测的token而后对这些token进行打分
2、critic model
:具体输入参数和 reward model
相同。参考之前介绍,上面代码中直接返回action_values = values[:, -num_actions:]
( num_actions
存在条件下)这样就会得到不同的Q(s, a1), Q(s, a2), …
总结上面两组模型,在 LLM 的强化学习场景下,Reward Model 和 Critic Model 都从 last_hidden_state 得到 token-level 表达,再用 Linear 层输出每个 token 的 score。
Reward Model
最后提取的是 EOS token 的 score,表示整句话的奖励。
Critic Model
会进一步提取最后 num_actions 个 token 的 value,这些 token 是 Actor 生成的动作,对应到 PPO 中的:𝐴(𝑠,𝑎)=𝑄(𝑠,𝑎)−𝑉(𝑠)。
理解上面内容,回顾最上面的框架设计,用下面例子进行解释。
Prompt:"The capital of France is"
Actor model:"Paris is beautiful"
。那么合并得到:input_ids = ["The", "capital", "of", "France", "is", " Paris", "is", "beautiful"]
Reward model:对上面每个单词进行评分,假设:values = [0.1, 0.2, 0.3, 0.2, 0.4, 0.7, 0.5, 0.8] # 每个 token 的 score
而后输出句子中整体评分 0.8
Critic model:只对最后几个 token 的 action 计算 loss,于是:action_values = values[:, -3:] # 即取出最后 3 个生成 token 的 Q 值
这些值也就对应了我们模型的生成
理解完两个模型定义以及处理范式之后,作者直接通过 get_llm_for_sequence_regression
来使用上面所定义的 _get_reward_model
和 _get_critic_model
操作也比较简单:
def get_llm_for_sequence_regression(
model_name_or_path,
model_type,
...):
...
# 1)通过model_type 选择所使用的模型
if model_type == "reward":
cls_class = _get_reward_model(base_pretrained_class, base_class, value_head_prefix, packing_samples)
else:
cls_class = _get_critic_model(base_pretrained_class, base_class, value_head_prefix, packing_samples)
...
# 2)加载模型
model = cls_class.from_pretrained(
model_name_or_path,
config=config,
trust_remote_code=True,
torch_dtype=torch.bfloat16 if bf16 else "auto",
quantization_config=nf4_config,
device_map=device_map,
**kwargs,
)
...
return model
https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/models/loss.py
补充-1:
裁剪使用的是torch.clamp
(https://pytorch.org/docs/stable/generated/torch.clamp.html)强制将范围外的数值处理为边界值,范围内数字保持不变
1、PolicyLoss
:Policy Loss for PPO
2、ValueLoss
: Value Loss for PPO
修改了代码见链接:https://www.big-yellow-j.top/code/OpenRLHF_model.py
本文主要介绍了在 OpenRLHF中模型框架设计,主要分为3类模型:1、actor model
;2、critic model
;3、reward model
这三类模型中分别起到作用:1、直接更具prompt输出response;2、输出token的评分(action_values = values[:, -3:]
);3、返回整句输出评分(找出最后一个有效 token 的索引,然后从 value 向量中提取该位置的值作为 reward。)