我正在查看用于Rust的WebSocket框架,最后观看了一个关于Warp的教程视频(https://www.youtube.com/watch?v=fuiFycJpCBw),重新创建了该项目,然后将其与Warp自己的聊天服务器示例实现进行了比较。
我开始从每一种方法中挑选一些片段,并最终根据我的喜好修改了Warp自己的示例,然后开始故意添加一些错误,以查看它对代码的影响。
具体来说,我试图了解什么时候将执行哪个错误处理分支。
这些示例包含一个主作用域的hashmap,由用户id与其相应的传输通道之间的映射组成,因此迭代此hashmap将允许向每个连接的用户发送一条消息。
每个新连接都将通过users.write().await.insert(my_id, tx);插入一个新的映射,在断开连接时通过users.write().await.remove(&my_id);删除它。
为了创建一个发送错误,我要做的是不删除客户端断开连接上的用户映射。当一个新消息出现并且这个hashmap被迭代时,它仍然包含过时的条目,试图通过它发送一条消息,为send()尝试正确地分支到错误分支。
问题是,这个错误分支位于一个tokio::spawn块中,在其中我想发出这个users.write().await.remove(&my_id);调用,我从正常流中删除了这个调用。
我可能弄错了,但我认为这是不可能的,因为我没有办法让这个任务访问和修改这个hashmap。如果我正确地理解了这个问题,我应该创建一个额外的通道,这个任务可以用来将消息发送回主范围,以便它从hashmap中删除条目。
为此,我使用了一个额外的mpsc::unbounded_channel(),在它上,我从错误处理分支调用send方法,以发送删除请求消息。
但这也使得我还需要在通道的接收端设置一个await,这会导致一个问题,因为该分支已经阻塞了while let Some(result) = user_rx.next().await循环块,以便等待next()传入的WebSocket消息。
因此,我尝试添加一个tokio::select!块,在其中我将侦听新的WebSocket消息以及当任务遇到错误时从任务中发送的删除消息。这是可行的,我可以接收WebSocket消息以及来自新的“控制”通道的消息。
然而,这带来了一个新的问题:当客户端断开连接时,我希望tokio::select!块在ws_rx.next() ( WebSocket接收套接字)上触发一个错误或什么东西,这是tokio::select!块中的分支之一。这将允许我将该连接视为断开连接,并将客户端从hashmap中删除。
以前,如果没有tokio::select!块,一旦客户端断开连接,while let Some(result) = ws_rx.next().await就会立即终止,而不会引发错误。
我还尝试的不是使用额外的通道来发送请求消息,而是调用drop(ws_tx),这是不起作用的。问题的核心是我希望能够从任务中操作hashmap。
我现在添加代码,这些代码可以复制粘贴到一个新项目中。它包含两个变体,一个带有tokio::select!块,另一个带有while let Some(result) = user_rx.next().await块,可以通过设置if true { /*select*/ } else { /*while let*/ }中的布尔值来选择它们。
您要检查的两个问题:
当使用
users.write().await.remove(¤t_id);以触发发送错误。tokio::select!块时,注意到select不会触发ws_rx.next()分支上的断开连接,因此不会到达底部的while let我想要做的是不对通道使用tokio::select!,而是将其保留在更简单的while let-variant中,并从tokio::task::spawn代码中修改用户hashmap。
显然,我可以在那里使用hashmap,但是我不能在主范围内继续使用它。
这是包含问题的代码,main.rs:
//###########################################################################
use std::collections::HashMap;
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use env_logger::Env;
use futures::{SinkExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use warp::Filter;
use colored::Colorize;
//###########################################################################
//###########################################################################
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
//###########################################################################
//###########################################################################
#[tokio::main]
async fn main() {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let users = Users::default();
let users = warp::any().map(move || users.clone());
let websocket = warp::path("ws")
.and(warp::ws())
.and(users)
.map(|ws: warp::ws::Ws, users| {
ws.on_upgrade(move |socket| connect(socket, users))
});
let files = warp::fs::dir("./static");
let port = 8186;
println!("running server at 0.0.0.0:{}", port.to_string().yellow());
warp::serve(files.or(websocket)).run(([0, 0, 0, 0], port)).await;
}
//###########################################################################
//###########################################################################
async fn connect(ws: WebSocket, users: Users) {
let current_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
println!("user {} connected", current_id.to_string().green());
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
let (mut ws_tx, mut ws_rx) = ws.split();
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
let (mpsc_tx, mpsc_rx) = mpsc::unbounded_channel(); // For passing WS messages between tasks
let mut mpsc_stream_rx = UnboundedReceiverStream::new(mpsc_rx);
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
let (mpsc_tx_2, mpsc_rx_2) = mpsc::unbounded_channel(); // For sending `remove-request` messages
let mut mpsc_stream_rx_2: UnboundedReceiverStream<(String, usize)> = UnboundedReceiverStream::new(mpsc_rx_2);
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
tokio::task::spawn(async move {
while let Some(message) = mpsc_stream_rx.next().await {
//----------------------------------------------------------------
match ws_tx.send(message).await {
Ok(_) => {
// println!("websocket send success (current_id={})", current_id);
},
Err(e) => {
eprintln!("=============================================================");
eprintln!("websocket send error (current_id={}): {}", current_id, e);
eprintln!("=============================================================");
mpsc_tx_2.send(("remove-user".to_string(), current_id)).expect("unable to send remove-user message");
break;
}
};
//----------------------------------------------------------------
};
// NOTE: Problem here: cannot use "users"
// users.write().await.remove(¤t_id);
// eprintln!("websocket send task ended (current_id={})", current_id);
// eprintln!("=============================================================");
});
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
users.write().await.insert(current_id, mpsc_tx);
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if false { // <------------------ TOGGLE THIS
loop {
tokio::select! {
Some(result) = ws_rx.next() => {
//------------------------------------------------------------------
let msg = match result {
Ok(msg) => msg,
Err(e) => {
eprintln!("=============================================================");
eprintln!("websocket receive error(current_id={}): {}", current_id, e);
eprintln!("=============================================================");
break;
}
};
//------------------------------------------------------------------
if let Ok(text) = msg.to_str() {
//----------------------------------------------------------------
println!("got message '{}' from user {}", text, current_id);
let new_msg = Message::text(format!("user {}: {}", current_id, text));
//----------------------------------------------------------------
let mut remove = Vec::new();
for (&uid, mpsc_tx) in users.read().await.iter() {
if current_id != uid {
println!(" -> forwarding message '{}' to channel of user {}", text, uid);
if let Err(e) = mpsc_tx.send(new_msg.clone()) {
eprintln!("=============================================================");
eprintln!("websocket channel error (current_id={}, uid={}): {}", current_id, uid.clone(), e);
eprintln!("=============================================================");
remove.push(uid);
}
}
}
//----------------------------------------------------------------
if remove.len() > 0 {
for uid in remove {
eprintln!("removing from users (uid={})", uid);
eprintln!("=============================================================");
users.write().await.remove(&uid);
}
}
//----------------------------------------------------------------
};
//------------------------------------------------------------------
}
Some(result) = mpsc_stream_rx_2.next() => {
let (command, uid) = result;
if command == "remove-user" {
eprintln!("=============================================================");
eprintln!("removing user {}", uid);
eprintln!("=============================================================");
users.write().await.remove(&uid);
}
else {
eprintln!("=============================================================");
eprintln!("unknown command {}", command);
eprintln!("=============================================================");
}
break;
}
else => break
}
}
}
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
else {
while let Some(result) = ws_rx.next().await {
//------------------------------------------------------------------
let msg = match result {
Ok(msg) => msg,
Err(e) => {
eprintln!("=============================================================");
eprintln!("websocket receive error(current_id={}): {}", current_id, e);
eprintln!("=============================================================");
break;
}
};
//------------------------------------------------------------------
if let Ok(text) = msg.to_str() {
//----------------------------------------------------------------
println!("got message '{}' from user {}", text, current_id);
let new_msg = Message::text(format!("user {}: {}", current_id, text));
//----------------------------------------------------------------
let mut remove = Vec::new();
for (&uid, mpsc_tx) in users.read().await.iter() {
if current_id != uid {
println!(" -> forwarding message '{}' to channel of user {}", text, uid);
if let Err(e) = mpsc_tx.send(new_msg.clone()) {
eprintln!("=============================================================");
eprintln!("websocket channel error (current_id={}, uid={}): {}", current_id, uid.clone(), e);
eprintln!("=============================================================");
remove.push(uid);
}
}
}
//----------------------------------------------------------------
if remove.len() > 0 {
for uid in remove {
eprintln!("removing from users (uid={})", uid);
eprintln!("=============================================================");
users.write().await.remove(&uid);
}
}
//----------------------------------------------------------------
};
//------------------------------------------------------------------
}
}
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
println!("user {} disconnected", current_id.to_string().red());
users.write().await.remove(¤t_id);
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
}
//###########################################################################此代码的源代码主要基于以下文件:
https://github.com/seanmonstar/warp/blob/master/examples/websockets_chat.rs
https://github.com/ddprrt/warp-websockets-example/blob/main/src/main.rs
这是Cargo.toml文件内容:
[package]
name = "websocket-3"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3.3"
tokio-stream = "0.1.10"
futures = "0.3.24"
env_logger = "0.9.1"
colored = "2"这是驻留在静态/目录中的index.html文件:
<!DOCTYPE html>
<html lang="en">
<head>
<style>
html, body {
color: rgba(128, 128, 128);
background-color: rgb(32, 32, 32);
}
</style>
<title>Warp Websocket 3 8186 Chat</title>
</head>
<body>
<h1>Warp Websocket 3 8186 Chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
const chat = document.getElementById('chat');
const text = document.getElementById('text');
const uri = 'ws://' + location.host + '/ws';
const ws = new WebSocket(uri);
function message(data) {
const line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
message(msg.data);
};
ws.onclose = function() {
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
};
send.onclick = function() {
const msg = text.value;
ws.send(msg);
text.value = '';
message('you: ' + msg);
};
</script>
</body>
</html>```发布于 2022-10-03 20:11:29
老实说,我没读过你的全部问题。有点太长了。
无论哪种方式。我飞过它,偶然发现了这个:
显然可以在那里使用hashmap,但是我不能在主作用域中继续使用它。
这是错误的。只有当您将HashMap本身移动到闭包中时,才是正确的。
对于move ||闭包,Arcs的工作方式有点不同:您必须克隆它们,然后将克隆移到:
async fn connect(ws: WebSocket, users: Users) {
// .. some code ..
tokio::task::spawn({
let users = Arc::clone(&users);
async move {
// `users` in here is the cloned one,
// the original one still exists
}
});
// `users` can still be used here
}https://stackoverflow.com/questions/73940037
复制相似问题