我试图让这段代码并行运行,而不是顺序运行,因为对等体的数量可能很大。我使用的是async_std 1.4和rust 1.41
pub struct Peer {
pub peer_id: String,
pub tcp_stream: Arc<TcpStream>,
pub public_key: [u8; 32],
}
async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) -> Result<()> {
for peer in peers.values() {
let mut stream = &*peer.tcp_stream;
stream.write_all(&bincode::serialize(&message)?).await?;
}
Ok(())
}我尝试过使用futures::future::join_all方法,因为包装我在async_std::task::spawn中创建和使用的未来需要一个静态生命周期。这是我尝试过的:
async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) {
let handles = peers.values().into_iter().map(|peer| {
task::spawn(
async {
let mut stream = &*peer.tcp_stream;
if let Err(err) = stream
.write_all(&bincode::serialize(&message).unwrap())
.await
{
error!("Error when writing to tcp_stream: {}", err);
}
}
)
});
futures::future::join_all(handles).await;
}我确信我遗漏了一些方法,谢谢你的帮助!
发布于 2020-02-02 23:05:56
由于您正在尝试并发发送消息,因此每个任务都必须有自己的消息副本:
use async_std::{task, net::TcpStream};
use futures::{future, io::AsyncWriteExt};
use serde::Serialize;
use std::{
collections::HashMap,
error::Error,
sync::Arc,
};
pub struct Peer {
pub peer_id: String,
pub tcp_stream: Arc<TcpStream>,
pub public_key: [u8; 32],
}
#[derive(Serialize)]
struct Protocol;
async fn send_to_all_peers(
message: Protocol,
peers: &HashMap<String, Peer>)
-> Result<(), Box<dyn Error>>
{
let msg = bincode::serialize(&message)?;
let handles = peers.values()
.map(|peer| {
let msg = msg.clone();
let socket = peer.tcp_stream.clone();
task::spawn(async move {
let mut socket = &*socket;
socket.write_all(&msg).await
})
});
future::try_join_all(handles).await?;
Ok(())
}发布于 2020-02-02 22:07:57
你试过像这样的东西吗?
let handles = peers.values().into_iter().map(|peer| {
let mut stream = &*peer.tcp_stream;
stream.write_all(&bincode::serialize(&message).unwrap())
}
let results = futures::future::join_all(handles).await请注意,.map闭包并不等待,而是直接返回一个未来,然后将其传递给join_all,然后等待。
https://stackoverflow.com/questions/60025114
复制相似问题