首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >推送SelectAll流

推送SelectAll流
EN

Stack Overflow用户
提问于 2021-10-10 17:56:45
回答 1查看 182关注 0票数 1

我想要一个处理一些流的结构。所有流共享同一项。逻辑如下:我需要创建一个唯一的流,其中包含来自所有其他流的所有项。我还需要在“主”流中添加“新”流。我不在乎下一个项目来自哪个流。

为此,我看到有一个select_all函数应该执行上面描述的逻辑。

代码语言:javascript
复制
pub struct WsPool {
    merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}

impl WsPool {
    pub fn new() -> Self {
        Self {
            merge: Arc::new(Mutex::new(SelectAll::new())),
        }
    }

    pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
        let mut merge  = self.merge.lock().unwrap();

        merge.push(s);
    }

    pub async fn process(&self) {
        loop {
            let mut merge = self.merge.lock().unwrap();
            let item = merge.await.next();
        }
    }
}

但我收到这些错误:

代码语言:javascript
复制
error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
  --> src/ws_pool.rs:30:24
   |
30 |             let item = merge.await.next();
   |                        ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
  --> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
   |
99 |     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
  --> src/ws_pool.rs:17:40
   |
17 |             merge: Arc::new(Mutex::new(SelectAll::new())),
   |                                        ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
   |
   = note: consider using `Box::pin`
   = note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
  --> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
   |
47 |     pub fn new() -> Self {
   |     ^^^^^^^^^^^^^^^^^^^^

error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
   --> src/ws_pool.rs:24:15
    |
24  |           merge.push(s);
    |                 ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
    |
   ::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
    |
172 | / pub struct Box<
173 | |     T: ?Sized,
174 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
    | |________________- doesn't satisfy `_: futures::Stream`
    |
    = note: the following trait bounds were not satisfied:
            `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`

我做错了什么?否则,我如何存储多个流并对它们进行迭代?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-10 18:44:24

你的第一个问题是一个简单的混淆。您想要await下一个值,而不是流本身:

代码语言:javascript
复制
let item = merge.next().await;

所产生的错误都是因为SelectAll<Box<dyn Stream + Send + 'static>>没有实现Stream。如果您查看impl Stream for SelectAll,内部流类型实现Unpin是受到限制的。

您可以通过将它添加到边界中来解决这个问题:

代码语言:javascript
复制
use std::marker::Unpin;
                                               // vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>

或者,一个更好的解决方案是将流固定在一起:

代码语言:javascript
复制
use std::pin::Pin;
                 // vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>

不同的是后者可以接受更多的Stream类型。添加它们时,只需使用Box::pin即可。

看到它在游乐场上工作。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69517659

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档