我试图编写一个简单的tcp服务器来读取和广播消息。
我在使用Tokio,但我认为这更像是一个普遍的锈蚀问题。
我有一个有着共同状态的弧形:
let state = Arc::new(Mutex::new(Shared::new(server_tx)));
稍后,我想生成两个线程,它们将使用对该状态的引用:
let server = listener.incoming().for_each(move |socket| {
// error[E0382]: capture of moved value: `state`
process(socket, state.clone());
Ok(())
}).map_err(|err| {
println!("accept error = {:?}", err);
});
let receive_sensor_messages = sensors_rx.for_each(move |line| {
println!("Received sensor message, broadcasting: {:?}", line);
// error[E0597]: borrowed value does not live long enough
// error[E0507]: cannot move out of borrowed content
for (_, tx) in state.clone().lock().unwrap().clients {
tx.unbounded_send(line.clone()).unwrap();
}
Ok(())
}).map_err(|err| {
println!("line reading error = {:?}", err);
});(游乐场)
据我所知,它试图告诉我的是,state是在第一个闭包listener.incoming().for_each(move |socket| {中借用的,所以当我试图在sensors_rx.for_each(move |line| {中再次使用它时,它是说它是不可能的。
我的问题是如何解决这个问题?难道Arc不应该解决线程间共享变量的问题吗?我尝试了不同的clone组合(在闭包之外执行克隆,然后再在内部执行clone ),但都没有工作。
干杯!
发布于 2018-10-29 12:47:52
本质上,您的问题可以简化为对下列妇女、儿童和儿童事务部:
use std::sync::{Arc, Mutex};
struct Bar;
fn foo(_ : &Bar){
println!("foo called");
}
fn main(){
let example = Arc::new(Mutex::new(Bar));
std::thread::spawn(move ||{
let _ = example.clone();
});
// --- (1) ---
std::thread::spawn(move ||{
foo(&example.clone().lock().unwrap());
});
}现在,这里的第一个问题是移动了example。也就是说,一旦我们跨越了(1),最初的example就被认为是从。相反,我们首先需要clone,然后是move
let example = Arc::new(Mutex::new(Bar));
let local_state = example.clone();
std::thread::spawn(move ||{
let _ = local_state; // now fine!
});另一个错误源于短暂的Arc。本质上,它只存在足够长的时间让我们在底层的lock上使用Mutex。虽然我们知道至少还有一个指向内存的Arc,但编译器无法证明这一点。但是,如果我们去掉了clone(),就可以了:
let local_state = example.clone();
std::thread::spawn(move ||{
foo(&local_state.lock().unwrap());
});但是,您还可以通过使用容器的内容( clients)来循环容器。相反,在那里使用&,例如&local_state().unwrap().clients)。
您可以在或者在操场上下面找到完整的固定代码
use std::sync::{Arc, Mutex};
struct Bar;
fn foo(_ : &Bar){
println!("foo called");
}
fn main(){
let example = Arc::new(Mutex::new(Bar));
let local_state = example.clone();
std::thread::spawn(move ||{
let _ = local_state;
});
let local_state = example.clone();
std::thread::spawn(move ||{
foo(&local_state.lock().unwrap());
}).join();
}发布于 2018-10-29 12:41:46
对于每一个闭包,您都必须提供自己的Arc,因此您必须事先clone您的Arc。
let state = Arc::new(Mutex::new(Shared::new(server_tx)));
let state1 = Arc::clone(&state);
let state2 = Arc::clone(&state);
let server = listener.incoming().for_each(move |socket| {
process(socket, state1.clone());
Ok(())
});
let receive_sensor_messages = sensors_rx.for_each(move |line| {
println!("Received sensor message, broadcasting: {:?}", line);
let shared = state2.lock().unwrap();
for (_, tx) in &shared.clients { // better: `for tx in shared.clients.values()`
tx.unbounded_send(line.clone()).unwrap();
}
Ok(())
});您可以在这里省略state1,但我发现这样做更干净。
这样做的原因是,您将值state移动到第一个闭包中,所以不能在第二个闭包中使用它,因为它已经被移动了(这很有意义,不是吗?)
https://stackoverflow.com/questions/53045522
复制相似问题