
在现代 Rust 异步编程中,JoinSet 和 Semaphore 的组合被誉为当前最推荐的并发控制模式。这种组合不仅解决了传统异步编程中的许多痛点,还提供了优雅的并发管理解决方案。本文将深入解析这两种工具的特点,以及它们结合使用的强大优势。
JoinSet 是 Rust 异步生态系统中用于管理多个异步任务的工具。它提供了任务的生命周期管理、并发控制以及结果收集等功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use tokio::sync::JoinSet;
// 创建 JoinSet
let mut tasks = JoinSet::new();
// 添加任务
tasks.spawn(async {
println!("任务1开始执行");
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
"任务1完成"
});
tasks.spawn(async {
println!("任务2开始执行");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
"任务2完成"
});
// 等待所有任务完成
while let Some(result) = tasks.join_next().await {
match result {
Ok(output) => println!("任务结果: {}", output),
Err(e) => println!("任务出错: {:?}", e),
}
}
Semaphore(信号量)是并发编程中的经典同步原语,用于控制对共享资源的访问。在 Rust 异步环境中,Semaphore 特别适合控制并发任务的数量。
1
2
3
4
5
6
7
8
9
10
use tokio::sync::Semaphore;
// 创建允许3个并发任务的信号量
let semaphore = Semaphore::new(3);
// 获取许可
let _permit = semaphore.acquire().await.unwrap();
// 释放许可(通过 Drop trait 自动完成)
drop(_permit);
虽然 JoinSet 和 Semaphore 各自都很强大,但它们的组合能够解决异步编程中的核心挑战:
单独使用 JoinSet 时,所有任务会尽可能并发执行,可能导致:
1
2
3
4
5
// ❌ 问题:可能创建过多并发任务
let mut tasks = JoinSet::new();
for i in 0..1000 {
tasks.spawn(process_request(i));
}
Semaphore 限制并发数,JoinSet 管理任务生命周期:
1
2
3
4
5
6
7
8
9
10
11
// ✅ 解决方案:控制并发数
let semaphore = Semaphore::new(10); // 最多10个并发任务
let mut tasks = JoinSet::new();
for i in 0..1000 {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
process_request(i).await
});
}
让我们通过一个实际场景来展示这个组合的强大之处:批量处理 API 请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use tokio::sync::{JoinSet, Semaphore};
use reqwest::Client;
use serde_json::Value;
#[derive(Debug)]
struct ApiResponse {
id: u32,
data: Value,
status: String,
}
async fn fetch_api_data(client: &Client, id: u32, url: &str) -> Result<ApiResponse, reqwest::Error> {
let response = client.get(url).send().await?;
let data: Value = response.json().await?;
Ok(ApiResponse {
id,
data,
status: "success".to_string(),
})
}
async fn batch_process_urls(urls: Vec<String>) -> Vec<ApiResponse> {
let client = Client::new();
// 控制并发数为5,避免过多请求
let semaphore = Semaphore::new(5);
let mut tasks = JoinSet::new();
// 启动所有任务
for (i, url) in urls.into_iter().enumerate() {
let semaphore = semaphore.clone();
let client = client.clone();
tasks.spawn(async move {
// 获取信号量许可
let _permit = semaphore.acquire().await.unwrap();
match fetch_api_data(&client, i as u32, &url).await {
Ok(response) => response,
Err(e) => ApiResponse {
id: i as u32,
data: Value::Null,
status: format!("error: {}", e),
}
}
});
}
// 收集所有结果
let mut results = Vec::new();
while let Some(result) = tasks.join_next().await {
match result {
Ok(response) => results.push(response),
Err(e) => eprintln!("任务执行错误: {:?}", e),
}
}
results
}
// 使用示例
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://api.example.com/data/1".to_string(),
"https://api.example.com/data/2".to_string(),
"https://api.example.com/data/3".to_string(),
// ... 更多 URL
];
let results = batch_process_urls(urls).await;
for result in results {
println!("处理完成: ID={}, 状态={}", result.id, result.status);
}
Ok(())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use tokio::fs;
use tokio::io::AsyncWriteExt;
use std::path::Path;
async fn process_file(input_path: &Path, output_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
// 模拟文件处理(读取、转换、写入)
let content = fs::read_to_string(input_path).await?;
// 模拟处理逻辑
let processed_content = content.to_uppercase();
fs::write(output_path, processed_content).await?;
Ok(())
}
async fn batch_process_files(input_dir: &str, output_dir: &str) -> Result<(), Box<dyn std::error::Error>> {
let entries = fs::read_dir(input_dir).await?;
let mut file_pairs = Vec::new();
// 收集所有需要处理的文件对
let mut entries = entries;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("txt") {
let file_name = path.file_name().unwrap().to_str().unwrap();
let output_path = format!("{}/{}", output_dir, file_name);
file_pairs.push((path, std::path::Path::new(&output_path)));
}
}
// 使用信号量控制并发文件处理数量
let semaphore = Semaphore::new(3); // 最多同时处理3个文件
let mut tasks = JoinSet::new();
for (input_path, output_path) in file_pairs {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
if let Err(e) = process_file(&input_path, &output_path).await {
eprintln!("处理文件 {:?} 失败: {}", input_path, e);
Err(e)
} else {
println!("成功处理文件: {:?}", input_path.file_name().unwrap());
Ok(())
}
});
}
// 等待所有任务完成
while let Some(result) = tasks.join_next().await {
if let Err(e) = result {
eprintln!("任务执行失败: {:?}", e);
}
}
Ok(())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 传统方法:所有任务立即启动
let mut tasks = Vec::new();
for i in 0..1000 {
tasks.push(tokio::spawn(process_item(i)));
}
// 问题:立即创建1000个任务,内存占用大
// JoinSet + Semaphore 方法
let semaphore = Semaphore::new(10);
let mut tasks = JoinSet::new();
for i in 0..1000 {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
process_item(i).await
});
}
// 优势:最多同时运行10个任务,内存使用受控
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let mut tasks = JoinSet::new();
let semaphore = Semaphore::new(5);
for item in items {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
// 即使某个任务失败,其他任务继续执行
match risky_operation(item).await {
Ok(result) => Some(result),
Err(e) => {
eprintln!("任务失败: {}", e);
None // 返回 None 表示任务失败但不中断其他任务
}
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async fn dynamic_task_management() {
let semaphore = Semaphore::new(5);
let mut tasks = JoinSet::new();
// 第一批任务
for i in 0..10 {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
process_batch_1(i).await
});
}
// 等待第一批完成一部分后,动态添加第二批
let mut completed = 0;
while let Some(result) = tasks.join_next().await {
completed += 1;
// 每完成5个任务,添加1个新任务
if completed % 5 == 0 {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
process_batch_2(completed).await
});
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ❌ 手动管理方式
let mut futures = Vec::new();
for item in items {
futures.push(Box::pin(process_item(item)));
}
// 需要手动实现轮询机制
while !futures.is_empty() {
for i in (0..futures.len()).rev() {
if let Some(mut f) = futures.remove(i) {
match f.as_mut().poll(cx) {
Poll::Ready(_) => { /* 处理结果 */ }
Poll::Pending => futures.insert(i, f), // 重新插入
}
}
}
tokio::task::yield_now().await;
}
// ✅ JoinSet 方式
let mut tasks = JoinSet::new();
for item in items {
tasks.spawn(process_item(item));
}
while let Some(result) = tasks.join_next().await {
// 自动处理结果
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ❌ 直接 spawn 的问题
for item in items {
tokio::spawn(async move {
// 没有并发控制
let result = process_item(item).await;
// 结果可能丢失或难以收集
});
}
// ✅ JoinSet + Semaphore
let semaphore = Semaphore::new(10);
let mut tasks = JoinSet::new();
for item in items {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
process_item(item).await
});
}
1
2
3
4
5
6
// 根据系统资源和任务特点调整
let semaphore = match task_type {
TaskType::IO密集 => Semaphore::new(num_cpus::get() * 2), // IO密集可以更多并发
TaskType::CPU密集 => Semaphore::new(num_cpus::get()), // CPU密集等于核心数
TaskType::网络请求 => Semaphore::new(50), // 网络请求根据服务限制
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async fn robust_task_with_semaphore(
semaphore: &Semaphore,
tasks: &mut JoinSet,
item: Item,
) {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(e) => {
eprintln!("获取信号量失败: {}", e);
return;
}
};
match process_item(item).await {
Ok(result) => Ok(result),
Err(e) => {
eprintln!("处理项目失败: {}", e);
Err(e) // JoinSet 可以捕获并处理错误
}
}
});
}
1
2
3
4
5
6
7
8
9
10
11
async fn task_with_cleanup() {
let semaphore = Semaphore::new(5);
let mut tasks = JoinSet::new();
// ... 添加任务 ...
// 优雅关闭:取消剩余任务
tasks.shutdown().await;
// 信号量会自动清理(通过 Drop trait)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async fn optimized_batch_processing<T>(
items: Vec<T>,
batch_size: usize,
max_concurrent: usize,
) -> Vec<Result<T::Output, Box<dyn std::error::Error>>>
where
T: Send + 'static,
T::Output: Send,
{
let semaphore = Semaphore::new(max_concurrent);
let mut tasks = JoinSet::new();
let mut results = Vec::new();
// 分批处理,减少频繁的任务创建开销
for chunk in items.chunks(batch_size) {
for item in chunk {
let semaphore = semaphore.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
process_item(item).await
});
}
// 处理当前批次的结果
while let Some(result) = tasks.join_next().await {
results.push(result);
}
}
results
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use reqwest::Client;
struct BatchProcessor {
client: Client,
semaphore: Semaphore,
}
impl BatchProcessor {
fn new(max_concurrent: usize) -> Self {
Self {
client: Client::new(),
semaphore: Semaphore::new(max_concurrent),
}
}
async fn process_batch(&self, urls: Vec<String>) -> Vec<Result<String, reqwest::Error>> {
let mut tasks = JoinSet::new();
for url in urls {
let semaphore = self.semaphore.clone();
let client = self.client.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let response = client.get(&url).send().await?;
let text = response.text().await?;
Ok(text)
});
}
let mut results = Vec::new();
while let Some(result) = tasks.join_next().await {
results.push(result);
}
results
}
}
JoinSet + Semaphore 组合代表了 Rust 异步编程的最佳实践,它解决了现代并发应用中的核心问题:
JoinSet 提供完善的任务生命周期管理这种组合特别适合以下场景:
掌握这个组合,将让你的 Rust 异步编程技能提升到一个新的水平。记住,好的并发控制不仅仅是让代码运行更快,更重要的是让代码更加稳定、可维护和可预测。