首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rx - CombineLatest队列

Rx - CombineLatest队列
EN

Stack Overflow用户
提问于 2016-10-13 04:43:13
回答 1查看 172关注 0票数 0

我正在尝试使用Rx实现类似队列的功能(我知道我可以使用Queue + like来实现它,但我试图学习+使用Rx,因为我没有太多使用它的机会)。

功能是,我想对要做的事件采取一些操作,但只想一次处理一个,一旦完成,就处理下一个/最后一个(如果它是新的)。

这就是我目前所拥有的(使用一个可观察到的递增标志,这样我就可以执行DistinctUntilChanged了,但这感觉像是一个hack)。

代码语言:javascript
复制
        // Some source
        var events = new Subject<string>();
        events.Subscribe(s => Console.WriteLine("Event: " + s));

        // How can I get rid of this
        var counter = 0;

        var flag = new Subject<int>();
        flag.Subscribe(i => Console.WriteLine("Flag: " + i));
        var combined = events
            .CombineLatest(flag, (s, i) => new {Event = s, Flag = i});

        var worker = combined
            .DistinctUntilChanged(arg => arg.Flag)
            .DistinctUntilChanged(arg => arg.Event);

        worker.Subscribe(x => Console.WriteLine("\r\nDo Work: " + x.Event + "\r\n"));


        flag.OnNext(counter++); // Ready
        events.OnNext("one"); // Idle, Start eight

        events.OnNext("two");
        events.OnNext("three");
        events.OnNext("four");

        events.OnNext("five"); // Process
        flag.OnNext(counter++); // Finished one, start five

        events.OnNext("six");
        events.OnNext("seven");

        events.OnNext("eight"); // Idle, Start eight
        flag.OnNext(counter++); // Finished five, start eight
        flag.OnNext(counter++); // Finished eight
        
        events.OnNext("nine"); // Should be processed

如果您运行此代码,您将看到最后一个事件没有得到处理,即使执行元是空闲的。

感觉我好像错过了什么.

目前,我正在考虑以某种方式使用可观察性中的可观察性……但在过去的几个小时里,我一直在努力解决这个问题:-(

编辑

通过将主题更改为ReplaySubject,我做到了这一点

代码语言:javascript
复制
        var events = new ReplaySubject<string>(1);

并引入了另一个变量,但看起来像是另一个技巧,但它确实让我摆脱了计数器。

代码语言:javascript
复制
        string lastSeen = null;
        flag.Select(x => events.Where(s => s != lastSeen).Take(1))
            .Subscribe(x =>
                x.Subscribe(s =>
                {
                    lastSeen = s;
                    Console.WriteLine(s);
                })
            );

如果有人知道一个更好的方法,可以让我摆脱string lastSeen或者简化我的嵌套/订阅,那就太好了。

EN

回答 1

Stack Overflow用户

发布于 2016-10-18 03:40:33

如果你愿意混入第三方DataFlow块,你可以做任何你想做的事情。尽管我必须说它有点不是Rxish。

代码语言:javascript
复制
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var queue = new BroadcastBlock<long>(null);
var subscription = queue.AsObservable().DistinctUntilChanged().Subscribe(l =>
{
    Thread.Sleep(2500);
    Console.WriteLine(l);
});

source.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(queue.AsObserver());

BroadcastBlock是只保存最后一个值的队列。第三方DataFlow块在Rx上确实有很好的支持。因此,我们可以订阅事件并将其放入BroadcastBlock,然后在BroadcastBlock上进行订阅。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40008020

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档