首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有没有办法关闭`tokio::runtime::current_thread::Runtime`?

有没有办法关闭`tokio::runtime::current_thread::Runtime`?
EN

Stack Overflow用户
提问于 2018-12-12 23:03:23
回答 1查看 1.9K关注 0票数 1

我使用的是tokio::runtime::current_thread::Runtime,我希望能够运行一个未来,并在同一个线程中停止反应堆。页面上的示例没有显示如何停止运行时。有什么办法我能做到吗?

EN

回答 1

Stack Overflow用户

发布于 2019-02-10 03:17:49

当将来完成时,运行库将自动关闭:

代码语言:javascript
复制
use std::time::Duration;
use tokio::time; // 0.2.21

#[tokio::main]
async fn main() {
    time::delay_for(Duration::from_secs(2)).await;
    eprintln!("future complete");
}

有关创建运行时的其他方法,请参见How do I synchronously return a value calculated in an asynchronous Future in stable Rust?

如果您需要取消未来,您可以创建一些将导致未来poll成功的东西。我可能会用频道和select

代码语言:javascript
复制
use futures::{channel::oneshot, future, FutureExt}; // 0.3.5
use std::time::Duration;
use tokio::{task, time}; // 0.2.21

#[tokio::main]
async fn main() {
    let future = async {
        time::delay_for(Duration::from_secs(3600)).await;
        eprintln!("future complete");
    };

    let (cancel_tx, cancel_rx) = oneshot::channel();

    let another_task = task::spawn(async {
        eprintln!("Another task started");
        time::delay_for(Duration::from_secs(2)).await;
        eprintln!("Another task canceling the future");
        cancel_tx.send(()).expect("Unable to cancel");
        eprintln!("Another task exiting");
    });

    future::select(future.boxed(), cancel_rx).await;

    another_task.await.expect("The other task panicked");
}

下面是一种非常简单的手动解决方案,它很简单,很残忍,而且可能不是很好的表现:

代码语言:javascript
复制
use pin_project::pin_project; // 0.4.17
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{self, Context, Poll},
    thread,
    time::Duration,
};
use tokio::time; // 0.2.21 

#[tokio::main]
async fn main() {
    let future = async {
        time::delay_for(Duration::from_secs(3600)).await;
        eprintln!("future complete");
    };

    let (future, cancel) = Cancelable::new(future);

    let another_thread = thread::spawn(|| {
        eprintln!("Another thread started");
        thread::sleep(Duration::from_secs(2));
        eprintln!("Another thread canceling the future");
        cancel();
        eprintln!("Another thread exiting");
    });

    future.await;

    another_thread.join().expect("The other thread panicked");
}

#[pin_project]
#[derive(Debug)]
struct Cancelable<F> {
    #[pin]
    inner: F,
    info: Arc<Mutex<CancelInfo>>,
}

#[derive(Debug, Default)]
struct CancelInfo {
    cancelled: bool,
    task: Option<task::Waker>,
}

impl<F> Cancelable<F> {
    fn new(inner: F) -> (Self, impl FnOnce()) {
        let info = Arc::new(Mutex::new(CancelInfo::default()));
        let cancel = {
            let info = info.clone();
            move || {
                let mut info = info.lock().unwrap();
                info.cancelled = true;
                if let Some(waker) = info.task.take() {
                    waker.wake();
                }
            }
        };
        let me = Cancelable { inner, info };
        (me, cancel)
    }
}

impl<F> Future for Cancelable<F>
where
    F: Future<Output = ()>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let mut info = this.info.lock().unwrap();

        if info.cancelled {
            Poll::Ready(())
        } else {
            let r = this.inner.poll(ctx);

            if r.is_pending() {
                info.task = Some(ctx.waker().clone());
            }

            r
        }
    }
}

另请参阅:

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53752726

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档