首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将未来转化为溪流?

如何将未来转化为溪流?
EN

Stack Overflow用户
提问于 2020-01-30 09:31:54
回答 2查看 2.4K关注 0票数 5

我正在尝试使用async_std从网络接收UDP数据报。

有一个实现UdpSocketasync recv_from,这个方法返回一个未来,但是我需要一个async_std::stream::Stream来提供一个UDP数据报流,因为它是一个更好的抽象。

我已经找到了完全满足我需要的tokio::net::UdpFramed,但它在目前的tokio版本中是不可用的。

一般来说,问题是如何将Future从给定的异步函数转换为Stream

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-01-30 20:03:27

对于单个项目,请使用FutureExt::into_stream

代码语言:javascript
复制
use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

对于关闭所产生的多个期货的流,请使用stream::unfold

代码语言:javascript
复制
use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}

在您的例子中,您可以使用stream::unfold

代码语言:javascript
复制
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}
票数 7
EN

Stack Overflow用户

发布于 2020-01-30 20:10:37

一般来说,

的问题是如何将Futures从给定的异步函数转换为Stream

FutureExt::into_stream,但是不要让这个名字愚弄你,它不适合你的情况。

有一个实现异步recv_fromasync_std::stream::Stream,这个方法返回一个未来,但是我需要一个async_std::stream::Stream来给出一个UDP数据报流,因为它是一个更好的抽象。

这不一定是一个更好的抽象概念。

具体来说,async-stdUdpSocket::recv_from返回一个输出类型为(usize, SocketAddr)的未来--接收到的数据的大小和对等地址。如果要使用into_stream将其转换为流,则只会给出接收到的数据,而不是接收到的数据。

,我已经找到了完全满足我需要的tokio::net::UdpFramed,但是它在目前的tokio版本中是不可用的。

它已经搬到tokio-util板条箱里了。不幸的是,你也不能(很容易)使用它。它需要一个tokio::net::UdpSocket,这与async_std::net::UdpSocket不一样。

当然,您可以使用期货实用程序函数(如futures::stream::poll_fnfutures::stream::unfold )来为UdpSocket::recv_from提供futures::stream::Stream外观,但是您将如何处理这些功能呢?如果最后调用StreamExt::next来轮询它的值,则可以直接使用recv_from

只有当您正在使用的API需要Stream输入(如rusoto )时,才有必要使用rusoto

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59982803

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档