LLM Principles: Reusable Code Collection

Author: 马育民 • 2026-01-26 11:03

# Imports

```
import torch
import torch.nn as nn
import tiktoken
```

# GPT_CONFIG_124M

```
GPT_CONFIG_124M = {
    "vocab_size": 50257,     # vocabulary size
    "context_length": 1024,  # context length
    "emb_dim": 768,          # embedding dimension
    "n_heads": 12,           # number of attention heads
    "n_layers": 12,          # number of layers
    "drop_rate": 0.1,        # dropout rate
    "qkv_bias": False        # query-key-value bias
}
```

# tokenizer

```
tokenizer = tiktoken.get_encoding("gpt2")
```

# MultiHeadAttention

```
class MultiHeadAttention(nn.Module):
    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        """
        Initialize
        :param d_in: input embedding dimension
        :param d_out: output embedding dimension
        :param context_length: length used to build the causal mask
        :param dropout: dropout probability
        :param num_heads: number of attention heads
        :param qkv_bias: whether the query/key/value projections use a bias
        """
        super().__init__()
        assert (d_out % num_heads == 0), \
            "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        # Reduce the projection dimension to match the desired output dimension
        self.head_dim = d_out // num_heads

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        # Linear layer that combines the head outputs
        self.out_proj = nn.Linear(d_out, d_out)
        # Dropout layer to reduce overfitting
        self.dropout = nn.Dropout(dropout)
        # Register the upper-triangular mask as a buffer: it needs no gradient updates,
        # but it must be saved/loaded with the model and moved across devices with it
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length), diagonal=1)
        )

    def forward(self, x):
        b, num_tokens, d_in = x.shape
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        # Implicitly split the matrices by adding a num_heads dimension, then unroll
        # the last dimension: (b, num_tokens, d_out) -> (b, num_tokens, num_heads, head_dim)
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)

        # Transpose from (b, num_tokens, num_heads, head_dim) to (b, num_heads, num_tokens, head_dim)
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        # Compute the attention scores (dot products) for each head
        attn_scores = queries @ keys.transpose(2, 3)
        # Mask truncated to the number of tokens
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
        # Fill the masked positions of the attention scores
        attn_scores.masked_fill_(mask_bool, -torch.inf)

        attn_weights = torch.softmax(
            attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Tensor shape: (b, num_tokens, n_heads, head_dim)
        context_vec = (attn_weights @ values).transpose(1, 2)

        # Combine the heads, where self.d_out = self.num_heads * self.head_dim
        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
        # Optional linear output projection
        context_vec = self.out_proj(context_vec)
        return context_vec
```
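A minimal shape check, not from the original post, shows the forward pass in action; the batch size and sequence length below are arbitrary example values, and the output shape equals the input shape `(batch, num_tokens, d_out)`:

```
# Illustrative shape check for MultiHeadAttention (example sizes, dropout disabled)
torch.manual_seed(123)
batch_size, num_tokens, emb_dim = 2, 6, 768   # arbitrary example values
x = torch.randn(batch_size, num_tokens, emb_dim)

mha = MultiHeadAttention(
    d_in=emb_dim, d_out=emb_dim,
    context_length=1024, dropout=0.0,
    num_heads=12, qkv_bias=False
)
out = mha(x)
print(out.shape)   # expected: torch.Size([2, 6, 768])
```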
# GELU

```
class GELU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(
            torch.sqrt(torch.tensor(2.0 / torch.pi)) *
            (x + 0.044715 * torch.pow(x, 3))
        ))
```

# FeedForward

```
class FeedForward(nn.Module):
    def __init__(self, cfg):
        """
        Two linear layers with a GELU activation in between
        :param cfg: model configuration dictionary
        """
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
            GELU(),
            nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
        )

    def forward(self, x):
        return self.layers(x)
```

# LayerNorm

```
class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        """
        :param emb_dim: embedding dimension
        """
        super().__init__()
        # eps is a small constant (epsilon) added to the variance during
        # normalization to prevent division by zero
        self.eps = 1e-5
        """
        scale and shift are two trainable parameters (with the same dimension as the input).
        If adjusting them improves performance on the training task,
        the LLM adjusts them automatically during training.
        """
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        """
        With unbiased=False, the variance is computed with the sample count n as the divisor,
        rather than n-1 (the bias correction), so it is the so-called biased variance estimate.
        For LLMs with a very large embedding dimension n (such as GPT-2), the difference between
        n and n-1 is negligible in practice. This choice keeps the layer compatible with GPT-2's
        normalization layers, because the original GPT-2 was implemented in TensorFlow,
        whose default behavior uses n.
        """
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        # Equivalent to wx + b
        return self.scale * norm_x + self.shift
```

# TransformerBlock

```
class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.att = MultiHeadAttention(
            d_in=cfg["emb_dim"],
            d_out=cfg["emb_dim"],
            context_length=cfg["context_length"],
            num_heads=cfg["n_heads"],
            dropout=cfg["drop_rate"],
            qkv_bias=cfg["qkv_bias"])
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(cfg["emb_dim"])
        self.norm2 = LayerNorm(cfg["emb_dim"])
        self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

    def forward(self, x):
        # Shortcut connection around the attention block
        shortcut = x
        x = self.norm1(x)
        x = self.att(x)
        x = self.drop_shortcut(x)
        # Add the original input back
        x = x + shortcut

        # Shortcut connection around the feed-forward block
        shortcut = x
        x = self.norm2(x)
        x = self.ff(x)
        x = self.drop_shortcut(x)
        # Add the original input back
        x = x + shortcut
        return x
```

# GPTModel

```
class GPTModel(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        # Token embedding layer
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        # Positional embedding layer
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])

        tbs = [TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        self.trf_blocks = nn.Sequential(*tbs)

        self.final_norm = LayerNorm(cfg["emb_dim"])
        # Linear output layer
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias=False
        )

    def forward(self, in_idx):
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)

        # The device setting lets us train the model on CPU or GPU,
        # depending on which device the input data is on
        pos_embeds = self.pos_emb(
            torch.arange(seq_len, device=in_idx.device)
        )
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits
```

# generate_text_simple

```
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """
    Generate text
    :param model: the GPT model
    :param idx: index array of the current text, with shape (batch, n_tokens)
    :param max_new_tokens: number of new tokens to generate
    :param context_size: maximum context length supported by the model
    :return: index array extended with the generated tokens
    """
    for _ in range(max_new_tokens):
        # Truncate the current text to the supported length. If the LLM only supports
        # 5 tokens but the text is 10 tokens long, only the last 5 tokens are used as input
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)

        # Focus only on the last position, so the shape changes from
        # (batch, n_tokens, vocab_size) to (batch, vocab_size)
        logits = logits[:, -1, :]
        # probas has shape (batch, vocab_size)
        probas = torch.softmax(logits, dim=-1)
        # idx_next has shape (batch, 1)
        idx_next = torch.argmax(probas, dim=-1, keepdim=True)
        # Append the index of the next token to the index array,
        # so idx now has shape (batch, n_tokens+1)
        idx = torch.cat((idx, idx_next), dim=1)

    return idx
```

# text_to_token_ids

```
def text_to_token_ids(text, tokenizer):
    """
    Encode text into token IDs
    :param text: input text
    :param tokenizer: the tokenizer
    :return: token ID tensor with a batch dimension
    """
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)  # add batch dimension
    return encoded_tensor
```

# token_ids_to_text

```
def token_ids_to_text(token_ids, tokenizer):
    """
    Decode token IDs back into text
    :param token_ids: token ID tensor
    :param tokenizer: the tokenizer
    :return: decoded text
    """
    flat = token_ids.squeeze(0)  # remove batch dimension
    return tokenizer.decode(flat.tolist())
```

Original source: http://malaoshi.top/show_1GW2f5VTAjjY.html
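As a minimal end-to-end sketch, the pieces above can be wired together into a single generation run. The prompt string, the random seed, and `max_new_tokens=10` below are arbitrary illustration choices, and since the model is untrained its output will be gibberish:

```
# Illustrative end-to-end run: build the model, encode a prompt, generate, decode
torch.manual_seed(123)

model = GPTModel(GPT_CONFIG_124M)
model.eval()  # disable dropout for inference

start_context = "Hello, I am"  # example prompt
encoded = text_to_token_ids(start_context, tokenizer)

out_ids = generate_text_simple(
    model=model,
    idx=encoded,
    max_new_tokens=10,
    context_size=GPT_CONFIG_124M["context_length"]
)
print(token_ids_to_text(out_ids, tokenizer))  # random-looking text until the model is trained
```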