首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【多模态大模型面经】 DeepSeek专题: DeepSeekMOE + MLA

【多模态大模型面经】 DeepSeek专题: DeepSeekMOE + MLA

原创
作者头像
九年义务漏网鲨鱼
发布2025-11-21 17:22:03
发布2025-11-21 17:22:03
8130
举报

前言

✍ 在上一章里,我们已经把现代 LLM 的“四件标配武器”(GQA / RoPE / SwiGLU / RMSNorm + Pre-Norm)系统的学习一遍。现在再看 LLaMA、Qwen、Gemma 这类模型,你就不会觉得学起来那么费劲,因为这些模块早就成为现在的baseline了。学习完这些标配,可以帮助我们更快的把握文章的重点是什么。在这一章中,我们正式学习25年最火的中文模型——DeepSeek 系列。

一、DeepSeek 系列

技术圈有一个有趣的现象:真正爆火的那个时间点,往往已经是技术“量变到质变”的结果,而不是起点。

DeepSeek 也是这样——大家印象里是某个时间点突然刷屏、对标海外大模型,但如果往前扒时间线,你会发现它的技术演进其实是循序渐进 + 一路扩写 MoE 这条路线的。为了后面好讲清楚 DeepSeek 的架构设计,我们先用一小节,把它的几个关键“里程碑版本”梳理一下:

① 2024 年初: DeepSeek-V1 发布,提出 DeepSeekMoE 架构

  • 重点在于:相比传统 MoE(Switch / GShard),通过细粒度专家划分 + 共享专家,让专家“更专、更不混”。

② 2024 年中:DeepSeek-V2 发布,架构升级为大规模 Decoder-only LLM + DeepSeekMoE + MLA(Multi-head Latent Attention);

  • 总参数量上百亿甚至百亿级别,但每个 token 只激活其中一小部分参数;
  • MLA 用来极限压缩 Attention 的 KV Cache,显著降低长上下文推理的显存与带宽成本。

③ 2024 年底及之后:V2.5 / V3 / R1 等迭代,在 同一条 MoE + MLA 技术路线上继续加码,扩大总参数量、增强代码/推理能力;

看完DeepSeek模型的时间线后,是不是发现,实际上DeepSeek的主要内容就是一个DeepSeekMoE + MLA。

  • DeepSeekMoE:把 FFN 层变成稀疏激活的 MoE ;
  • MLA(Multi-head Latent Attention):把 KV cache 压到一个低秩 latent 空间里,极限节省显存,提升吞吐。

二、MOE→DeepSeekMoE

2.1 MOE 基础概念

在普通 Transformer Block 中,对于所有的token,FFN 是这样的将其一视同仁的输入输出,公式为:

\text{FFN}(x) = W_2 \,\sigma(W_1 x)

而在 MoE(Mixture-of-Experts) 里,你可以粗暴理解成:“把原来一个大 FFN,拆成很多个 Expert,小模型(Router/Gate)决定某个 token 去找哪个 Expert 看病。在DeepSeek论文中,把每一个小模型称为一个Router,为每个token分配路径

① 给定 N 个专家: E_1, \dots, E_N

Router 给每个 token 出一个打分:p = \text{softmax}(W_{\text{gate}} x) \in \mathbb{R}^N

③ 然后选 Top-K 个专家给它服务:y = \sum_{i\in\text{TopK}(p)} p_i \, E_i(x)

虽然需要很多个专家,参数量可以堆到非常大,但每个 token 只走 K 个 Expert,算力是比较稀疏的,只要 Router 分配合理,就能得到“参数大、算得不多”的甜蜜点。

  • 路由器Router代码实现
代码语言:PYTHON
复制
import torch
import torch.nn as nn
import torch.nn.functional as F


class Router(nn.Module):
    """给每个 token 生成一个长度为 num_experts 的 gate 分布。"""
    def __init__(self, d_model, num_experts):
        super().__init__()
        self.num_experts = num_experts
        self.gate = nn.Linear(d_model, num_experts, bias=False)

    def forward(self, x):
        """
        x: [B, L, D]
        return:
          gate_scores: [B, L, num_experts],每个 token 的专家概率分布
        """
        logits = self.gate(x)                 # [B, L, E]
        gate_scores = F.softmax(logits, dim=-1)
        return gate_scores
  • Top-1 Gated MoE代码实现
代码语言:python
复制
class Top1GatedMoE(nn.Module):
    """
    极简版 Top-1 MoE:
    - Router 给每个 token 选 1 个专家
    - 实现公式:y = p_i * E_i(x)
    - 不实现负载均衡 / 容量限制,先把计算图搭清楚
    """
    def __init__(self, d_model, d_ff, num_experts):
        super().__init__()
        self.d_model = d_model
        self.num_experts = num_experts

        # 一堆专家 E_1 ... E_N
        self.experts = nn.ModuleList(
            [ExpertFFN(d_model, d_ff) for _ in range(num_experts)]
        )
        # Router: W_gate
        self.router = Router(d_model, num_experts)

    def forward(self, x):
        """
        x: [B, L, D]
        return:
          y: [B, L, D]
        """
        B, L, D = x.shape
        gate_scores = self.router(x)                    # [B, L, E]

        top1_score, top1_idx = gate_scores.max(dim=-1)  # [B, L], [B, L]

        x_flat = x.reshape(B * L, D)                    # [T, D], T = B*L
        idx_flat = top1_idx.reshape(B * L)              # [T]
        score_flat = top1_score.reshape(B * L)          # [T]

        out_flat = torch.zeros_like(x_flat)             # [T, D]

        for expert_id, expert in enumerate(self.experts):
            mask = (idx_flat == expert_id)              # [T]
            if not mask.any():
                continue

            token_idx = mask.nonzero(as_tuple=True)[0]  # [N_e]
            x_e = x_flat[token_idx]                     # [N_e, D]
            s_e = score_flat[token_idx].unsqueeze(-1)   # [N_e, 1]

            # 过 expert FFN
            y_e = expert(x_e)                           # [N_e, D]

            y_e = y_e * s_e

            out_flat[token_idx] = y_e

        # 3) reshape 回 [B, L, D]
        out = out_flat.view(B, L, D)
        return out

2.2 DeepSeekMoE

DeepSeekMoE 这篇 ACL 长文里,作者先点名了传统 Top-K MoE 的两个问题:

  1. 知识混叠(Knowledge Hybridity):专家数量有限(比如 8/16 个),一个 Expert 里塞了太多杂乱知识。同一个 Expert 既要会数学、又要会写代码、又要会闲聊——很难做到专精。
  2. 知识冗余(Knowledge Redundancy): 不同专家学到的大部分东西是重复的;

简单说:目前的MoE 看起来是“多专家会诊”,实际上专家经常“兼职”,还爱抄彼此作业。

💡 为了解决这两个问题,DeepSeek 提出了 DeepSeekMoE,核心为:

2.2.1 Fine-Grained Expert Segmentation(细粒度专家划分)
  • 不是只搞 N 个大专家,而是 把专家进一步切成 mN 个小专家
  • Top-K 时,实际激活的是 mK 个小专家,可以组成 更细腻的专家组合

直觉理解:

以前:一个 token 找两个“万金油医生”; 现在:一个 token 可以同时挂很多“专科门诊”,组合出更细腻的“专家拼盘”。

🧠 可能有些读者和博主一样,刚开始无法理解,一个模型怎么就拆成多个小专家了,实际上,在 LLM 里的 MoE,所谓的“专家(Expert)不是一整个大模型,而只是 Transformer 里的一小块子网络——通常就是 FFN 那一块,大概只有两三层全连接层而已。大骨架:Attention / Norm / Embedding 这些大家都共用,某些层里的 FFN 被换成“很多个 Expert FFN + 一个 Router”。简易代码:

代码语言:python
复制
class DenseFFN(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.w1 = nn.Linear(d_model, d_ff)
        self.w2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        # x: [B, L, D]
        h = F.relu(self.w1(x))   # [B, L, d_ff]
        y = self.w2(h)           # [B, L, D]
        return y
2.2.2 Shared Expert Isolation(共享专家隔离)
  • 专门拿出若干个 Shared Experts,始终激活;
  • 专门负责“公共知识”(比如语法、基础常识),
  • 其他 Routed Experts 就可以更专注在数学、代码、多语言等特化领域,减少重复建设。
2.2.3 负载均衡损失

除此之外,如果不对专家的使用进行限制,Router 可能把 80% 甚至 90% 的 token 都丢给少数几个“强专家”,可能会导致:

  • 专家负载严重不均衡:有的 Expert 超载,有的几乎不参与训练;
  • 有效容量下降:本来有 N 个专家,结果只用活了 2~3 个,等于退化回“伪 MoE / 几乎 dense”;
  • Routing Collapse(路由塌缩):随着训练推进,大部分 token 永远只去固定几个专家。

为了解决这个问题,DeepSeekMoE 加入了负载均衡损失(balance loss)

  • Expert-Level Balance Loss(专家层面的均衡损失)

统计每个 Expert 在一个 batch 内被路由到的 token 比例 / gate 概率平均值;鼓励这些比例不要差得太夸张,让所有专家都有机会参与训练。

  • Device-Level Balance Loss(设备层面的均衡损失)

DeepSeek 的 MoE 是多卡/多节点并行的,不同卡上放着不同 Expert;Device-level loss 进一步约束:不同设备上的整体专家负载也要相对均衡,避免某几块 GPU 爆掉、其他 GPU 闲着。

  • 代码实现
代码语言:python
复制
import torch
import torch.nn as nn
import torch.nn.functional as F


class MoEBalanceLoss(nn.Module):
    
    def __init__(self, alpha_expert=1e-2, alpha_device=1e-2):
        super().__init__()
        self.alpha_expert = alpha_expert
        self.alpha_device = alpha_device

    def forward(self,
                gate_scores: torch.Tensor,
                topk_idx: torch.Tensor,
        B, L, E = gate_scores.shape
        K = topk_idx.shape[-1]
        device = gate_scores.device

        T = B * L
        gate_flat = gate_scores.reshape(T, E)  # [T, E]

        importance = gate_flat.sum(dim=0)              # [E]
        importance = importance / (importance.sum() + 1e-9)  # 归一化成概率分布
        load = torch.zeros(E, device=device)           # [E]
        topk_flat = topk_idx.reshape(-1)               # [T*K]
        load = load.scatter_add(
            0,
            topk_flat,
            torch.ones_like(topk_flat, dtype=torch.float, device=device)
        )
        load = load / (load.sum() + 1e-9)            

        expert_target = torch.full_like(importance, 1.0 / E)

        loss_importance = ((importance - expert_target) ** 2).mean()
        loss_load = ((load - expert_target) ** 2).mean()

        expert_balance_loss = loss_importance + loss_load
        expert_balance_loss = self.alpha_expert * expert_balance_loss

        device_balance_loss = torch.tensor(0.0, device=device)

        if expert2device is not None:
            num_devices = int(expert2device.max().item()) + 1

   
            importance_per_dev = torch.zeros(num_devices, device=device)
            load_per_dev = torch.zeros(num_devices, device=device)

            importance_per_dev = importance_per_dev.scatter_add(
                0, expert2device, importance
            )
            load_per_dev = load_per_dev.scatter_add(
                0, expert2device, load
            )

            importance_per_dev = importance_per_dev / (importance_per_dev.sum() + 1e-9)
            load_per_dev = load_per_dev / (load_per_dev.sum() + 1e-9)

            device_target = torch.full_like(importance_per_dev, 1.0 / num_devices)

            loss_importance_dev = ((importance_per_dev - device_target) ** 2).mean()
            loss_load_dev = ((load_per_dev - device_target) ** 2).mean()

            device_balance_loss = loss_importance_dev + loss_load_dev
            device_balance_loss = self.alpha_device * device_balance_loss

        total_loss = expert_balance_loss + device_balance_loss

        stats = {
            "moe/expert_balance_loss": expert_balance_loss.detach(),
            "moe/device_balance_loss": device_balance_loss.detach(),
            "moe/importance_std": importance.std().detach(),
            "moe/load_std": load.std().detach(),
        }
        if expert2device is not None:
            stats["moe/importance_dev_std"] = importance_per_dev.std().detach()
            stats["moe/load_dev_std"] = load_per_dev.std().detach()

        return total_loss, stats

三、Multi-head Latent Attention(MLA)

前面在 GQA 的学习中我们曾经说过:大模型推理阶段最贵的不是算力,而是 KV Cache 占的显存 + 带宽。

  • 标准 MHA:每个头都要一份 K_h, V_h ,KV Cache ∼ \mathcal{O}(H \cdot L \cdot d_{\text{head}})
  • GQA / MQA:通过共享 / 分组 K/V,已经能把 KV Cache 压到大概原来的 1/H 甚至更少

那 DeepSeek-V2 做的事情是:

在 GQA 的基础上,再把 K/V 本身压到一个更低维的 latent 空间里,只缓存 latent 向量。

具体来说,DeepSeek-V2 报告提到了:在 128K 上下文下,KV cache 减少约 93.3%,吞吐提升到原来的 5.76×,而模型效果还能打。这套机制就是 Multi-head Latent Attention(MLA)

3.1 MLA 的核心思路:把 K/V 压成一个低维 latent 向量

GQA的数学推理过程:

  • 输入隐藏状态:\mathbf{x} \in \mathbb{R}^{d_{\text{model}}}
  • 多头投影:Q = x W^Q,\quad K = x W^K,\quad V = x W^V
  • 推理时,我们缓存的是所有历史 token 的 K, V(按照 GQA 已经共享掉一部分 head)

👉 MLA 的操作是:在生成 K/V 之前,先压一遍低维 latent:

  1. 把 token 隐状态 $\mathbf{x}$ 投影到一个低维 latent 空间:\mathbf{u} = x W^U,\quad \mathbf{u} \in \mathbb{R}^{d_u},\; d_u \ll d_{\text{model}}
  2. 然后用这条低维 $\mathbf{u}$,去生成所有头共享的 K/V: \mathbf{u} W^{UK},\quad V = \mathbf{u} W^{UV}
  3. Q 还是从原始 hidden / content 里来: $Q_h = x W^Q_h$

训练 / naive 推理视角:看起来只是多插了一层低维投影;

真正推理实现视角:只需要 缓存 \mathbf{u}而 K/V 可以通过矩阵乘法在需要时重建,甚至可以进一步被“吸收”进 Q / O 的投影矩阵里。

  • 代码实现
代码语言:PYTHON
复制
import torch
import torch.nn as nn
import torch.nn.functional as F


class MLAAttention(nn.Module):
    def __init__(self, d_model, num_heads, d_latent, dropout=0.0):
        """
        d_model : hidden 维度
        num_heads : 注意力头数(这里先按标准 MHA 写)
        d_latent : MLA 的 latent 维度 (一般远小于 d_model)
        """
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.d_latent = d_latent

        # Q 仍然从原 hidden 里来
        self.w_q = nn.Linear(d_model, d_model)

        # 额外一条低维 latent 投影:x -> u
        self.w_u = nn.Linear(d_model, d_latent)

        # u 再生成 K / V(这里先写成多头形式,方便理解)
        self.w_uk = nn.Linear(d_latent, num_heads * self.head_dim)
        self.w_uv = nn.Linear(d_latent, num_heads * self.head_dim)

        # 输出投影
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, x):
        """
        x: [B, L, H * Dh] -> [B, H, L, Dh]
        """
        B, L, D = x.size()
        x = x.view(B, L, self.num_heads, self.head_dim)
        return x.transpose(1, 2)

    def forward(self, x, attn_mask=None):
        """
        x: [B, L, d_model]
        说明:这是训练 / naive 推理版本:
          - 每一步都显式生成 K/V
          - 真正高效推理时,只需要缓存 u,然后在 kernel 里重建 / 吸收 K/V
        """
        B, L, _ = x.size()

        # 1. Q:标准多头投影
        q = self.w_q(x)                               # [B, L, d_model]
        q = q.view(B, L, self.num_heads, self.head_dim).transpose(1, 2)
        # q: [B, H, L, Dh]

        # 2. latent:x -> u(低维)
        u = self.w_u(x)                               # [B, L, d_latent]

        # 3. u 生成多头 K / V
        k = self.w_uk(u)                              # [B, L, H*Dh]
        v = self.w_uv(u)
        k = self._split_heads(k)                      # [B, H, L, Dh]
        v = self._split_heads(v)                      # [B, H, L, Dh]

        # (如果你已经实现了 RoPE,可以在这里对 q/k 做一次 rotary_embedding)

        # 4. 缩放点积注意力
        scores = q @ k.transpose(-2, -1) / (self.head_dim ** 0.5)  # [B, H, L, L]
        if attn_mask is not None:
            scores = scores.masked_fill(attn_mask == 0, float('-inf'))

        attn = F.softmax(scores, dim=-1)
        attn = self.dropout(attn)

        out = attn @ v                                 # [B, H, L, Dh]

        # 5. 合并头 + 输出投影
        out = out.transpose(1, 2).contiguous().view(B, L, self.d_model)
        out = self.w_o(out)
        return out

四、面经

🧠 1. DeepSeek 系列在架构上和传统 GPT / LLaMA 有什么不同?

回答要点:

  • Backbone 方面很像:都是 Decoder-only + RoPE + RMSNorm + SwiGLU + GQA
  • 最大的差别在于两点:
    1. FFN 换成了 DeepSeekMoE(激进 MoE + 专家专化)
    2. Attention 上用 MLA 做 KV cache 压缩,大幅降低显存和带宽开销
🧠 2. 传统 MoE 有什么问题?DeepSeekMoE 提出了什么改进?

详细看第二章


🧠 3. 为什么说 DeepSeek 是“参数大、算得少”?

关键点:

  • 解释“总参数 vs 激活参数”的概念;(以V2为例,总参数:约 236B;每个 token 实际激活:约 21B;)
  • 说明 MoE 是稀疏激活结构,每个 token 只走少数几个专家;
  • 顺手提一句 DeepSeekMoE 用的是细粒度专家划分 + Shared Expert,提升专家利用效率;

五、代码手撕

“把原来的 FFN 换成一个 Top-1 MoE FFN,不做太多工程优化,只把整体逻辑理清楚。”

代码语言:python
复制
import torch
import torch.nn as nn
import torch.nn.functional as F

class ExpertFFN(nn.Module):
    """一个标准的 SwiGLU FFN,可以看做单个 Expert。"""
    def __init__(self, d_model, d_ff):
        super().__init__()
        # SwiGLU: (x W1) ⊗ σ(x W2) 再过 W3
        self.w1 = nn.Linear(d_model, d_ff, bias=False)
        self.w2 = nn.Linear(d_model, d_ff, bias=False)
        self.w3 = nn.Linear(d_ff, d_model, bias=False)

    def forward(self, x):
        # x: [B, L, D]
        a = self.w1(x)
        b = self.w2(x)
        x = F.silu(b) * a
        return self.w3(x)


class Top1GatedMoE(nn.Module):
    """
    极简版 Top-1 MoE:
    - 有 num_experts 个 ExpertFFN
    - Router 为每个 token 选出 1 个专家
    - 不实现容量限制 / load-balance,只理顺计算图
    """
    def __init__(self, d_model, d_ff, num_experts):
        super().__init__()
        self.d_model = d_model
        self.num_experts = num_experts

        # 一堆专家,可以想象成 DeepSeek 里的 routed experts
        self.experts = nn.ModuleList(
            [ExpertFFN(d_model, d_ff) for _ in range(num_experts)]
        )

        # Router:根据 token hidden state 选择专家
        self.gate = nn.Linear(d_model, num_experts, bias=False)

    def forward(self, x):
        """
        x: [B, L, D]
        返回形状仍然是 [B, L, D]
        """
        B, L, D = x.shape

        # 1. 计算 gating logits
        logits = self.gate(x)              # [B, L, E]
        gate_scores = F.softmax(logits, -1)

        # 2. 取 Top-1 专家
        top1_score, top1_idx = gate_scores.max(dim=-1)  # [B, L]

        # 3. 为了方便实现,我们按专家拆分 token
        #   实际工程里会用更高效的 scatter/gather 写法
        out = torch.zeros_like(x)

        for expert_id, expert in enumerate(self.experts):
            # 找出路由到该 expert 的位置
            mask = (top1_idx == expert_id)           # [B, L]
            if not mask.any():
                continue

            # 取出这些 token
            # 展开成一个二维列表 [N_tokens, D]
            selected_x = x[mask]                     # [N_tokens, D]

            # 过专家 FFN
            selected_out = expert(selected_x.unsqueeze(1))  # [N_tokens, 1, D]
            selected_out = selected_out.squeeze(1)          # [N_tokens, D]

            # 乘上 gate score(Top-1,本质是一个系数)
            selected_score = top1_score[mask].unsqueeze(-1) # [N_tokens, 1]
            selected_out = selected_out * selected_score

            # 写回原位置
            out[mask] = selected_out

        return out

六、总结

在本章的学习中,我们主要学习了DeepSeek的关键架构MOE以及MLA,分析了Deepseek的架构以及成功关键。除此之外,DeepSeek的后训练也为DeepSeek的成功奠定了强大的基础。下一章,将继续讲解Deepseek后训练的核心关键——GRPO.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、DeepSeek 系列
  • 二、MOE→DeepSeekMoE
    • 2.1 MOE 基础概念
    • 2.2 DeepSeekMoE
      • 2.2.1 Fine-Grained Expert Segmentation(细粒度专家划分)
      • 2.2.2 Shared Expert Isolation(共享专家隔离)
      • 2.2.3 负载均衡损失
  • 三、Multi-head Latent Attention(MLA)
    • 3.1 MLA 的核心思路:把 K/V 压成一个低维 latent 向量
  • 四、面经
    • 🧠 1. DeepSeek 系列在架构上和传统 GPT / LLaMA 有什么不同?
    • 🧠 2. 传统 MoE 有什么问题?DeepSeekMoE 提出了什么改进?
    • 🧠 3. 为什么说 DeepSeek 是“参数大、算得少”?
  • 五、代码手撕
  • 六、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档