首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Replay()-like功能,但是能够替换陈旧的值吗?

Replay()-like功能,但是能够替换陈旧的值吗?
EN

Stack Overflow用户
提问于 2015-08-21 18:29:52
回答 1查看 432关注 0票数 0

想知道是否有人能想出一个优雅的解决方案来解决这个用例:

我正在使用一个可观察对象(TEntity类型的IObservable),它为我提供了一个实体流。如果这些实体中的任何一个被更新,那么可观察体的提供者将下推更新后的实体。

我在这个流上使用了Replay(),这样我只需要订阅底层流一次,这样延迟的订阅者就可以看到所有的值。问题是这里可能存在内存泄漏,因为Replay()将保留它看到的所有更新,而我所需要的是每个实体的最新更新。

我可以将Replay()替换为Scan(),它只允许我维护最新的更新,但这样我就必须推出一个包含到目前为止观察到的所有更新的字典,而不仅仅是已经更改的特定实体。

我能想到的唯一解决方案是使用如上所述的Scan(),但在Scan()实现中,我将把所有更新推入到一个主题中。我将公开的IObservable的订阅者将收到存储在扫描()字典中的快照的合并以及任何更新,如下所示:

代码语言:javascript
复制
private Subject<Entity> _updateSubject = new Subject<Entity>();

private IObservable<Dictionary<string, Entity>> _subscriptionStream;

//called once on initialisation
private void CreateSubscription()
{
    IObservable<Entity> source = GetSomeLongRunningSubscriptionStream();

    _subscriptionStream = source
    .Scan(new Dictionary<string, Entity>(), (accumulator,update) =>
    {
        accumulator[update.ID] = update;

        _updateSubject.OnNext(update);

        return accumulator;
    })
    .Replay(1);
}

//called each time a consumer wants access to the stream
public IObservable<Entity> GetStream()
{
    return _subscriptionStream.Take(1).SelectMany(x => x).Select(x => x.Value)
    .Merge(_updateSubject.AsObservable());
}

有没有人能想到一种更优雅的解决方案,将状态保持在单个流中,而不是求助于主题?

谢谢

*编辑*

根据我的评论,我已经使用了类似的东西。让我知道你的想法

代码语言:javascript
复制
//called once on initialisation
private void CreateSubscription()
        {
            _baseSubscriptionObservable = GetSomeLongRunningSubscriptionStream ().Publish();

            _snapshotObservable = _baseSubscriptionObservable
                .Scan(new Dictionary<string,Entity>(), (accumulator, update) =>
                    {
                        accumulator[update.ID] = update;

                        return accumulator;
                    })
                .StartWith(new Dictionary<string, Entity>())
                .Replay(1);

            _baseSubscriptionObservable.Connect ();
            _snapshotObservable.Connect ();
        }

public IObservable<Entity> GetStream()
        {
            return _snapshotObservable.Take (1).Select (x => x.Values).SelectMany (x => x)
                .Merge (_baseSubscriptionObservable);
        }
EN

回答 1

Stack Overflow用户

发布于 2015-08-22 08:07:41

我大体上喜欢你正在做的事情,但我能看到一些问题。

首先,您已经将CreateSubscriptionGetStream拆分为两个方法,其思想是您将拥有对GetSomeLongRunningSubscriptionStream()流的一个底层订阅。不幸的是,在这种情况下,无论对最终可观察对象有多少订阅,您都将没有订阅,因为.Replay(1)返回一个IConnectableObservable<>,您需要调用.Connect()来开始值流。

下一件事是用最新的值更新累加器,然后在GetStream中添加最新的值,并合并到累加器的扁平流中。每次返回最新的值两次。

下面是我建议你这样做的方法:

代码语言:javascript
复制
private IObservable<IList<Timestamped<Entity>>> GetStream()
{
    return
        Observable
            .Create<IList<Timestamped<Entity>>>(o =>
                GetSomeLongRunningSubscriptionStream()
                    .Timestamp()
                    .Scan(
                        new Dictionary<string, Timestamped<Entity>>(),
                        (accumulator, update) =>
                        {
                            accumulator[update.Value.ID] = update;
                            return accumulator;
                        })
                    .Select(x => x.Select(y => y.Value).ToList())
                    .Replay(1)
                    .RefCount()
                    .Subscribe(o));
}

在使用Rx时,最好避免任何状态(这不是在可观察对象中的本地化状态)。因此,我将CreateSubscriptionGetStream合并到一个GetStream方法中,并将整个可观察对象封装到一个Observable.Create中。

为了避免两次推出值,并方便您了解最新更新是什么,我添加了一个对.Timestamp()的调用,以放入返回Entity的最新时间。

我把.Scan(...)和字典放在一起,但现在它是一个Dictionary<string, Timestamped<Entity>>

对于添加/更新的每个值,我都会扁平化字典,并以列表的形式返回底层值。此时,您可以对列表进行排序,以确保最新的值排在最前面或最后,以满足您的需要。

然后,我使用.Replay(1).RefCount()组合将.Replay(1)返回的IConnectableObservable<>转换回IObservable<>,并理解了当所有订阅者都处理时,您将处理底层订阅。这可能是您的查询中最关键的部分。应该这样做。这是确保您避免内存泄漏的Rx方法。

如果您迫切需要保持底层连接打开,则需要将所有代码封装在一个实现IDisposable的类中,以清理所需的.Connect()

如下所示:

代码语言:javascript
复制
public class EntityStream : IDisposable 
{
    private IDisposable _connection = null;

    public EntityStream(IObservable<Entity> someLongRunningSubscriptionStream)
    {
        _stream =
            someLongRunningSubscriptionStream
            .Timestamp()
            .Scan(
                new Dictionary<string, Timestamped<Entity>>(),
                (accumulator, update) =>
                {
                    accumulator[update.Value.ID] = update;
                    return accumulator;
                })
            .Select(x => x.Select(y => y.Value).ToList())
            .Replay(1);

        _connection = _stream.Connect();
    }

    private IConnectableObservable<IList<Timestamped<Entity>>> _stream = null;

    public IObservable<IList<Timestamped<Entity>>> GetStream()
    {
        return _stream.AsObservable();
    }

    public void Dispose()
    {
    if (_connection != null)
    {
        _connection.Dispose();
        _connection = null;
    }
    }
}

不过,我很少这样做。我会彻底推荐使用第一种方法。你应该只在必要的时候混合使用OOP和Rx。

如果您需要任何澄清,请告诉我。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32138092

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档