我正在尝试使用FuturesOrdered发送并行异步Rusoto SQS请求
use futures::prelude::*; // 0.1.26
use futures::stream::futures_unordered::FuturesUnordered;
use rusoto_core::{Region, HttpClient}; // 0.38.0
use rusoto_credential::EnvironmentProvider; // 0.17.0
use rusoto_sqs::{SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs, SqsClient}; // 0.38.0
fn main() {
let client = SqsClient::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
Region::UsWest2,
);
let messages: Vec<u32> = (1..12).map(|n| n).collect();
let chunks: Vec<_> = messages.chunks(10).collect();
let tasks: FuturesUnordered<_> = chunks.into_iter().map(|c| {
let batch = create_batch(c);
client.send_message_batch(batch)
}).collect();
let tasks = tasks
.for_each(|t| {
println!("{:?}", t);
Ok(())
})
.map_err(|e| println!("{}", e));
tokio::run(tasks);
}
fn create_batch(ids: &[u32]) -> SendMessageBatchRequest {
let queue_url = "https://sqs.us-west-2.amazonaws.com/xxx/xxx".to_string();
let entries = ids
.iter()
.map(|id| SendMessageBatchRequestEntry {
id: id.to_string(),
message_body: id.to_string(),
..Default::default()
})
.collect();
SendMessageBatchRequest {
entries,
queue_url,
}
}任务正确完成,但tokio::run(tasks)没有停止。我假设这是因为tasks.for_each()会迫使它继续运行并寻找更多的未来?
为什么tokio::run(tasks)不停下来?我是否正确地使用了FuturesOrdered?
当创建多达60,000个要完成的未来并将它们推送到FuturesUnordered组合器中时,我还有点担心内存使用。
发布于 2019-05-12 15:12:30
我发现是主函数中的SqsClient导致它阻塞,因为即使任务已经完成,它仍然在做一些家务活。
其中一位Rusoto人员提供的解决方案是在tokio::run上方添加以下内容
std::mem::drop(client);https://stackoverflow.com/questions/56022266
复制相似问题