LLM Study Notes 03

  • Positional Encoding
  • LLM Structure
  • Decoding and Sampling Strategies

Positional Encoding

The Transformer itself discards sequence-order information (self-attention is order-invariant), yet changing the order of words in a sentence can change its meaning completely. To restore this order information, the Transformer authors introduced Positional Encoding.
In short, Positional Encoding injects position information into the token embeddings so that the Transformer retains each token's position.
Ideally, we want a positional representation that satisfies the following:

  • It can represent a token's absolute position within a sequence
  • Relative positions/distances between tokens stay consistent across sequences of different lengths
  • It can represent sequence lengths the model has never seen during training (the length extrapolation problem)

Absolute Positional Encoding

The Transformer's Positional Encoding

  • $PE_{t}=[\sin(\omega_{0}t),\cos(\omega_{0}t),\sin(\omega_{1}t),\cos(\omega_{1}t),\cdots,\sin(\omega_{\frac{d_{model}}{2}-1}t),\cos(\omega_{\frac{d_{model}}{2}-1}t)]$
  • Positions are encoded with alternating sine and cosine functions, where $\omega_{i}=\frac{1}{10000^{2i/d_{model}}}$
  • The encoding of any other position can be obtained through a linear transformation, because
    $\begin{pmatrix}\sin(t+\Delta t)\cr \cos(t+\Delta t)\end{pmatrix}=\begin{pmatrix}\cos\Delta t&\sin\Delta t\cr -\sin\Delta t&\cos\Delta t\end{pmatrix}\begin{pmatrix}\sin t\cr \cos t\end{pmatrix}$
import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # omega_i = 1 / 10000^(2i / d_model)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)  # even dimensions: sine
        pe[:, 1::2] = torch.cos(position * div_term)  # odd dimensions: cosine
        pe = pe.unsqueeze(0).transpose(0, 1)          # (max_len, 1, d_model)
        # Registered as a buffer: saved with the module but not trained
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model); add the encoding of each position
        return x + self.pe[:x.size(0), :]
  • Drawback of the Transformer's positional encoding: the dot product of two encodings is direction-agnostic, $PE_{t}^{T}PE_{t+\Delta t}=\sum_{i}\cos(\omega_{i}\Delta t)=PE_{t}^{T}PE_{t-\Delta t}$, i.e. the product depends only on $|\Delta t|$ and cannot capture the direction of the offset (a quick numeric check follows below)
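A minimal numeric check of this symmetry, reusing the PositionalEncoding module defined above (the dimension, position t, and offset are arbitrary illustrative choices):

pe = PositionalEncoding(d_model=64, max_len=100).pe.squeeze(1)  # (max_len, d_model)
t, dt = 50, 7
print(torch.dot(pe[t], pe[t + dt]))  # PE_t · PE_{t+Δt}
print(torch.dot(pe[t], pe[t - dt]))  # PE_t · PE_{t-Δt}: numerically identical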

BERT's Learnable Positional Encoding

  • Positional encodings are treated as trainable parameters (a minimal sketch follows below)
  • Drawback: no length extrapolation
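A minimal sketch of a learnable absolute positional embedding, assuming a hidden size d_model and maximum length max_len (an illustrative module, not BERT's actual implementation):

import torch
import torch.nn as nn

class LearnedPositionalEncoding(nn.Module):

    def __init__(self, d_model, max_len=512):
        super().__init__()
        # One trainable vector per position, updated by backprop like any other weight
        self.pos_emb = nn.Embedding(max_len, d_model)

    def forward(self, x):
        # x: (batch, seq_len, d_model); positions beyond max_len simply have no embedding,
        # which is the missing length extrapolation noted above
        positions = torch.arange(x.size(1), device=x.device)
        return x + self.pos_emb(positions)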

LLM Structure

LLM architectures can be grouped into the following categories (attention-mask sketch after the list):

  • Decoder-only
    • Left-to-right (causal) unidirectional attention
    • Autoregressive language modeling
    • Strong text generation quality
    • High training efficiency
    • GPT, LLaMA, BLOOM, OPT
  • Encoder-only
    • Classic example: BERT
    • Aims to produce a pre-trained language model, geared toward understanding rather than generation
  • Encoder-Decoder
    • Bidirectional attention over the input, unidirectional attention over the output
    • Encodes and understands the input more thoroughly, so it performs better on understanding-oriented tasks
    • Lower training efficiency
    • Weaker on text generation tasks
    • T5, Flan-T5, BART
  • Prefix LM (a special case of Encoder-Decoder)
    • GLM, U-PaLM
  • MOE (Mixture of Experts)
    • DeepSeek V2/V3, Mistral
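To make these attention patterns concrete, here is a minimal sketch of the masks a decoder-only model and a Prefix LM would use (boolean masks where True means "may attend"; the function names and sizes are illustrative, not taken from any specific model):

import torch

def causal_mask(seq_len):
    # Decoder-only: each position attends only to itself and earlier positions
    return torch.tril(torch.ones(seq_len, seq_len, dtype=torch.bool))

def prefix_lm_mask(prefix_len, seq_len):
    # Prefix LM: bidirectional attention within the prefix (the input),
    # causal attention over the generated continuation
    mask = torch.tril(torch.ones(seq_len, seq_len, dtype=torch.bool))
    mask[:, :prefix_len] = True
    return mask

print(causal_mask(4).int())
print(prefix_lm_mask(prefix_len=2, seq_len=4).int())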

Decoding and Sampling Strategies

Greedy Search

  • At each step, choose the highest-probability token
  • Simple and efficient
  • Tends to produce monotonous, repetitive text
import torch
import torch.nn.functional as F

def greedy_search(logits, max_len=20, eos_token_id=2):
    # logits: pre-computed model outputs of shape (batch, seq_len, vocab_size)
    batch_size = logits.size(0)
    seq_len = logits.size(1)

    # Generated sequence: the token index chosen at each time step
    generated_sequence = torch.zeros(batch_size, max_len, dtype=torch.long)

    for t in range(min(max_len, seq_len)):
        # Logits at the current step
        current_logits = logits[:, t, :]
        # Probability distribution over the vocabulary (softmax)
        probs = F.softmax(current_logits, dim=-1)
        # Greedy choice: the highest-probability token
        next_token = torch.argmax(probs, dim=-1)
        # Append the chosen token to the generated sequence
        generated_sequence[:, t] = next_token
        # Stop once every sequence in the batch has produced the EOS token
        if (next_token == eos_token_id).all():
            break
        # In a real decoder, next_token would be fed back as the next input
    return generated_sequence

Beam Search

  • Maintain a set of $k$ candidate sequences; at each step, expand each candidate with its $k$ highest-probability tokens
  • Keep the $k$ candidate sequences with the highest total (log-)probability
  • Balances generation quality and diversity
  • May produce text that is overly conservative and unnatural
import torch
import torch.nn.functional as F

def pred(input):
    # Dummy "model": returns random logits for one step over a vocabulary of size 10
    batch, seq_len = input.shape
    generate = torch.randn(size=(batch, 1, 10))
    return generate

def beam_search(input_ids, max_length, num_beams):
    batch = input_ids.shape[0]

    # Expand the input: replicate input_ids so there are batch_size * num_beams rows
    expand_size = num_beams
    expanded_return_idx = (
        torch.arange(input_ids.shape[0]).view(-1, 1).repeat(1, expand_size).view(-1).to(input_ids.device)
    )
    input_ids = input_ids.index_select(0, expanded_return_idx)
    print(input_ids)

    batch_beam_size, cur_len = input_ids.shape
    # Only the first beam of each example starts at score 0; the rest start at -1e9
    # so the first step does not select duplicate beams
    beam_scores = torch.zeros(size=(batch, num_beams), dtype=torch.float, device=input_ids.device)
    beam_scores[:, 1:] = -1e9

    beam_scores = beam_scores.view(size=(batch * num_beams,))
    next_tokens = torch.zeros(size=(batch, num_beams), dtype=torch.long, device=input_ids.device)
    next_indices = torch.zeros(size=(batch, num_beams), dtype=torch.long, device=input_ids.device)

    while cur_len < max_length:

        logits = pred(input_ids)              # (batch * num_beams, seq_len, vocab)
        next_token_logits = logits[:, -1, :]  # output at the current step

        # Normalise to log-probabilities
        next_token_scores = F.log_softmax(next_token_logits, dim=-1)  # (batch_size * num_beams, vocab_size)
        # Score of each candidate = current log-prob + accumulated log-prob of the path so far
        next_token_scores = next_token_scores + beam_scores[:, None].expand_as(next_token_scores)

        # Reshape for beam search
        vocab_size = next_token_scores.shape[-1]
        next_token_scores = next_token_scores.view(batch, num_beams * vocab_size)

        # Top-k candidate scores and their flat token ids at the current step
        next_token_scores, next_tokens = torch.topk(
            next_token_scores, num_beams, dim=1, largest=True, sorted=True
        )

        next_indices = next_tokens // vocab_size  # which beam each candidate came from
        next_tokens = next_tokens % vocab_size    # token id within the vocabulary

        # Core of beam search: keep the best num_beams continuations per example
        def process(input_ids, next_scores, next_tokens, next_indices):
            batch_size = next_scores.shape[0]
            next_beam_scores = torch.zeros((batch_size, num_beams), dtype=next_scores.dtype)
            next_beam_tokens = torch.zeros((batch_size, num_beams), dtype=next_tokens.dtype)
            next_beam_indices = torch.zeros((batch_size, num_beams), dtype=next_indices.dtype)

            for batch_idx in range(batch_size):
                beam_idx = 0
                for beam_token_rank, (next_token, next_score, next_index) in enumerate(
                        zip(next_tokens[batch_idx], next_scores[batch_idx], next_indices[batch_idx])
                ):
                    batch_beam_idx = batch_idx * num_beams + next_index

                    next_beam_scores[batch_idx, beam_idx] = next_score        # score of this path
                    next_beam_tokens[batch_idx, beam_idx] = next_token        # token chosen at this step
                    next_beam_indices[batch_idx, beam_idx] = batch_beam_idx   # index of the parent beam

                    beam_idx += 1

            return next_beam_scores.view(-1), next_beam_tokens.view(-1), next_beam_indices.view(-1)

        beam_scores, beam_next_tokens, beam_idx = process(input_ids, next_token_scores, next_tokens, next_indices)

        # Update the input: gather the parent beams and append the chosen tokens  # (batch*beam, seq_len)
        input_ids = torch.cat([input_ids[beam_idx, :], beam_next_tokens.unsqueeze(-1)], dim=-1)
        cur_len = cur_len + 1
    # Return the generated sequences and their scores
    return input_ids, beam_scores

if __name__ == '__main__':
    input_ids=torch.randint(0,100,size=(3,1))
    print(input_ids)
    input_ids,beam_scores=beam_search(input_ids,max_length=10,num_beams=3)
    print(input_ids)

Top-K Sampling

  • Sample from the top-$k$ ranked tokens, so that other high-probability tokens (not just the single best one) can be chosen
  • Limiting the sampling pool to a fixed size $k$ can produce gibberish when the distribution is sharp, and can limit the model's creativity when the distribution is flat
    @torch.no_grad()
    def generate(self, idx, eos, max_new_tokens, temperature=1.0, top_k=None):
        # Method of a GPT-style model class: self(idx_cond) below is the model's forward pass
        for _ in range(max_new_tokens):
            # if the sequence context is growing too long we must crop it at block_size
            idx_cond = idx if idx.size(1) <= self.params.max_seq_len else idx[:, -self.params.max_seq_len:]
            # forward the model to get the logits for the index in the sequence
            logits = self(idx_cond)
            logits = logits[:, -1, :] # crop to just the final time step
            if temperature == 0.0:
                # "sample" the single most likely index
                _, idx_next = torch.topk(logits, k=1, dim=-1)
            else:
                # pluck the logits at the final step and scale by desired temperature
                logits = logits / temperature
                # optionally crop the logits to only the top k options
                if top_k is not None:
                    v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                    logits[logits < v[:, [-1]]] = -float('Inf')
                # apply softmax to convert logits to (normalized) probabilities
                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
            # append sampled index to the running sequence and continue
            idx = torch.cat((idx, idx_next), dim=1)
            if (idx_next == eos).all():
                break
        return idx

Top-P Sampling

  • Sample from the smallest set of tokens whose cumulative probability exceeds a threshold $p$
import torch
import torch.nn.functional as F

def top_p_sampling(logits, p=0.9):
    # Softmax to get the probability distribution
    probs = F.softmax(logits, dim=-1)
    # Sort probabilities in descending order
    sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
    # Cumulative probability along the sorted axis
    cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
    # Number of tokens whose cumulative probability stays within p
    # (assumes a single sequence: logits of shape (1, vocab_size))
    cutoff_idx = int(torch.sum(cumulative_probs <= p, dim=-1))
    # Keep the smallest set of tokens whose cumulative probability exceeds p
    nucleus_probs = sorted_probs[:, :cutoff_idx + 1]
    # Renormalise the remaining probabilities
    nucleus_probs = nucleus_probs / nucleus_probs.sum(dim=-1, keepdim=True)
    # Sample one token from the truncated distribution
    selected_idx = torch.multinomial(nucleus_probs, 1)
    # Map back to the original vocabulary indices
    return sorted_indices.gather(-1, selected_idx)

Majority Vote

  • Majority voting: sample multiple answers and take the most frequent (most consistent) answer as the final output, as sketched below
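A minimal sketch of majority voting over already-sampled answers (the example answers below are made up for illustration):

from collections import Counter

def majority_vote(answers):
    # Return the answer that occurs most often among the sampled responses
    counts = Counter(answers)
    best_answer, _ = counts.most_common(1)[0]
    return best_answer

# e.g. answers from five independent sampling runs
print(majority_vote(["42", "42", "41", "42", "43"]))  # -> "42"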