首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rx.NET中无onBackpressureLatest的背压处理

Rx.NET中无onBackpressureLatest的背压处理
EN

Stack Overflow用户
提问于 2017-02-14 21:41:20
回答 4查看 3.2K关注 0票数 11

我需要在Rx.NET中实现以下算法:

  1. stream获取最新的项,或者在没有新项的情况下等待新项而不阻塞。只有最新的项目才重要,其他的可以放弃。
  2. 将项目输入SlowFunction并打印输出。
  3. 从第一步重复。

天真的解决办法是:

代码语言:javascript
复制
let PrintLatestData (stream: IObservable<_>) =
    stream.Select(SlowFunction).Subscribe(printfn "%A")

但是,该解决方案无法工作,因为平均而言,streamSlowFunction能够更快地释放项目。由于Select不删除项,而是尝试处理从最老到最新的每一项,因此在程序运行时,发出的项和打印的项之间的延迟将增长到无穷大。只有最新的项目,应该从溪流,以避免这种无限增长的背压。

我搜索了文档,并在RxJava中找到了一个名为RxJava的方法,据我所知,该方法将完成我前面描述的工作。但是,Rx.NET中不存在该方法。如何在Rx.NET中实现该方法?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-02-15 03:42:36

我想你想用像ObserveLatestOn这样的东西。它有效地用单个值和标志替换传入事件的队列。

James已经在这里博客了,http://www.zerobugbuild.com/?p=192

这一概念在GUI应用程序中被大量使用,这些应用程序无法相信服务器可能会以多快的速度将数据推向GUI应用程序。

您还可以看到反应性Trader https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64中的实现以及解释ReactiveTrader https://leecampbell.com/presentations/#ReactConfLondon2014的支持演示文稿。

要明确的是,这是一个减载算法,而不是背压算法。

票数 11
EN

Stack Overflow用户

发布于 2017-02-14 23:54:04

同步/异步建议可能会稍微有所帮助,但是,给定慢函数总是比事件流慢,使其异步可能允许您并行处理(在线程池上观察),代价是(最终)只运行完线程或通过上下文切换增加更多的延迟。对我来说这听起来不像是解决办法。

我建议您看看由Dave编写的开源Rxx‘内省’操作符。这可能会改变缓冲区/节流阀的周期,这是因为队列由于使用者速度慢而备份。如果慢函数突然变得更快,它就不会缓冲任何东西。如果速度变慢,就会缓冲更多。您必须检查是否有一个‘最新的从’类型,或只是修改现有的,以满足您的需要。例如,使用缓冲区,只使用缓冲区中的最后一项,或者进一步增强,仅在内部存储最新的内容。谷歌'Rxx',你会在某个地方找到它。

一个更简单的方法,如果‘慢功能’的时间是相当可预测的,就是简单地节流超过这个时间的数量。显然,我指的不是标准的rx“油门”,而是一个允许更新的标准,而不是旧的。这里有很多解决这类问题的方法。

票数 1
EN

Stack Overflow用户

发布于 2017-02-15 08:16:55

同样的问题在一段时间前也发生在我身上,我没有找到一个内置的操作符来做到这一点。所以我写了我自己的,我称之为Latest。实现起来并不简单,但发现它在我当前的项目中非常有用。

它的工作方式如下:当观察者忙于处理以前的通知(当然是在它自己的线程上)时,它会对最后的n个通知(n >= 0)进行排队,并在观察员空闲时立即对其进行排序。所以:

  • Latest(0):仅在观察者空闲时观察到达的项目
  • Latest(1):总是观察最新的
  • Latest(1000) (例如):通常处理所有的项目,但是如果某些东西被卡住了,宁可错过一些,也不愿得到OutOfMemoryException
  • Latest(int.MaxValue):永远不要错过一个项目,而是生产者和消费者之间的负载平衡。

因此,您的代码是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")

签名如下:

代码语言:javascript
复制
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)

实现太大了,不能在这里发布,但是如果有人感兴趣,我很乐意分享它。让我知道。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42236757

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档