首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何共享tokio::net::TcpStream?

如何共享tokio::net::TcpStream?
EN

Stack Overflow用户
提问于 2022-03-05 20:40:05
回答 2查看 662关注 0票数 1

我需要在同一个TcpStream上发送和接收正常数据,同时定期发送心跳数据。在当前的实现中,我使用了Arc<Mutex<TcpStream>>,但是它编译时有错误:

代码语言:javascript
复制
use anyhow::Result;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
    let stream = Arc::new(Mutex::new(stream));

    let common_stream = stream.clone();
    let handler1 = tokio::spawn(async {
        loop {
            let mut stream = common_stream.lock().unwrap();
            let mut buf = [0u8; 10];
            stream.read_exact(&mut buf).await.unwrap();
            buf.reverse();
            stream.write(&buf).await.unwrap();
        }
    });

    let heartbeat_stream = stream.clone();
    let handler2 = tokio::spawn(async {
        loop {
            let mut stream = heartbeat_stream.lock().unwrap();
            stream.write_u8(1).await.unwrap();

            thread::sleep(Duration::from_millis(200));
        }
    });

    handler1.await?;
    handler2.await?;

    Ok(())
}
代码语言:javascript
复制
error: future cannot be sent between threads safely
   --> src\main.rs:14:20
    |
14  |     let handler1 = tokio::spawn(async {
    |                    ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:20:31
    |
16  |             let mut stream = common_stream.lock().unwrap();
    |                 ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
...
20  |             stream.write(&buf).await.unwrap();
    |                               ^^^^^^ await occurs here, with `mut stream` maybe used later
21  |         }
    |         - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src\main.rs:25:20
    |
25  |     let handler2 = tokio::spawn(async {
    |                    ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:28:31
    |
27  |             let mut stream = heartbeat_stream.lock().unwrap();
    |                 ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
28  |             stream.write_u8(1).await.unwrap();
    |                               ^^^^^^ await occurs here, with `mut stream` maybe used later
...
31  |         }
    |         - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

如何修正这些错误,或者是否有其他方法来实现同样的目标?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-03-05 21:05:54

下面是一个解决方案,它将流分成两个部分,用于读和写,以及循环中的to:

  • 等待心跳事件,并在发生这种情况时发送一个字节写入一半的流
  • 等待读取一半(10个字节)的数据,将其反转并再次写入一半。

而且,这不会产生线程,并且在当前没有锁的线程中可以很好地完成所有事情。

代码语言:javascript
复制
use anyhow::Result;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8888").await?;
    let (mut read, mut write) = stream.split();
    let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(200));
    let mut buf = [0u8; 10];

    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                write.write(&[1]).await?;
            }

            result = read.read_exact(&mut buf) => {
                let _bytes_read = result?;
                buf.reverse();
                write.write(&buf).await?;
            }
        }
    }
}
票数 2
EN

Stack Overflow用户

发布于 2022-03-05 20:59:21

代码中有几个错误,尽管其背后的想法几乎是好的。您应该尽可能使用异步中的任何可用工具。一些所需/期望的改变:

  • 使用tokio::time::sleep,因为它是异步的,否则调用将被阻塞。
  • 使用互斥锁的异步版本(例如,来自futures机箱的)
  • 使用某种通用错误处理(anyhow会有所帮助)
代码语言:javascript
复制
use futures::lock::Mutex;
use anyhow::Error;
use tokio::time::sleep;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
    let stream = Arc::new(Mutex::new(stream));

    let common_stream = stream.clone();
    let handler1 = tokio::spawn(async move {
        loop {
            let mut stream = common_stream.lock().await;
            let mut buf = [0u8; 10];
            stream.read_exact(&mut buf).await.unwrap();
            buf.reverse();
            stream.write(&buf).await.unwrap();
        }
    });

    let heartbeat_stream = stream.clone();
    let handler2 = tokio::spawn(async move {
        loop {
            let mut stream = heartbeat_stream.lock().await;
            stream.write_u8(1).await.unwrap();

            sleep(Duration::from_millis(200)).await;
        }
    });

    handler1.await?;
    handler2.await?;

    Ok(())
}

游乐场

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71365810

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档