想知道是否有人能想出一个优雅的解决方案来解决这个用例:
我正在使用一个可观察对象(TEntity类型的IObservable),它为我提供了一个实体流。如果这些实体中的任何一个被更新,那么可观察体的提供者将下推更新后的实体。
我在这个流上使用了Replay(),这样我只需要订阅底层流一次,这样延迟的订阅者就可以看到所有的值。问题是这里可能存在内存泄漏,因为Replay()将保留它看到的所有更新,而我所需要的是每个实体的最新更新。
我可以将Replay()替换为Scan(),它只允许我维护最新的更新,但这样我就必须推出一个包含到目前为止观察到的所有更新的字典,而不仅仅是已经更改的特定实体。
我能想到的唯一解决方案是使用如上所述的Scan(),但在Scan()实现中,我将把所有更新推入到一个主题中。我将公开的IObservable的订阅者将收到存储在扫描()字典中的快照的合并以及任何更新,如下所示:
private Subject<Entity> _updateSubject = new Subject<Entity>();
private IObservable<Dictionary<string, Entity>> _subscriptionStream;
//called once on initialisation
private void CreateSubscription()
{
IObservable<Entity> source = GetSomeLongRunningSubscriptionStream();
_subscriptionStream = source
.Scan(new Dictionary<string, Entity>(), (accumulator,update) =>
{
accumulator[update.ID] = update;
_updateSubject.OnNext(update);
return accumulator;
})
.Replay(1);
}
//called each time a consumer wants access to the stream
public IObservable<Entity> GetStream()
{
return _subscriptionStream.Take(1).SelectMany(x => x).Select(x => x.Value)
.Merge(_updateSubject.AsObservable());
}有没有人能想到一种更优雅的解决方案,将状态保持在单个流中,而不是求助于主题?
谢谢
*编辑*
根据我的评论,我已经使用了类似的东西。让我知道你的想法
//called once on initialisation
private void CreateSubscription()
{
_baseSubscriptionObservable = GetSomeLongRunningSubscriptionStream ().Publish();
_snapshotObservable = _baseSubscriptionObservable
.Scan(new Dictionary<string,Entity>(), (accumulator, update) =>
{
accumulator[update.ID] = update;
return accumulator;
})
.StartWith(new Dictionary<string, Entity>())
.Replay(1);
_baseSubscriptionObservable.Connect ();
_snapshotObservable.Connect ();
}
public IObservable<Entity> GetStream()
{
return _snapshotObservable.Take (1).Select (x => x.Values).SelectMany (x => x)
.Merge (_baseSubscriptionObservable);
}发布于 2015-08-22 08:07:41
我大体上喜欢你正在做的事情,但我能看到一些问题。
首先,您已经将CreateSubscription和GetStream拆分为两个方法,其思想是您将拥有对GetSomeLongRunningSubscriptionStream()流的一个底层订阅。不幸的是,在这种情况下,无论对最终可观察对象有多少订阅,您都将没有订阅,因为.Replay(1)返回一个IConnectableObservable<>,您需要调用.Connect()来开始值流。
下一件事是用最新的值更新累加器,然后在GetStream中添加最新的值,并合并到累加器的扁平流中。每次返回最新的值两次。
下面是我建议你这样做的方法:
private IObservable<IList<Timestamped<Entity>>> GetStream()
{
return
Observable
.Create<IList<Timestamped<Entity>>>(o =>
GetSomeLongRunningSubscriptionStream()
.Timestamp()
.Scan(
new Dictionary<string, Timestamped<Entity>>(),
(accumulator, update) =>
{
accumulator[update.Value.ID] = update;
return accumulator;
})
.Select(x => x.Select(y => y.Value).ToList())
.Replay(1)
.RefCount()
.Subscribe(o));
}在使用Rx时,最好避免任何状态(这不是在可观察对象中的本地化状态)。因此,我将CreateSubscription和GetStream合并到一个GetStream方法中,并将整个可观察对象封装到一个Observable.Create中。
为了避免两次推出值,并方便您了解最新更新是什么,我添加了一个对.Timestamp()的调用,以放入返回Entity的最新时间。
我把.Scan(...)和字典放在一起,但现在它是一个Dictionary<string, Timestamped<Entity>>。
对于添加/更新的每个值,我都会扁平化字典,并以列表的形式返回底层值。此时,您可以对列表进行排序,以确保最新的值排在最前面或最后,以满足您的需要。
然后,我使用.Replay(1).RefCount()组合将.Replay(1)返回的IConnectableObservable<>转换回IObservable<>,并理解了当所有订阅者都处理时,您将处理底层订阅。这可能是您的查询中最关键的部分。应该这样做。这是确保您避免内存泄漏的Rx方法。
如果您迫切需要保持底层连接打开,则需要将所有代码封装在一个实现IDisposable的类中,以清理所需的.Connect()。
如下所示:
public class EntityStream : IDisposable
{
private IDisposable _connection = null;
public EntityStream(IObservable<Entity> someLongRunningSubscriptionStream)
{
_stream =
someLongRunningSubscriptionStream
.Timestamp()
.Scan(
new Dictionary<string, Timestamped<Entity>>(),
(accumulator, update) =>
{
accumulator[update.Value.ID] = update;
return accumulator;
})
.Select(x => x.Select(y => y.Value).ToList())
.Replay(1);
_connection = _stream.Connect();
}
private IConnectableObservable<IList<Timestamped<Entity>>> _stream = null;
public IObservable<IList<Timestamped<Entity>>> GetStream()
{
return _stream.AsObservable();
}
public void Dispose()
{
if (_connection != null)
{
_connection.Dispose();
_connection = null;
}
}
}不过,我很少这样做。我会彻底推荐使用第一种方法。你应该只在必要的时候混合使用OOP和Rx。
如果您需要任何澄清,请告诉我。
https://stackoverflow.com/questions/32138092
复制相似问题