
Tokio 系列第六篇更新啦!
前五篇我们已经掌握了 Runtime 配置、网络编程基础和 Channel 通信。今天我们进入数据流处理领域 —— Stream。
如果你把 Future 比作“一次性的异步操作”,那 Stream 就是“异步版本的 Iterator” —— 它能持续产出一系列值,每个值到来时都可以 .await 处理。
在实时日志采集、WebSocket 消息处理、传感器数据流、Kafka 消费等场景中,Stream 都是神器。它和 Channel 结合后,能构建出优雅、高效的异步数据管道。
Stream 是 futures_core::stream::Stream trait(Tokio 通过 tokio-stream 提供便利):
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>;
}poll_next:尝试获取下一个值,返回 Ready(Some(item))、Ready(None)(流结束)或 Pending。tokio-stream crate 提供了:
iter、wrappers::ReceiverStream 等)StreamExt trait:大量组合子(combinators),类似 Iterator 的 map、filter、collect依赖添加(推荐):
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["full"] } # 开启 time 等特性use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
// 从迭代器创建 Stream
let numbers = stream::iter(1..=5);
// 消费 Stream(类似 for 循环)
let mut s = numbers;
while let Some(num) = s.next().await {
println!("收到: {}", num);
}
}如果你发现 .next() 报错提示 Pin 相关问题,记得使用 tokio::pin!(s); 来固定流。
更实用:把 mpsc Receiver 转为 Stream(上一期 Channel 的延伸):
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = mpsc::channel(32);
let mut stream = ReceiverStream::new(rx);
// 在其他 Task 中 tx.send(...)
while let Some(msg) = stream.next().await {
println!("处理消息: {}", msg);
}StreamExt 提供了强大且声明式的转换能力,让你像搭积木一样构建数据管道。
常用组合子示例:
如果你的过滤和映射逻辑不涉及异步 I/O(比如查数据库),不应该使用异步闭包。直接用同步闭包,性能最高且代码最简洁。
use tokio_stream::{self as stream, StreamExt}; // 别名导入,方便使用 stream::iter
use std::time::Duration;
#[tokio::main]
async fn main() {
let s = stream::iter(1..=10)
.filter(|&n| n % 2 == 0) // 修正:同步过滤偶数
.map(|n| n * n) // 映射平方
.take(3) // 取前 3 个
.timeout(Duration::from_secs(1)) // 超时控制
.filter_map(|res| { // 修正:同步处理 Result
match res {
Ok(v) => Some(v),
Err(_) => None,
}
});
// 提示:collect 需要引入 StreamExt
let result: Vec<i32> = s.collect().await;
println!("结果: {:?}", result); // 输出: [4, 16, 36]
}
如果你在处理流的过程中需要执行异步操作(例如:根据 ID 去数据库查询),你应该使用 .then()。这是将同步流转为异步处理管道的标准做法。
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() {
let s = stream::iter(1..=10)
.filter(|&n| n % 2 == 0)
.map(|n| n * n)
// 使用 then 处理异步转换(如果内部有 await)
.then(|n| async move {
// 模拟异步操作,比如数据库查询
tokio::time::sleep(Duration::from_millis(10)).await;
n
})
.timeout(Duration::from_secs(1))
.map(|res| res.ok()) // 将 Result 转为 Option,超时变 None
.filter_map(|opt| opt); // 过滤掉 None
let result: Vec<i32> = s.collect().await;
println!("进阶结果: {:?}", result);
}更多实用组合子:
.throttle(duration):限流,每隔一段时间产出一个值.chunks(n) / .chunks_timeout(n, duration):批量处理(防小消息风暴).merge(other_stream):合并两个 Stream,按到达顺序交错输出.chain(other_stream):先处理完当前,再处理另一个.buffer_unordered(n):并发处理最多 n 个 item(适合 I/O 密集).zip(other_stream):配对两个 Stream 的值StreamMap(多流管理神器):
当你有动态的多个 Stream(如多个 WebSocket 连接)时,用 StreamMap 按 key 管理:
use tokio::sync::mpsc;
use tokio::time::{self, Duration};
use tokio_stream::{self as stream, StreamExt, StreamMap};
use tokio_stream::wrappers::{ReceiverStream, IntervalStream};
use std::pin::Pin;
// 专家技巧:定义一个类型别名,用于存放不同类型的流
// Pin<Box<dyn stream::Stream<Item = String> + Send>> 俗称 BoxStream
type BoxStream = Pin<Box<dyn stream::Stream<Item = String> + Send>>;
#[tokio::main]
async fn main() {
// 1. 构造第一个流:来自 mpsc 通道(模拟用户输入)
let (tx, rx) = mpsc::channel::<String>(10);
let client1_stream = ReceiverStream::new(rx);
// 2. 构造第二个流:来自 Interval 定时器(模拟心跳)
let interval = time::interval(Duration::from_secs(2));
let client2_stream = IntervalStream::new(interval)
.map(|t| format!("心跳滴答: {:?}", t));
// 3. 创建 StreamMap
// 重点:由于两个流原始类型不同,需要通过 Box::pin 擦除类型(Type Erasure)
let mut map = StreamMap::new();
map.insert("User_Task", Box::pin(client1_stream) as BoxStream);
map.insert("Heartbeat", Box::pin(client2_stream) as BoxStream);
// 模拟异步发送数据给 client1
tokio::spawn(async move {
tx.send("你好,Tokio!".to_string()).await.unwrap();
time::sleep(Duration::from_secs(1)).await;
tx.send("来自客户端1的第二条消息".to_string()).await.unwrap();
});
println!("--- 开始多流监听 ---");
// 4. 消费 StreamMap
// StreamMap 本身实现了 Stream 特性,它会公平地轮询内部所有的流
while let Some((key, value)) = map.next().await {
println!("[{}] 接收到数据: {}", key, value);
}
}假设我们有多个日志源(文件、TCP、Channel),需要过滤、转换、批量入库:
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
// 启动专家级日志流水线
build_log_pipeline().await;
}
async fn build_log_pipeline() {
// 1. 模拟数据源 A:来自 mpsc 通道
let (tx, rx) = mpsc::channel::<String>(100);
let log_stream_mpsc = tokio_stream::wrappers::ReceiverStream::new(rx);
// 2. 模拟数据源 B:来自静态迭代器
let log_stream_static = stream::iter(vec![
"INFO: system initialized".to_string(),
"DEBUG: memory check pass".to_string(),
"INFO: setup complete".to_string(),
]);
// 模拟生产者:异步向通道发送日志
let tx_clone = tx.clone();
tokio::spawn(async move {
let logs = vec!["ERROR: disk full", "INFO: user_1 login", "WARN: high cpu"];
for log in logs {
let _ = tx_clone.send(log.to_string()).await;
// 故意留点间隔,触发 chunks_timeout 的时间阈值
tokio::time::sleep(Duration::from_millis(200)).await;
}
// 生产者完成后,tx 被 drop,rx 流会在处理完剩余数据后关闭
});
println!("🚀 日志聚合流水线已启动(使用 while let 模式)...");
// 3. 构建组合子管道
// 注意:这里我们不直接 await,而是先定义好管道
let pipeline = log_stream_mpsc
.merge(log_stream_static)
.filter(|log| {
let is_debug = log.contains("DEBUG");
if is_debug {
println!("── [过滤] 跳过 DEBUG 级日志");
}
!is_debug
})
.map(|log| log.to_uppercase())
// 每凑够 3 条,或者超过 1 秒没新数据,就打包成 Vec<String> 产出
.chunks_timeout(3, Duration::from_secs(1));
// 4. 关键点:使用 tokio::pin! 固定流
// 复杂的流结构在调用 .next() 时必须被固定在内存中
tokio::pin!(pipeline);
// 5. 消费管道:while let 模式比 for_each 更稳健,避开了 Trait 冲突
while let Some(batch) = pipeline.next().await {
println!("📦 --- 触发批量处理 ---");
println!("内容: {:?}", batch);
println!("批次大小: {}, 触发时间: {:?}", batch.len(), std::time::Instant::now());
// 模拟异步数据库操作
// db.insert_batch(batch).await;
}
println!("✅ 流水线运行结束,所有日志处理完毕。");
}这个管道声明式、带背压、易于测试,远比一堆手动 select! 清晰。
上一期学的 Channel 是 Stream 的最佳搭档:
mpsc::Receiver → ReceiverStreambroadcast::Receiver → BroadcastStreamtokio::time::interval → IntervalStream这样你可以用 StreamExt 对 Channel 数据做复杂变换,而无需手动循环。
tokio_stream::StreamExt 和 futures::StreamExt 有重叠方法,建议优先使用 tokio_stream(带时间相关方法),或用完全限定语法避免冲突。.buffer_unordered(1000),根据下游处理能力设置合理并发度,否则内存可能爆炸。Result<T, E>,用 try_filter、try_for_each 等。take_while 或外部取消信号控制。tokio::spawn + buffer_unordered 并发加速 I/O 部分。tokio_test 或 futures::stream::iter 构造测试 Stream,非常方便。小贴士:Stream 是“懒”的,只有被消费(next/collect/for_each)时才会推进。和 Iterator 一样,链式调用不会立即执行。
很多人觉得 Stream 难学,是因为它处于 Future 和 Iterator 的交界处。记住一个原则:凡是涉及到“一串”异步数据的场景,闭着眼选 Stream。 如果你手动写 loop { select! { ... } } 觉得代码越来越乱,那就是该重构为 Stream 的时候了。
Stream + StreamExt 让异步数据处理变得像同步 Iterator 一样自然,却拥有强大的并发与背压能力。它是构建实时系统、数据管道、事件驱动服务的核心工具。
掌握 Stream 后,你可以轻松处理:
我们下期见!
(完)