
✍ 在上一章里,我们已经把现代 LLM 的“四件标配武器”(GQA / RoPE / SwiGLU / RMSNorm + Pre-Norm)系统的学习一遍。现在再看 LLaMA、Qwen、Gemma 这类模型,你就不会觉得学起来那么费劲,因为这些模块早就成为现在的baseline了。学习完这些标配,可以帮助我们更快的把握文章的重点是什么。在这一章中,我们正式学习25年最火的中文模型——DeepSeek 系列。
技术圈有一个有趣的现象:真正爆火的那个时间点,往往已经是技术“量变到质变”的结果,而不是起点。
DeepSeek 也是这样——大家印象里是某个时间点突然刷屏、对标海外大模型,但如果往前扒时间线,你会发现它的技术演进其实是循序渐进 + 一路扩写 MoE 这条路线的。为了后面好讲清楚 DeepSeek 的架构设计,我们先用一小节,把它的几个关键“里程碑版本”梳理一下:
① 2024 年初: DeepSeek-V1 发布,提出 DeepSeekMoE 架构
② 2024 年中:DeepSeek-V2 发布,架构升级为大规模 Decoder-only LLM + DeepSeekMoE + MLA(Multi-head Latent Attention);
③ 2024 年底及之后:V2.5 / V3 / R1 等迭代,在 同一条 MoE + MLA 技术路线上继续加码,扩大总参数量、增强代码/推理能力;

看完DeepSeek模型的时间线后,是不是发现,实际上DeepSeek的主要内容就是一个DeepSeekMoE + MLA。
在普通 Transformer Block 中,对于所有的token,FFN 是这样的将其一视同仁的输入输出,公式为:
\text{FFN}(x) = W_2 \,\sigma(W_1 x)
而在 MoE(Mixture-of-Experts) 里,你可以粗暴理解成:“把原来一个大 FFN,拆成很多个 Expert,小模型(Router/Gate)决定某个 token 去找哪个 Expert 看病。在DeepSeek论文中,把每一个小模型称为一个Router,为每个token分配路径
① 给定 N 个专家: E_1, \dots, E_N
② Router 给每个 token 出一个打分:p = \text{softmax}(W_{\text{gate}} x) \in \mathbb{R}^N
③ 然后选 Top-K 个专家给它服务:y = \sum_{i\in\text{TopK}(p)} p_i \, E_i(x)

虽然需要很多个专家,参数量可以堆到非常大,但每个 token 只走 K 个 Expert,算力是比较稀疏的,只要 Router 分配合理,就能得到“参数大、算得不多”的甜蜜点。
Router代码实现import torch
import torch.nn as nn
import torch.nn.functional as F
class Router(nn.Module):
"""给每个 token 生成一个长度为 num_experts 的 gate 分布。"""
def __init__(self, d_model, num_experts):
super().__init__()
self.num_experts = num_experts
self.gate = nn.Linear(d_model, num_experts, bias=False)
def forward(self, x):
"""
x: [B, L, D]
return:
gate_scores: [B, L, num_experts],每个 token 的专家概率分布
"""
logits = self.gate(x) # [B, L, E]
gate_scores = F.softmax(logits, dim=-1)
return gate_scoresclass Top1GatedMoE(nn.Module):
"""
极简版 Top-1 MoE:
- Router 给每个 token 选 1 个专家
- 实现公式:y = p_i * E_i(x)
- 不实现负载均衡 / 容量限制,先把计算图搭清楚
"""
def __init__(self, d_model, d_ff, num_experts):
super().__init__()
self.d_model = d_model
self.num_experts = num_experts
# 一堆专家 E_1 ... E_N
self.experts = nn.ModuleList(
[ExpertFFN(d_model, d_ff) for _ in range(num_experts)]
)
# Router: W_gate
self.router = Router(d_model, num_experts)
def forward(self, x):
"""
x: [B, L, D]
return:
y: [B, L, D]
"""
B, L, D = x.shape
gate_scores = self.router(x) # [B, L, E]
top1_score, top1_idx = gate_scores.max(dim=-1) # [B, L], [B, L]
x_flat = x.reshape(B * L, D) # [T, D], T = B*L
idx_flat = top1_idx.reshape(B * L) # [T]
score_flat = top1_score.reshape(B * L) # [T]
out_flat = torch.zeros_like(x_flat) # [T, D]
for expert_id, expert in enumerate(self.experts):
mask = (idx_flat == expert_id) # [T]
if not mask.any():
continue
token_idx = mask.nonzero(as_tuple=True)[0] # [N_e]
x_e = x_flat[token_idx] # [N_e, D]
s_e = score_flat[token_idx].unsqueeze(-1) # [N_e, 1]
# 过 expert FFN
y_e = expert(x_e) # [N_e, D]
y_e = y_e * s_e
out_flat[token_idx] = y_e
# 3) reshape 回 [B, L, D]
out = out_flat.view(B, L, D)
return outDeepSeekMoE 这篇 ACL 长文里,作者先点名了传统 Top-K MoE 的两个问题:
简单说:目前的MoE 看起来是“多专家会诊”,实际上专家经常“兼职”,还爱抄彼此作业。
💡 为了解决这两个问题,DeepSeek 提出了 DeepSeekMoE,核心为:
直觉理解:
以前:一个 token 找两个“万金油医生”; 现在:一个 token 可以同时挂很多“专科门诊”,组合出更细腻的“专家拼盘”。
🧠 可能有些读者和博主一样,刚开始无法理解,一个模型怎么就拆成多个小专家了,实际上,在 LLM 里的 MoE,所谓的“专家(Expert)不是一整个大模型,而只是 Transformer 里的一小块子网络——通常就是 FFN 那一块,大概只有两三层全连接层而已。大骨架:Attention / Norm / Embedding 这些大家都共用,某些层里的 FFN 被换成“很多个 Expert FFN + 一个 Router”。简易代码:
class DenseFFN(nn.Module):
def __init__(self, d_model, d_ff):
super().__init__()
self.w1 = nn.Linear(d_model, d_ff)
self.w2 = nn.Linear(d_ff, d_model)
def forward(self, x):
# x: [B, L, D]
h = F.relu(self.w1(x)) # [B, L, d_ff]
y = self.w2(h) # [B, L, D]
return y
除此之外,如果不对专家的使用进行限制,Router 可能把 80% 甚至 90% 的 token 都丢给少数几个“强专家”,可能会导致:
为了解决这个问题,DeepSeekMoE 加入了负载均衡损失(balance loss):
统计每个 Expert 在一个 batch 内被路由到的 token 比例 / gate 概率平均值;鼓励这些比例不要差得太夸张,让所有专家都有机会参与训练。
DeepSeek 的 MoE 是多卡/多节点并行的,不同卡上放着不同 Expert;Device-level loss 进一步约束:不同设备上的整体专家负载也要相对均衡,避免某几块 GPU 爆掉、其他 GPU 闲着。
import torch
import torch.nn as nn
import torch.nn.functional as F
class MoEBalanceLoss(nn.Module):
def __init__(self, alpha_expert=1e-2, alpha_device=1e-2):
super().__init__()
self.alpha_expert = alpha_expert
self.alpha_device = alpha_device
def forward(self,
gate_scores: torch.Tensor,
topk_idx: torch.Tensor,
B, L, E = gate_scores.shape
K = topk_idx.shape[-1]
device = gate_scores.device
T = B * L
gate_flat = gate_scores.reshape(T, E) # [T, E]
importance = gate_flat.sum(dim=0) # [E]
importance = importance / (importance.sum() + 1e-9) # 归一化成概率分布
load = torch.zeros(E, device=device) # [E]
topk_flat = topk_idx.reshape(-1) # [T*K]
load = load.scatter_add(
0,
topk_flat,
torch.ones_like(topk_flat, dtype=torch.float, device=device)
)
load = load / (load.sum() + 1e-9)
expert_target = torch.full_like(importance, 1.0 / E)
loss_importance = ((importance - expert_target) ** 2).mean()
loss_load = ((load - expert_target) ** 2).mean()
expert_balance_loss = loss_importance + loss_load
expert_balance_loss = self.alpha_expert * expert_balance_loss
device_balance_loss = torch.tensor(0.0, device=device)
if expert2device is not None:
num_devices = int(expert2device.max().item()) + 1
importance_per_dev = torch.zeros(num_devices, device=device)
load_per_dev = torch.zeros(num_devices, device=device)
importance_per_dev = importance_per_dev.scatter_add(
0, expert2device, importance
)
load_per_dev = load_per_dev.scatter_add(
0, expert2device, load
)
importance_per_dev = importance_per_dev / (importance_per_dev.sum() + 1e-9)
load_per_dev = load_per_dev / (load_per_dev.sum() + 1e-9)
device_target = torch.full_like(importance_per_dev, 1.0 / num_devices)
loss_importance_dev = ((importance_per_dev - device_target) ** 2).mean()
loss_load_dev = ((load_per_dev - device_target) ** 2).mean()
device_balance_loss = loss_importance_dev + loss_load_dev
device_balance_loss = self.alpha_device * device_balance_loss
total_loss = expert_balance_loss + device_balance_loss
stats = {
"moe/expert_balance_loss": expert_balance_loss.detach(),
"moe/device_balance_loss": device_balance_loss.detach(),
"moe/importance_std": importance.std().detach(),
"moe/load_std": load.std().detach(),
}
if expert2device is not None:
stats["moe/importance_dev_std"] = importance_per_dev.std().detach()
stats["moe/load_dev_std"] = load_per_dev.std().detach()
return total_loss, stats前面在 GQA 的学习中我们曾经说过:大模型推理阶段最贵的不是算力,而是 KV Cache 占的显存 + 带宽。
那 DeepSeek-V2 做的事情是:
在 GQA 的基础上,再把 K/V 本身压到一个更低维的 latent 空间里,只缓存 latent 向量。
具体来说,DeepSeek-V2 报告提到了:在 128K 上下文下,KV cache 减少约 93.3%,吞吐提升到原来的 5.76×,而模型效果还能打。这套机制就是 Multi-head Latent Attention(MLA)。
GQA的数学推理过程:
👉 MLA 的操作是:在生成 K/V 之前,先压一遍低维 latent:
训练 / naive 推理视角:看起来只是多插了一层低维投影;
真正推理实现视角:只需要 缓存 \mathbf{u} ,而 K/V 可以通过矩阵乘法在需要时重建,甚至可以进一步被“吸收”进 Q / O 的投影矩阵里。
import torch
import torch.nn as nn
import torch.nn.functional as F
class MLAAttention(nn.Module):
def __init__(self, d_model, num_heads, d_latent, dropout=0.0):
"""
d_model : hidden 维度
num_heads : 注意力头数(这里先按标准 MHA 写)
d_latent : MLA 的 latent 维度 (一般远小于 d_model)
"""
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.head_dim = d_model // num_heads
self.d_latent = d_latent
# Q 仍然从原 hidden 里来
self.w_q = nn.Linear(d_model, d_model)
# 额外一条低维 latent 投影:x -> u
self.w_u = nn.Linear(d_model, d_latent)
# u 再生成 K / V(这里先写成多头形式,方便理解)
self.w_uk = nn.Linear(d_latent, num_heads * self.head_dim)
self.w_uv = nn.Linear(d_latent, num_heads * self.head_dim)
# 输出投影
self.w_o = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
def _split_heads(self, x):
"""
x: [B, L, H * Dh] -> [B, H, L, Dh]
"""
B, L, D = x.size()
x = x.view(B, L, self.num_heads, self.head_dim)
return x.transpose(1, 2)
def forward(self, x, attn_mask=None):
"""
x: [B, L, d_model]
说明:这是训练 / naive 推理版本:
- 每一步都显式生成 K/V
- 真正高效推理时,只需要缓存 u,然后在 kernel 里重建 / 吸收 K/V
"""
B, L, _ = x.size()
# 1. Q:标准多头投影
q = self.w_q(x) # [B, L, d_model]
q = q.view(B, L, self.num_heads, self.head_dim).transpose(1, 2)
# q: [B, H, L, Dh]
# 2. latent:x -> u(低维)
u = self.w_u(x) # [B, L, d_latent]
# 3. u 生成多头 K / V
k = self.w_uk(u) # [B, L, H*Dh]
v = self.w_uv(u)
k = self._split_heads(k) # [B, H, L, Dh]
v = self._split_heads(v) # [B, H, L, Dh]
# (如果你已经实现了 RoPE,可以在这里对 q/k 做一次 rotary_embedding)
# 4. 缩放点积注意力
scores = q @ k.transpose(-2, -1) / (self.head_dim ** 0.5) # [B, H, L, L]
if attn_mask is not None:
scores = scores.masked_fill(attn_mask == 0, float('-inf'))
attn = F.softmax(scores, dim=-1)
attn = self.dropout(attn)
out = attn @ v # [B, H, L, Dh]
# 5. 合并头 + 输出投影
out = out.transpose(1, 2).contiguous().view(B, L, self.d_model)
out = self.w_o(out)
return out回答要点:
详细看第二章
关键点:
“把原来的 FFN 换成一个 Top-1 MoE FFN,不做太多工程优化,只把整体逻辑理清楚。”
import torch
import torch.nn as nn
import torch.nn.functional as F
class ExpertFFN(nn.Module):
"""一个标准的 SwiGLU FFN,可以看做单个 Expert。"""
def __init__(self, d_model, d_ff):
super().__init__()
# SwiGLU: (x W1) ⊗ σ(x W2) 再过 W3
self.w1 = nn.Linear(d_model, d_ff, bias=False)
self.w2 = nn.Linear(d_model, d_ff, bias=False)
self.w3 = nn.Linear(d_ff, d_model, bias=False)
def forward(self, x):
# x: [B, L, D]
a = self.w1(x)
b = self.w2(x)
x = F.silu(b) * a
return self.w3(x)
class Top1GatedMoE(nn.Module):
"""
极简版 Top-1 MoE:
- 有 num_experts 个 ExpertFFN
- Router 为每个 token 选出 1 个专家
- 不实现容量限制 / load-balance,只理顺计算图
"""
def __init__(self, d_model, d_ff, num_experts):
super().__init__()
self.d_model = d_model
self.num_experts = num_experts
# 一堆专家,可以想象成 DeepSeek 里的 routed experts
self.experts = nn.ModuleList(
[ExpertFFN(d_model, d_ff) for _ in range(num_experts)]
)
# Router:根据 token hidden state 选择专家
self.gate = nn.Linear(d_model, num_experts, bias=False)
def forward(self, x):
"""
x: [B, L, D]
返回形状仍然是 [B, L, D]
"""
B, L, D = x.shape
# 1. 计算 gating logits
logits = self.gate(x) # [B, L, E]
gate_scores = F.softmax(logits, -1)
# 2. 取 Top-1 专家
top1_score, top1_idx = gate_scores.max(dim=-1) # [B, L]
# 3. 为了方便实现,我们按专家拆分 token
# 实际工程里会用更高效的 scatter/gather 写法
out = torch.zeros_like(x)
for expert_id, expert in enumerate(self.experts):
# 找出路由到该 expert 的位置
mask = (top1_idx == expert_id) # [B, L]
if not mask.any():
continue
# 取出这些 token
# 展开成一个二维列表 [N_tokens, D]
selected_x = x[mask] # [N_tokens, D]
# 过专家 FFN
selected_out = expert(selected_x.unsqueeze(1)) # [N_tokens, 1, D]
selected_out = selected_out.squeeze(1) # [N_tokens, D]
# 乘上 gate score(Top-1,本质是一个系数)
selected_score = top1_score[mask].unsqueeze(-1) # [N_tokens, 1]
selected_out = selected_out * selected_score
# 写回原位置
out[mask] = selected_out
return out在本章的学习中,我们主要学习了DeepSeek的关键架构MOE以及MLA,分析了Deepseek的架构以及成功关键。除此之外,DeepSeek的后训练也为DeepSeek的成功奠定了强大的基础。下一章,将继续讲解Deepseek后训练的核心关键——GRPO.
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。