
基于真实股票数据同步项目的死锁案例分析
buffer_unordered + mpsc 并发模式中的隐藏陷阱
大家好,今天分享一个我在实际项目中踩过的坑——一个“静悄悄”的死锁 bug。
在开发股票数据同步服务时,全量同步任务执行到一半:基础信息顺利同步完成,但日线行情数据突然完全不拉取了。程序既不报错,也不崩溃,就这么静静地“卡住”了(hang)。
这种并发死锁是最难调试的类型之一:没有 panic、没有栈溢出,就是永远不往下走了。今天我们就来彻底拆解这个 bug 的成因、复现方式和多种解决方案。
现象:并发拉取数千只股票数据时,部分任务正常完成,后续任务却永远卡在 send() 操作上。主线程也卡在等待 stream 完成,导致整个程序永久挂起。
技术栈:
tokio::sync::mpsc(有界通道)futures::stream::StreamExt::buffer_unordered 控制并发核心模式是:多个异步任务并发执行 → 通过 channel 收集结果 → 主线程统一处理。
1. mpsc Channel 工作原理
mpsc 是多生产者单消费者通道。有界通道(bounded)有固定容量:
tx.send().await 会挂起,等待消费者 recv() 释放空间。recv() 返回 None 表示通道关闭。2. buffer_unordered(n) 工作原理
它允许同时运行最多 n 个 Future,按完成顺序(而非提交顺序)产出结果。
stream.next().await 会等待下一个完成的任务。两者结合本该高效,但容量设置不当就会出大问题。
let concurrency =; // 并发数
let total_stocks =; // 总股票数
// ❌ 致命错误:channel 容量等于并发数
let (tx, mut rx) = mpsc::channel(concurrency);
let mut stream = futures::stream::iter(..total_stocks)
.map(|i| {
let tx = tx.clone();
async move {
let result = fetch_stock_data(i).await;
tx.send(result).await.unwrap(); // ← 这里可能死锁!
}
})
.buffer_unordered(concurrency);
// ❌ 致命错误:先把 stream 消费完,再消费 channel
while let Some(_) = stream.next().await {} // 等待所有任务完成
drop(tx); // 关闭发送端
while let Some(msg) = rx.recv().await {} // 消费结果这个模式在任务少、运气好时能跑通,但总任务数远大于并发数时,就很容易死锁。
死锁的核心是循环等待:
buffer_unordered(20) 同时运行 20 个任务。send() 开始等待空间释放。stream.next().await 上等待“下一个任务完成”。send() 成功,而 send() 依赖主线程去 recv() —— 却永远轮不到。结果:send() 等 recv(),stream.next() 等任务完成,recv() 又在 stream 循环之后,三方互相卡死,形成经典死锁环。
这不是 100% 复现,而是取决于任务完成时序,所以特别隐蔽。

// 修改前:容量 = 并发数
let (tx, mut rx) = mpsc::channel(concurrency);
// 修改后:容量 >= 总任务数
let (tx, mut rx) = mpsc::channel(total_stocks);优点:改动极小,只需一行代码。 缺点:内存占用略增(5000 条小消息通常只有几百 KB,完全可接受)。
适合任务总数可控、需要收集详细结果的场景。
用 tokio::spawn 把 channel 消费放到独立任务中,同时运行:
let rx_task = tokio::spawn(async move {
let mut success =;
let mut fail =;
while let Some(msg) = rx.recv().await {
// 处理消息...
}
(success, fail)
});
// 主线程继续消费 stream
while let Some(_) = stream.next().await {}
drop(tx);
let stats = rx_task.await.unwrap();这样避免了“先消费完一个再消费另一个”的顺序依赖。
let (tx, mut rx) = mpsc::unbounded_channel(); // 永不阻塞警告:生产者远快于消费者时可能导致内存爆炸(OOM),仅在任务总数明确可控时使用。
用 Arc<AtomicU64> 统计成功/失败数,直接在每个任务里写数据库,无需 channel。
优点:彻底消除死锁风险,代码最干净。 缺点:无法轻松收集每只股票的详细失败信息。
我们的股票同步项目选择了方案一:任务数 5000 可控,内存开销 negligible,且需要知道具体哪只股票失败以便重试。
1. 永远不要在同一个异步任务中同时等待互斥资源
每次 .await 前都要问:这个等待会不会被另一个等待阻塞?
2. channel 容量必须 ≥ 并发数(更安全 ≥ 总任务数)
buffer_unordered(n) + mpsc::channel(m) 时,m < n 极易死锁。
3. 优先使用 tokio::select! 处理多个异步事件,避免顺序等待。
4. 为可疑 .await 添加超时保护,便于开发时快速发现问题:
use tokio::time::{timeout, Duration};
timeout(Duration::from_secs(), tx.send(result)).await5. 调试技巧:
send() 前后加 tracing::debug! 日志tokio-console 可视化任务状态-- --nocapture 实时看日志Rust 异步强大,但 buffer_unordered + 有界 mpsc 的组合如果容量设置不当,就会埋下隐蔽的死锁炸弹。核心教训只有一句话:
生产者和消费者的节奏必须解耦,不要让它们在同一个“等待链”上互相卡住。
希望这篇基于真实项目的拆解,能帮你避开类似的坑。欢迎在评论区分享你遇到过的异步死锁案例,或者你更喜欢的解决方案!
Rust 异步之路,任重而道远,一起加油!
(完)