我正在尝试使用async_std从网络接收UDP数据报。
有一个实现UdpSocket的async recv_from,这个方法返回一个未来,但是我需要一个async_std::stream::Stream来提供一个UDP数据报流,因为它是一个更好的抽象。
我已经找到了完全满足我需要的tokio::net::UdpFramed,但它在目前的tokio版本中是不可用的。
一般来说,问题是如何将Future从给定的异步函数转换为Stream。
发布于 2020-01-30 20:03:27
对于单个项目,请使用FutureExt::into_stream
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
inner().into_stream()
}
async fn inner() -> i32 {
42
}对于关闭所产生的多个期货的流,请使用stream::unfold
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
stream::unfold((), |()| async { Some((inner().await, ())) })
}
async fn inner() -> i32 {
42
}在您的例子中,您可以使用stream::unfold
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1
fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
stream::unfold(s, |s| {
async {
let data = read_one(&s).await;
Some((data, s))
}
})
}
async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
let mut data = vec![0; 1024];
let (len, _) = s.recv_from(&mut data).await?;
data.truncate(len);
Ok(data)
}
#[async_std::main]
async fn main() -> io::Result<()> {
let s = UdpSocket::bind("0.0.0.0:9876").await?;
read_many(s)
.for_each(|d| {
async {
match d {
Ok(d) => match std::str::from_utf8(&d) {
Ok(s) => println!("{}", s),
Err(_) => println!("{:x?}", d),
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
Ok(())
}发布于 2020-01-30 20:10:37
一般来说,
的问题是如何将
Futures从给定的异步函数转换为Stream?
有FutureExt::into_stream,但是不要让这个名字愚弄你,它不适合你的情况。
有一个实现异步
recv_from的async_std::stream::Stream,这个方法返回一个未来,但是我需要一个async_std::stream::Stream来给出一个UDP数据报流,因为它是一个更好的抽象。
这不一定是一个更好的抽象概念。
具体来说,async-std的UdpSocket::recv_from返回一个输出类型为(usize, SocketAddr)的未来--接收到的数据的大小和对等地址。如果要使用into_stream将其转换为流,则只会给出接收到的数据,而不是接收到的数据。
,我已经找到了完全满足我需要的
tokio::net::UdpFramed,但是它在目前的tokio版本中是不可用的。
它已经搬到tokio-util板条箱里了。不幸的是,你也不能(很容易)使用它。它需要一个tokio::net::UdpSocket,这与async_std::net::UdpSocket不一样。
当然,您可以使用期货实用程序函数(如futures::stream::poll_fn或futures::stream::unfold )来为UdpSocket::recv_from提供futures::stream::Stream外观,但是您将如何处理这些功能呢?如果最后调用StreamExt::next来轮询它的值,则可以直接使用recv_from。
只有当您正在使用的API需要Stream输入(如rusoto )时,才有必要使用rusoto。
https://stackoverflow.com/questions/59982803
复制相似问题