来自rust std net库:
let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();
info!("Opened socket on localhost port {}", port);
// accept connections and process them serially
for stream in listener.incoming() {
break;
}
info!("closed socket");怎样才能让听者停止听呢?它在API中说明,当listener被删除时,它将停止。但是如果incoming()是一个阻塞调用,我们该如何删除它呢?最好没有像tokio/mio这样的外部板条箱。
发布于 2019-06-21 04:54:13
您需要使用set_nonblocking()方法将TcpListener置于非阻塞模式,如下所示:
use std::io;
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
listener.set_nonblocking(true).expect("Cannot set non-blocking");
for stream in listener.incoming() {
match stream {
Ok(s) => {
// do something with the TcpStream
handle_connection(s);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// Decide if we should exit
break;
// Decide if we should try to accept a connection again
continue;
}
Err(e) => panic!("encountered IO error: {}", e),
}
}传入的()调用将立即返回Result<>类型,而不是等待连接。如果Result为Ok(),则表示已建立连接,您可以对其进行处理。如果结果是Err(WouldBlock),这实际上不是一个错误,只是在incoming()检查套接字的时刻没有连接挂起。
请注意,在WouldBlock的情况下,您可能希望在继续之前放入一个睡眠()或其他东西,否则您的程序将快速轮询传入()函数以检查连接,从而导致高CPU使用率。
改编自here的代码示例
发布于 2019-12-25 07:42:06
标准库没有提供这样的API,但是您可以使用特定于平台的API来关闭对套接字的读取,这将导致incoming迭代器返回错误。然后,当收到错误时,您可以中断对连接的处理。例如,在Unix系统上:
use std::net::TcpListener;
use std::os::unix::io::AsRawFd;
use std::thread;
let listener = TcpListener::bind("localhost:0")?;
let fd = listener.as_raw_fd();
let handle = thread::spawn(move || {
for connection in listener.incoming() {
match connection {
Ok(connection) => /* handle connection */
Err(_) => break,
}
});
libc::shutdown(fd, libc::SHUT_RD);
handle.join();发布于 2020-02-22 15:32:03
您可以使用eventfd来poll您的套接字,它用于发送信号。我为此写了一个帮助器。
let shutdown = EventFd::new();
let listener = TcpListener::bind("0.0.0.0:12345")?;
let incoming = CancellableIncoming::new(&listener, &shutdown);
for stream in incoming {
// Your logic
}
// While in other thread
shutdown.add(1); // Light the shutdown signal, now your incoming loop exits gracefully.use nix;
use nix::poll::{poll, PollFd, PollFlags};
use nix::sys::eventfd::{eventfd, EfdFlags};
use nix::unistd::{close, write};
use std;
use std::net::{TcpListener, TcpStream};
use std::os::unix::io::{AsRawFd, RawFd};
pub struct EventFd {
fd: RawFd,
}
impl EventFd {
pub fn new() -> Self {
EventFd {
fd: eventfd(0, EfdFlags::empty()).unwrap(),
}
}
pub fn add(&self, v: i64) -> nix::Result<usize> {
let b = v.to_le_bytes();
write(self.fd, &b)
}
}
impl AsRawFd for EventFd {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
impl Drop for EventFd {
fn drop(&mut self) {
let _ = close(self.fd);
}
}
// -----
//
pub struct CancellableIncoming<'a> {
listener: &'a TcpListener,
eventfd: &'a EventFd,
}
impl<'a> CancellableIncoming<'a> {
pub fn new(listener: &'a TcpListener, eventfd: &'a EventFd) -> Self {
Self { listener, eventfd }
}
}
impl<'a> Iterator for CancellableIncoming<'a> {
type Item = std::io::Result<TcpStream>;
fn next(&mut self) -> Option<std::io::Result<TcpStream>> {
use nix::errno::Errno;
let fd = self.listener.as_raw_fd();
let evfd = self.eventfd.as_raw_fd();
let mut poll_fds = vec![
PollFd::new(fd, PollFlags::POLLIN),
PollFd::new(evfd, PollFlags::POLLIN),
];
loop {
match poll(&mut poll_fds, -1) {
Ok(_) => break,
Err(nix::Error::Sys(Errno::EINTR)) => continue,
_ => panic!("Error polling"),
}
}
if poll_fds[0].revents().unwrap() == PollFlags::POLLIN {
Some(self.listener.accept().map(|p| p.0))
} else if poll_fds[1].revents().unwrap() == PollFlags::POLLIN {
None
} else {
panic!("Can't be!");
}
}
}https://stackoverflow.com/questions/56692961
复制相似问题