首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将futures_io::AsyncRead转换为rusoto::ByteStream?

如何将futures_io::AsyncRead转换为rusoto::ByteStream?
EN

Stack Overflow用户
提问于 2020-06-10 03:08:59
回答 1查看 461关注 0票数 3

我正在尝试建立一个服务,从SFTP服务器拉出文件,并上传到S3。

对于SFTP部分,我使用了async-ssh2,它为我提供了一个实现futures::AsyncRead的文件处理程序。由于这些SFTP文件可能非常大,我正在尝试将此File处理程序转换为可以使用Rusoto上传的ByteStream。看起来可以用futures::Stream初始化ByteStream

我的计划是在File对象上实现Stream (基于代码here),以与Rusoto (以下为后代重现的代码)兼容:

代码语言:javascript
复制
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};

pub struct ByteStream<R>(R);

impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
            Ok(n) if n != 0 => Some(buf[0]).into(),
            _ => None.into(),
        }
    }
}

这样做是不是一个好的方法呢?我看到了this question,但它似乎在使用tokio::io::AsyncRead。使用tokio是做这件事的标准方法吗?如果是这样的话,有没有办法从futures_io::AsyncRead转换到tokio::io::AsyncRead

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-06-11 05:50:00

这就是我进行转换的方式。除了使用更大的缓冲区(8KB)来减少网络调用次数外,我是基于上面的代码编写的。

代码语言:javascript
复制
use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;

const KB: usize = 1024;

struct ByteStream<R>(R);

impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = Result<Bytes, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = vec![0_u8; 8 * KB];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
            Ok(n) if n != 0 => Some(Ok(Bytes::from(buf))).into(),
            Ok(_) => None.into(),
            Err(e) => Some(Err(e)).into(),
        }
    }
}

允许我这样做:

代码语言:javascript
复制
fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
    let stream = ByteStream(body);
    Some(StreamingBody::new(stream))
}

(请注意,rusoto::StreamingBodyrusoto::ByteStream是别名)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62290132

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档