我正在使用FuturesUnordered将异步工作负载排队到多线程tokio运行程序上。这些期货回报各种不同的结果。为了区分它们,我将每个未来的结果映射到自定义的Event类型。
enum Event {
ResultTypeA {...},
ResultTypeB {...},
ResultTypeC {...},
ResultTypeD {...}
}
let pending_futures: FuturesUnordered<Pin<Box<dyn Future<Output = Event> + Send>>> = FuturesUnordered::default()
loop {
tokio::select! {
Some(future) = workload_receiver.recv() => {
pending_futures.push(future.boxed());
},
Some(event) = pending_futures.next() => process_event(event),
else => break,
}
}上面的代码工作得很好,但是,我想限制并行处理的pending_futures数量。这就是buffered_unordered进来的地方。我天真的做法是:
loop {
tokio::select! {
Some(future) = workload_receiver.recv() => {
pending_futures.push(future.boxed());
},
Some(event) = pending_futures.buffered(10).next() => process_event(event),
else => break,
}
}这会引发以下编译错误:
--> src/main.rs
|
257 | Some(event) = pending_futures.buffered(10).next() => process_event(event),
| ^^^^^^^^^^^^ `Event` is not a future
|
= help: the trait `futures::Future` is not implemented for `Event`
= note: Event must be a future or must implement `IntoFuture` to be awaited
note: required by a bound in `buffered`
--> futures-util-0.3.24/src/stream/stream/mod.rs:1359:21
|
1359 | Self::Item: Future,
| ^^^^^^ required by this bound in `buffered`如何将FuturesUnordered限制为只同时处理其底层队列的N个期货,但仍然允许动态地排队新的期货?
发布于 2022-11-07 01:29:55
如果要限制并发性,则不希望使用FuturesUnordered,它将一直运行所有包含的任务。使用.buffered()不会对此有所帮助,因为它实现的Stream用于任务完成后的结果。
如果你的workload_receiver是tokio::sync::mpsc::Receiver,那么你很幸运!您可以通过Stream机箱通过ReceiverStream将其直接转换为托基奥流 (其他东西的包装器也存在)。这将非常适合于.buffered()或.buffered_unordered(),因为您似乎正在接收的项目是Future的。
https://stackoverflow.com/questions/74339190
复制相似问题