首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >可扩展的可观测组合

可扩展的可观测组合
EN

Stack Overflow用户
提问于 2018-09-17 08:38:33
回答 1查看 76关注 0票数 0

我希望将列表的IObservables存储在容器中,并订阅这些可观察到的检索合并列表的组合。然后,我希望能够添加更多的观察,而不必更新订阅,并仍然得到新的结果。理想情况下,当将新的可观察到的添加到商店时,也应该启动。以下代码应解释更多内容:

代码语言:javascript
复制
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Linq;

namespace dynamic_combine
{
    class ObservableStuff
    {
        private List<IObservable<List<String>>> _listOfObservables = new List<IObservable<List<String>>>();

        public ObservableStuff() { }

        public void AddObservable(IObservable<List<String>> obs)
        {
            _listOfObservables.Add(obs);
        }

        public IObservable<IList<String>> GetCombinedObservable()
        {
            return Observable.CombineLatest(_listOfObservables)
                .Select((all) =>
                {
                    List<String> mergedList = new List<String>();
                    foreach(var list in all)
                    {
                        mergedList = mergedList.Concat(list).ToList();
                    }
                    return mergedList;
                });
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            ObservableStuff Stuff = new ObservableStuff();
            BehaviorSubject<List<String>> A = new BehaviorSubject<List<String>>(new List<String>() { "a", "b", "c" });
            BehaviorSubject<List<String>> B = new BehaviorSubject<List<String>>(new List<String>() { "d", "e", "f" });
            BehaviorSubject<List<String>> C = new BehaviorSubject<List<String>>(new List<String>() { "x", "y", "z" });

            Stuff.AddObservable(A.AsObservable());
            Stuff.AddObservable(B.AsObservable());

            Stuff.GetCombinedObservable().Subscribe((x) =>
            {
                Console.WriteLine(String.Join(",", x));
            });

            // Initial Output: a,b,c,d,e,f

            A.OnNext(new List<String>() { "1", "2", "3", "4", "5" });
            // Output: 1,2,3,4,5,d,e,f

            B.OnNext(new List<String>() { "6", "7", "8", "9", "0" });
            // Output: 1,2,3,4,5,6,7,8,9,0

            Stuff.AddObservable(C.AsObservable());
            // Wishful Output: 1,2,3,4,5,6,7,8,9,0,x,y,z

            C.OnNext(new List<String>() { "y", "e", "a", "h" });
            // Wishful Output: 1,2,3,4,5,6,7,8,9,0,y,e,a,h

            Console.WriteLine("Press the any key...");
            Console.ReadKey();
        }
    }
}

虽然该示例是在C#中实现的,但它最终需要在rxCpp中实现。此外,还可以看到Rx的其他变体中的实现。

我已经设置了一个存储库来检查代码,并可能将其扩展到其他语言:https://gitlab.com/dwaldorf/rx-examples

比尔,丹尼尔

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-18 06:03:44

首先,有一些变化是因为您的代码不那么容易阅读。GetCombinedObservable可以重写如下:

代码语言:javascript
复制
public IObservable<IList<String>> GetCombinedObservable()
{
    return Observable.CombineLatest(_listOfObservables)
        .Select(l => l.SelectMany(s => s).ToList());
}

您的问题归结为两件事:您希望_listOfObservables是动态的,这意味着将它从List<IObservable<T>>更改为IObservable<IObservable<T>>。但是,这样做的问题是CombineLatest不支持IObservable<IObservable<T>>,所以我们必须创建一个。

这给我们带来了这个有趣的、丑陋的小功能(使用nuget包System.Collections.Immutable):

代码语言:javascript
复制
public static class X
{
    public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
    {
        return source
            .SelectMany((o, i) => o.Select(item => (observableIndex: i, item: item)))
            .Scan(ImmutableDictionary<int, T>.Empty, (state, t) => state.SetItem(t.observableIndex, t.item))
            .Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
    }
}

现在我们可以更新你的班级了:

代码语言:javascript
复制
class ObservableStuff
{
    private ReplaySubject<IObservable<List<String>>> _subject = new ReplaySubject<IObservable<List<String>>>(int.MaxValue);

    public ObservableStuff() { }

    public void AddObservable(IObservable<List<String>> obs)
    {
        _subject.OnNext(obs);
    }

    public IObservable<IList<String>> GetCombinedObservable()
    {
        return _subject
            .DynamicCombineLatest()
            .Select(l => l.SelectMany(s => s).ToList());
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52363772

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档