首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rx.net与行为者模型

Rx.net与行为者模型
EN

Stack Overflow用户
提问于 2017-12-19 03:23:21
回答 1查看 606关注 0票数 1

我遇到了需要从方法返回IObservable<T>的情况。当对象T发生更改时,它将返回一个新值。

对象更改的触发器将来自参与者系统,在我的示例中,我们使用的是Proto.Actor

我已经得到了一个“工作”的例子,但是我觉得它没有正确地实现IObservable,并且不会允许正确的取消订阅等等。

我的解决方案:

代码语言:javascript
复制
    public IObservable<T> Watch<T>(long id)
    {
        var subscriber = _actorFactory.GetActor<ObjectChangedActor>();
        var request = new ObjectWatchRequest(id);

        return Observable.Create<T>(async (observer, cancellationToken) =>
        {
            await Task.Run(async () =>
            {
                while (true)
                {
                    var response = await subscriber.RequestAsync<ObjectUpdatedResponse>(request, cancellationToken);
                    // once it gets past the above line, we have gotten notice that the object has been updated
                    var updatedObj = await Get<T>(id);
                    observer.OnNext(updatedObj);
                }
            }, cancellationToken);

            // will this ever be called?
            return Disposable.Create(() =>
            {
                Console.WriteLine("Disposal called");
            });
        });
    }

我觉得,尽管它“有效”,因为所有更新都被推送到订阅服务器上,但它永远无法准确地处理,而且我不认为while (true)应该放在代码中,即使它位于一个单独的线程上。

如何最好地处理从参与者系统返回的消息列表,并将这些消息转换为用户可以释放的IObservable (取消订阅)?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-12-19 06:01:33

您还没有提供完整的代码工作示例,这使得您很难为您提供一个可以复制和粘贴的工作解决方案。然而,我已经做了一个削减版本,我认为你可能可以工作。

你是对的,你的解决方案不是一个好的实现。

下面是您的代码的基本版本,一旦您将对象放入其中,它就可以正常工作了:

代码语言:javascript
复制
public IObservable<T> Watch<T>(long id)
{
    return
        Observable
            .Defer(() =>
                from x in Observable.FromAsync(() => RequestAsync())
                from y in Observable.FromAsync(() => Get<T>(id))
                select y)
            .Repeat();
}

如果这符合你的要求,请告诉我。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47879581

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档