首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在不使用通道的情况下从任务中访问主作用域可访问的hashmap。

在不使用通道的情况下从任务中访问主作用域可访问的hashmap。
EN

Stack Overflow用户
提问于 2022-10-03 19:34:38
回答 1查看 77关注 0票数 -1

我正在查看用于Rust的WebSocket框架,最后观看了一个关于Warp的教程视频(https://www.youtube.com/watch?v=fuiFycJpCBw),重新创建了该项目,然后将其与Warp自己的聊天服务器示例实现进行了比较。

我开始从每一种方法中挑选一些片段,并最终根据我的喜好修改了Warp自己的示例,然后开始故意添加一些错误,以查看它对代码的影响。

具体来说,我试图了解什么时候将执行哪个错误处理分支。

这些示例包含一个主作用域的hashmap,由用户id与其相应的传输通道之间的映射组成,因此迭代此hashmap将允许向每个连接的用户发送一条消息。

每个新连接都将通过users.write().await.insert(my_id, tx);插入一个新的映射,在断开连接时通过users.write().await.remove(&my_id);删除它。

为了创建一个发送错误,我要做的是不删除客户端断开连接上的用户映射。当一个新消息出现并且这个hashmap被迭代时,它仍然包含过时的条目,试图通过它发送一条消息,为send()尝试正确地分支到错误分支。

问题是,这个错误分支位于一个tokio::spawn块中,在其中我想发出这个users.write().await.remove(&my_id);调用,我从正常流中删除了这个调用。

我可能弄错了,但我认为这是不可能的,因为我没有办法让这个任务访问和修改这个hashmap。如果我正确地理解了这个问题,我应该创建一个额外的通道,这个任务可以用来将消息发送回主范围,以便它从hashmap中删除条目。

为此,我使用了一个额外的mpsc::unbounded_channel(),在它上,我从错误处理分支调用send方法,以发送删除请求消息。

但这也使得我还需要在通道的接收端设置一个await,这会导致一个问题,因为该分支已经阻塞了while let Some(result) = user_rx.next().await循环块,以便等待next()传入的WebSocket消息。

因此,我尝试添加一个tokio::select!块,在其中我将侦听新的WebSocket消息以及当任务遇到错误时从任务中发送的删除消息。这是可行的,我可以接收WebSocket消息以及来自新的“控制”通道的消息。

然而,这带来了一个新的问题:当客户端断开连接时,我希望tokio::select!块在ws_rx.next() ( WebSocket接收套接字)上触发一个错误或什么东西,这是tokio::select!块中的分支之一。这将允许我将该连接视为断开连接,并将客户端从hashmap中删除。

以前,如果没有tokio::select!块,一旦客户端断开连接,while let Some(result) = ws_rx.next().await就会立即终止,而不会引发错误。

我还尝试的不是使用额外的通道来发送请求消息,而是调用drop(ws_tx),这是不起作用的。问题的核心是我希望能够从任务中操作hashmap。

我现在添加代码,这些代码可以复制粘贴到一个新项目中。它包含两个变体,一个带有tokio::select!块,另一个带有while let Some(result) = user_rx.next().await块,可以通过设置if true { /*select*/ } else { /*while let*/ }中的布尔值来选择它们。

您要检查的两个问题:

当使用

  1. 块时,注释掉最后一行users.write().await.remove(&current_id);以触发发送错误。
  2. 在使用tokio::select!块时,注意到select不会触发ws_rx.next()分支上的断开连接,因此不会到达底部的while let

我想要做的是不对通道使用tokio::select!,而是将其保留在更简单的while let-variant中,并从tokio::task::spawn代码中修改用户hashmap。

显然,我可以在那里使用hashmap,但是我不能在主范围内继续使用它。

这是包含问题的代码,main.rs:

代码语言:javascript
复制
//###########################################################################
use std::collections::HashMap;
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use env_logger::Env;
use futures::{SinkExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use warp::Filter;
use colored::Colorize;
//###########################################################################


//###########################################################################
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
//###########################################################################


//###########################################################################
#[tokio::main]
async fn main() {    
  env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
  let users = Users::default();
  let users = warp::any().map(move || users.clone());
  let websocket = warp::path("ws")
    .and(warp::ws())
    .and(users)
    .map(|ws: warp::ws::Ws, users| {
        ws.on_upgrade(move |socket| connect(socket, users))
    });
  let files = warp::fs::dir("./static");
  let port = 8186;
  println!("running server at 0.0.0.0:{}", port.to_string().yellow());
  warp::serve(files.or(websocket)).run(([0, 0, 0, 0], port)).await;
}
//###########################################################################


//###########################################################################
async fn connect(ws: WebSocket, users: Users) {
  let current_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
  println!("user {} connected", current_id.to_string().green());
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  let (mut ws_tx, mut ws_rx) = ws.split();
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  let (mpsc_tx, mpsc_rx) = mpsc::unbounded_channel(); // For passing WS messages between tasks
  let mut mpsc_stream_rx = UnboundedReceiverStream::new(mpsc_rx);
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  let (mpsc_tx_2, mpsc_rx_2) = mpsc::unbounded_channel(); // For sending `remove-request` messages
  let mut mpsc_stream_rx_2: UnboundedReceiverStream<(String, usize)> = UnboundedReceiverStream::new(mpsc_rx_2);
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  tokio::task::spawn(async move {
    while let Some(message) = mpsc_stream_rx.next().await {
      //----------------------------------------------------------------
      match ws_tx.send(message).await {
        Ok(_) => {
          // println!("websocket send success (current_id={})", current_id);
        },
        Err(e) => {
          eprintln!("=============================================================");
          eprintln!("websocket send error (current_id={}): {}", current_id, e);
          eprintln!("=============================================================");
          mpsc_tx_2.send(("remove-user".to_string(), current_id)).expect("unable to send remove-user message");
          break;
        }
      };
      //----------------------------------------------------------------
    };
    // NOTE: Problem here: cannot use "users"
    // users.write().await.remove(&current_id);
    // eprintln!("websocket send task ended (current_id={})", current_id);
    // eprintln!("=============================================================");
  });
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  users.write().await.insert(current_id, mpsc_tx);
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  if false { // <------------------ TOGGLE THIS
    loop {
      tokio::select! {
        Some(result) = ws_rx.next() => {
          //------------------------------------------------------------------
          let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
              eprintln!("=============================================================");
              eprintln!("websocket receive error(current_id={}): {}", current_id, e);
              eprintln!("=============================================================");
              break;
            }
          };
          //------------------------------------------------------------------
          if let Ok(text) = msg.to_str() {
            //----------------------------------------------------------------
            println!("got message '{}' from user {}", text, current_id);
            let new_msg = Message::text(format!("user {}: {}", current_id, text));
            //----------------------------------------------------------------
            let mut remove = Vec::new();
            for (&uid, mpsc_tx) in users.read().await.iter() {
              if current_id != uid {
                println!(" -> forwarding message '{}' to channel of user {}", text, uid);
                if let Err(e) = mpsc_tx.send(new_msg.clone()) {
                  eprintln!("=============================================================");
                  eprintln!("websocket channel error (current_id={}, uid={}): {}", current_id, uid.clone(), e);
                  eprintln!("=============================================================");
                  remove.push(uid);
                }
              }
            }
            //----------------------------------------------------------------
            if remove.len() > 0 {
              for uid in remove {
                eprintln!("removing from users (uid={})", uid);
                eprintln!("=============================================================");
                users.write().await.remove(&uid);
              }
            }
            //----------------------------------------------------------------
          };
          //------------------------------------------------------------------
        }

        Some(result) = mpsc_stream_rx_2.next() => {
          let (command, uid) = result;
          if command == "remove-user" {
            eprintln!("=============================================================");
            eprintln!("removing user {}", uid);
            eprintln!("=============================================================");
            users.write().await.remove(&uid);
          }
          else {
            eprintln!("=============================================================");
            eprintln!("unknown command {}", command);
            eprintln!("=============================================================");
          }
          break;
        }
        else => break
      }
    }
  }
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  else {
    while let Some(result) = ws_rx.next().await {
      //------------------------------------------------------------------
      let msg = match result {
        Ok(msg) => msg,
        Err(e) => {
          eprintln!("=============================================================");
          eprintln!("websocket receive error(current_id={}): {}", current_id, e);
          eprintln!("=============================================================");
          break;
        }
      };
      //------------------------------------------------------------------
      if let Ok(text) = msg.to_str() {
        //----------------------------------------------------------------
        println!("got message '{}' from user {}", text, current_id);
        let new_msg = Message::text(format!("user {}: {}", current_id, text));
        //----------------------------------------------------------------
        let mut remove = Vec::new();
        for (&uid, mpsc_tx) in users.read().await.iter() {
          if current_id != uid {
            println!(" -> forwarding message '{}' to channel of user {}", text, uid);
            if let Err(e) = mpsc_tx.send(new_msg.clone()) {
              eprintln!("=============================================================");
              eprintln!("websocket channel error (current_id={}, uid={}): {}", current_id, uid.clone(), e);
              eprintln!("=============================================================");
              remove.push(uid);
            }
          }
        }
        //----------------------------------------------------------------
        if remove.len() > 0 {
          for uid in remove {
            eprintln!("removing from users (uid={})", uid);
            eprintln!("=============================================================");
            users.write().await.remove(&uid);
          }
        }
        //----------------------------------------------------------------
      };
      //------------------------------------------------------------------
    }
  }
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  println!("user {} disconnected", current_id.to_string().red());
  users.write().await.remove(&current_id);
  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
}
//###########################################################################

此代码的源代码主要基于以下文件:

https://github.com/seanmonstar/warp/blob/master/examples/websockets_chat.rs

https://github.com/ddprrt/warp-websockets-example/blob/main/src/main.rs

这是Cargo.toml文件内容:

代码语言:javascript
复制
[package]
name = "websocket-3"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3.3"
tokio-stream = "0.1.10"
futures = "0.3.24"
env_logger = "0.9.1"
colored = "2"

这是驻留在静态/目录中的index.html文件:

代码语言:javascript
复制
<!DOCTYPE html>
<html lang="en">
    <head>
        <style>
          html, body {
            color: rgba(128, 128, 128);
            background-color: rgb(32, 32, 32);
          }
        </style>
        <title>Warp Websocket 3 8186 Chat</title>
    </head>
    <body>
        <h1>Warp Websocket 3 8186 Chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="text" />
        <button type="button" id="send">Send</button>
        <script type="text/javascript">
        const chat = document.getElementById('chat');
        const text = document.getElementById('text');
        const uri = 'ws://' + location.host + '/ws';
        const ws = new WebSocket(uri);
        function message(data) {
            const line = document.createElement('p');
            line.innerText = data;
            chat.appendChild(line);
        }
        ws.onopen = function() {
            chat.innerHTML = '<p><em>Connected!</em></p>';
        };
        ws.onmessage = function(msg) {
            message(msg.data);
        };
        ws.onclose = function() {
            chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
        };
        send.onclick = function() {
            const msg = text.value;
            ws.send(msg);
            text.value = '';
            message('you: ' + msg);
        };
        </script>
    </body>
</html>```
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-03 20:11:29

老实说,我没读过你的全部问题。有点太长了。

无论哪种方式。我飞过它,偶然发现了这个:

显然可以在那里使用hashmap,但是我不能在主作用域中继续使用它。

这是错误的。只有当您将HashMap本身移动到闭包中时,才是正确的。

对于move ||闭包,Arcs的工作方式有点不同:您必须克隆它们,然后将克隆移到:

代码语言:javascript
复制
async fn connect(ws: WebSocket, users: Users) {
    // .. some code ..
    tokio::task::spawn({
        let users = Arc::clone(&users);
        async move {
            // `users` in here is the cloned one,
            // the original one still exists
        }
    });
    // `users` can still be used here
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73940037

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档