首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在AsyncRead中超时不超时

在AsyncRead中超时不超时
EN

Stack Overflow用户
提问于 2022-02-08 18:33:28
回答 3查看 215关注 0票数 2

我正在尝试实现一个异步读取包装器,它将添加读取超时功能。目标是API是普通的AsyncRead。换句话说,我不想在代码中添加io.read(buf).timeout(t)。相反,读取实例本身应该在给定的超时过期后返回适当的io::ErrorKind::TimedOut

不过,我无法投票让delay做好准备。总是悬而未决的。我试过async-stdfuturessmol-timeout --同样的结果。虽然超时在等待时会触发,但在民意测验中却不会触发。我知道暂停并不容易。需要一些东西来唤醒它。我做错了什么?怎么才能挺过去?

代码语言:javascript
复制
use async_std::{
    future::Future,
    io,
    pin::Pin,
    task::{sleep, Context, Poll},
};
use std::time::Duration;

pub struct PrudentIo<IO> {
    expired: Option<Pin<Box<dyn Future<Output = ()> + Sync + Send>>>,
    timeout: Duration,
    io: IO,
}

impl<IO> PrudentIo<IO> {
    pub fn new(timeout: Duration, io: IO) -> Self {
        PrudentIo {
            expired: None,
            timeout,
            io,
        }
    }
}

fn delay(t: Duration) -> Option<Pin<Box<dyn Future<Output = ()> + Sync + Send + 'static>>> {
    if t.is_zero() {
        return None;
    }
    Some(Box::pin(sleep(t)))
}

impl<IO: io::Read + Unpin> io::Read for PrudentIo<IO> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        if let Some(ref mut expired) = self.expired {
            match expired.as_mut().poll(cx) {
                Poll::Ready(_) => {
                    println!("expired ready");
                    // too much time passed since last read/write
                    return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
                }
                Poll::Pending => {
                    println!("expired pending");
                    // in good time
                }
            }
        }

        let res = Pin::new(&mut self.io).poll_read(cx, buf);
        println!("read {:?}", res);

        match res {
            Poll::Pending => {
                if self.expired.is_none() {
                    // No data, start checking for a timeout
                    self.expired = delay(self.timeout);
                }
            }
            Poll::Ready(_) => self.expired = None,
        }

        res
    }
}
impl<IO: io::Write + Unpin> io::Write for PrudentIo<IO> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.io).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_close(cx)
    }
}

#[cfg(test)]
mod io_tests {
    use super::*;
    use async_std::io::ReadExt;
    use async_std::prelude::FutureExt;
    use async_std::{
        io::{copy, Cursor},
        net::TcpStream,
    };
    use std::time::Duration;

    #[async_std::test]
    async fn fail_read_after_timeout() -> io::Result<()> {
        let mut output = b"______".to_vec();
        let io = PendIo;
        let mut io = PrudentIo::new(Duration::from_millis(5), io);
        let mut io = Pin::new(&mut io);
        insta::assert_debug_snapshot!(io.read(&mut output[..]).timeout(Duration::from_secs(1)).await,@"Ok(io::Err(timeou))");
        Ok(())
    }
    #[async_std::test]
    async fn timeout_expires() {
        let later = delay(Duration::from_millis(1)).expect("some").await;
        insta::assert_debug_snapshot!(later,@r"()");
    }
    /// Mock IO always pending
    struct PendIo;
    impl io::Read for PendIo {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<futures_io::Result<usize>> {
            Poll::Pending
        }
    }
    impl io::Write for PendIo {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<futures_io::Result<usize>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
            Poll::Pending
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
            Poll::Pending
        }
    }
}
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2022-02-08 19:32:06

异步超时工作如下:

similar.

  • When

  • 您创建超时未来。运行时调用poll进入超时,它检查超时是否过期。

  • 如果超时过期,则返回Ready并完成。

  • 如果没有过期,它会以某种方式注册回调,以便在调用cx.waker().wake()的正确时间过去时,或者在适当的时间内调用#4的回调,即调用适当的waker中的wake(),指示运行库再次调用poll

  • ,这次poll将返回Ready。好了!

代码的问题是从poll()实现:self.expired = delay(self.timeout);内部创建延迟。但是,在不轮询超时的情况下返回Pending甚至一次。这样,就不会在任何地方注册回调来调用Waker。没有懦夫,没有暂停。

我认为有几种解决办法:

A.不要将PrudentIo::expired初始化为None,而是直接在构造函数中创建timeout。这样,超时将始终在io之前被轮询至少一次,并且会被唤醒。但是,您将始终创建一个超时,即使它实际上并不需要。

B。创建timeout时,执行递归轮询:

代码语言:javascript
复制
Poll::Pending => {
    if self.expired.is_none() {
        // No data, start checking for a timeout
        self.expired = delay(self.timeout);
        return self.poll_read(cx, buf);
    }

这将调用io两次,这是不必要的,因此可能不是最优的。

C.在创建超时后添加对轮询的调用:

代码语言:javascript
复制
Poll::Pending => {
    if self.expired.is_none() {
        // No data, start checking for a timeout
        self.expired = delay(self.timeout);
        self.expired.as_mut().unwrap().as_mut().poll(cx);
    }

也许您应该匹配轮询的输出,以防它返回Ready,但是嘿,这是一个新的超时,它可能还在等待,而且它似乎工作得很好。

票数 2
EN

Stack Overflow用户

发布于 2022-03-04 08:19:11

代码语言:javascript
复制
// This is another solution. I think it is better.

impl<IO: io::AsyncRead + Unpin> io::AsyncRead for PrudentIo<IO> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();

        let io = Pin::new(&mut this.io);
        if let Poll::Ready(res) = io.poll_read(cx, buf) {
            return Poll::Ready(res);
        }

        loop {
            if let Some(expired) = this.expired.as_mut() {
                ready!(expired.poll(cx));
                this.expired.take();
                return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
            }

            let timeout = Timer::after(this.timeout);
            this.expired = Some(timeout);
        }
    }
}
票数 1
EN

Stack Overflow用户

发布于 2022-03-04 03:27:09

代码语言:javascript
复制
// 1. smol used, not async_std.
// 2. IO should be 'static.
// 3. when timeout, read_poll return Poll::Ready::Err(io::ErrorKind::Timeout)

use {
    smol::{future::FutureExt, io, ready, Timer},
    std::{
        future::Future,
        pin::Pin,
        task::{Context, Poll},
        time::Duration,
    },
};

// --

pub struct PrudentIo<IO> {
    expired: Option<Pin<Box<dyn Future<Output = io::Result<usize>>>>>,
    timeout: Duration,
    io: IO,
}

impl<IO> PrudentIo<IO> {
    pub fn new(timeout: Duration, io: IO) -> Self {
        PrudentIo {
            expired: None,
            timeout,
            io,
        }
    }
}

impl<IO: io::AsyncRead + Unpin + 'static> io::AsyncRead for PrudentIo<IO> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        loop {
            if let Some(expired) = this.expired.as_mut() {
                let res = ready!(expired.poll(cx))?;
                this.expired.take();
                return Ok(res).into();
            }
            let timeout = this.timeout.clone();
            let (io, read_buf) = unsafe {
                // Safety: ONLY used in poll_read method.
                (&mut *(&mut this.io as *mut IO), &mut *(buf as *mut [u8]))
            };
            let fut = async move {
                let timeout_fut = async {
                    Timer::after(timeout).await;
                    io::Result::<usize>::Err(io::ErrorKind::TimedOut.into())
                };
                let read_fut = io::AsyncReadExt::read(io, read_buf);
                let res = read_fut.or(timeout_fut).await;
                res
            }
            .boxed_local();
            this.expired = Some(fut);
        }
    }
}
impl<IO: io::AsyncWrite + Unpin> io::AsyncWrite for PrudentIo<IO> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.io).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_close(cx)
    }
}
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71039085

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档