首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何同时运行包含借用TcpStream的期货?

如何同时运行包含借用TcpStream的期货?
EN

Stack Overflow用户
提问于 2020-02-02 18:18:48
回答 2查看 266关注 0票数 1

我试图让这段代码并行运行,而不是顺序运行,因为对等体的数量可能很大。我使用的是async_std 1.4和rust 1.41

代码语言:javascript
复制
pub struct Peer {
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],
}

async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) -> Result<()> {
    for peer in peers.values() {
        let mut stream = &*peer.tcp_stream;
        stream.write_all(&bincode::serialize(&message)?).await?;
    }
    Ok(())
}

我尝试过使用futures::future::join_all方法,因为包装我在async_std::task::spawn中创建和使用的未来需要一个静态生命周期。这是我尝试过的:

代码语言:javascript
复制
async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) {
    let handles = peers.values().into_iter().map(|peer| {
        task::spawn(
            async {
                let mut stream = &*peer.tcp_stream;
                if let Err(err) = stream
                    .write_all(&bincode::serialize(&message).unwrap())
                    .await
                {
                    error!("Error when writing to tcp_stream: {}", err);
                }
            }
        )
    });
    futures::future::join_all(handles).await;
}

我确信我遗漏了一些方法,谢谢你的帮助!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-02-02 23:05:56

由于您正在尝试并发发送消息,因此每个任务都必须有自己的消息副本:

代码语言:javascript
复制
use async_std::{task, net::TcpStream};
use futures::{future, io::AsyncWriteExt};
use serde::Serialize;
use std::{
    collections::HashMap,
    error::Error,
    sync::Arc,
};

pub struct Peer {
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],
}

#[derive(Serialize)]
struct Protocol;

async fn send_to_all_peers(
    message: Protocol,
    peers: &HashMap<String, Peer>)
    -> Result<(), Box<dyn Error>>
{
    let msg = bincode::serialize(&message)?;
    let handles = peers.values()
        .map(|peer| {
            let msg = msg.clone();
            let socket = peer.tcp_stream.clone();
            task::spawn(async move {
                let mut socket = &*socket;
                socket.write_all(&msg).await
            })
        });

    future::try_join_all(handles).await?;
    Ok(())
}
票数 1
EN

Stack Overflow用户

发布于 2020-02-02 22:07:57

你试过像这样的东西吗?

代码语言:javascript
复制
let handles = peers.values().into_iter().map(|peer| {
   let mut stream = &*peer.tcp_stream;
   stream.write_all(&bincode::serialize(&message).unwrap())
}
let results = futures::future::join_all(handles).await

请注意,.map闭包并不等待,而是直接返回一个未来,然后将其传递给join_all,然后等待。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60025114

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档