首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >DropQueue RX.net机制

DropQueue RX.net机制
EN

Stack Overflow用户
提问于 2017-10-28 15:41:56
回答 1查看 95关注 0票数 0

我遇到了RX.net的背压问题,我找不到解决办法。我有一个可以观察到的实时日志消息流。

代码语言:javascript
复制
var logObservable = /* Observable stream of log messages */

我想通过一个TCP接口来公开它,该接口在通过线路发送之前序列化来自logObservable的实时日志消息。因此,我做以下几点:

代码语言:javascript
复制
foreach (var message in logObservable.ToEnumerable())
{
   // 1. Serialize message
   // 2. Send it over the wire. 
}

如果出现背压场景,.ToEnumerable()就会出现问题,例如,另一端的客户端暂停流。问题是.ToEnumerable()缓存这些项,这会导致大量的内存使用。我正在寻找一种类似于DropQueue的机制,它只对最后10条消息进行缓冲。

代码语言:javascript
复制
var observableStream = logObservable.DropQueue(10).ToEnumerable();

这是解决这个问题的正确途径吗?你知道如何实施这样一种机制来避免可能出现的背压问题吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-12-08 14:16:22

我的DropQueue实现:

代码语言:javascript
复制
    public static IEnumerable<TSource> ToDropQueue<TSource>(
        this IObservable<TSource> source,
        int queueSize,
        Action backPressureNotification = null,
        CancellationToken token = default(CancellationToken))
    {
        var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize);
        var isBackPressureNotified = false;

        var subscription = source.Subscribe(
            item =>
            {
                var isBackPressure = queue.Count == queue.BoundedCapacity;

                if (isBackPressure)
                {
                    queue.Take(); // Dequeue an item to make space for the next one

                    // Fire back-pressure notification if defined
                    if (!isBackPressureNotified && backPressureNotification != null)
                    {
                        backPressureNotification();
                        isBackPressureNotified = true;
                    }
                }
                else
                {
                    isBackPressureNotified = false;
                }

                queue.Add(item);
            },
            exception => queue.CompleteAdding(),
            () => queue.CompleteAdding());

        token.Register(() => { subscription.Dispose(); });

        using (new CompositeDisposable(subscription, queue))
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                yield return item;
            }
        }
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46991491

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档