我有以下结构:
// source of data
interface IItem
{
IObservable<string> Changed { get; }
}
interface IItemCollection
{
List<IItem> Items { get; }
IObservable<IItem> ItemAdded { get; }
IObservable<IItem> ItemRemoved { get; }
}
interface IItemCollectionManager
{
List<IItemCollection> ItemCollectionCollection { get; }
IObservable<IItemCollection> ItemCollectionAdded { get; }
IObservable<IItemCollection> ItemCollectionRemoved { get; }
}
// desired result
interface IAggregation
{
IObservable<string> Changed { get; }
}这里的目标是让IAggregation公开一个可观察的对象。但是,可以随时在每个IItemCollection中添加和删除IItem,实际上,也可以随时在IItemCollectionManager中添加或删除IItemCollection。当然,当添加这样的IItemCollection时,Aggregation也应该从该ItemCollection发出值,如果删除了一个string,我就不再希望发出该集合中IItems的值。此外,当将Item添加到任何IItemCollection时,来自其Changed可观察值的值也应该产生IAggregation的Changed可观察值之外的值。
现在,当只有一个IItemCollection时,解决这个问题相当简单,例如:
class AggregationImpl : IAggregation
{
public AggregationImpl(IItemCollection itemCollection)
{
var added = itemCollection.ItemAdded
.Select(_ => itemCollection.Items);
var removed = itemCollection.ItemRemoved
.Select(_ => itemCollection.Items);
Changed = Observable.Merge(added, removed)
.StartWith(itemCollection.Items)
.Select(coll => coll.Select(item => item.Changed).Merge())
.Switch();
}
public IObservable<string> Changed { get; }
}..。这里的关键点是,我用Merge()将所有Item的Changed观察值展平成一个单一的观察值,然后,每次添加或删除一项时,我重新创建整个Observable,并使用Switch()取消订阅旧的并订阅新的。
我觉得扩展到包括IItemCollectionManager应该是非常简单的,但我不太确定如何处理它。
发布于 2020-03-30 02:02:51
我希望这能起作用,或者至少让你走上正确的道路。由于测试看起来相当复杂,我将即兴发挥。如果你有一些简单的测试代码,我很乐意测试。
首先,我真的不喜欢你发布的实现。您将连接到ItemAdded和ItemRemoved对象,而根本不使用数据;您将从Items属性获取数据。这可能会在糟糕的实现中导致争用条件,即在更新属性之前发出事件。因此,我创建了自己的实现。我还建议将其放入扩展方法中,因为这会使以后的工作变得更容易:
public static IObservable<string> ToAggregatedObservable(this IItemCollection itemCollection)
{
return Observable.Merge(
itemCollection.ItemAdded.Select(item => (op: "+", item)),
itemCollection.ItemRemoved.Select(item => (op: "-", item))
)
.Scan(ImmutableList<IItem>.Empty.AddRange(itemCollection.Items), (list, t) =>
t.op == "+"
? list.Add(t.item)
: list.Remove(t.item)
)
.Select(l => l.Select(item => item.Changed).Merge())
.Switch();
}请原谅这个神奇的字符串,如果你愿意的话,你可以把它变成一个enum。我们在Scan内部的ImmutableList中维护当前项的状态。当一个项目被添加/删除时,我们更新列表,然后切换可观察对象。
同样的逻辑也可以应用于集合管理器级别:
public static IObservable<string> ToAggregatedObservable(this IItemCollectionManager itemCollectionManager)
{
return Observable.Merge(
itemCollectionManager.ItemCollectionAdded.Select(itemColl => (op: "+", itemColl)),
itemCollectionManager.ItemCollectionRemoved.Select(itemColl => (op: "-", itemColl))
)
.Scan(ImmutableList<IItemCollection>.Empty.AddRange(itemCollectionManager.ItemCollectionCollection), (list, t) =>
t.op == "+"
? list.Add(t.itemColl)
: list.Remove(t.itemColl)
)
.Select(l => l.Select(itemColl => itemColl.ToAggregatedObservable()).Merge())
.Switch();
}在这里,我们只是重用了第一个扩展方法,并使用了与前面相同的添加/删除然后切换逻辑。
https://stackoverflow.com/questions/60891369
复制相似问题