首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rx.Net:无限观察异步事件

Rx.Net:无限观察异步事件
EN

Stack Overflow用户
提问于 2017-03-19 19:05:41
回答 1查看 405关注 0票数 0

我有一个助手类,可以将文本消息保存到本地文件系统。此方法返回一个Task对象,并且根据定义是异步的。

我希望能够观察到这个方法何时被调用,这样我就可以持续监控缓冲区的大小和长度,并基于此做出决定。

我正在尝试使用.NET的Reactive扩展来实现这一点,但是,我想不出一个允许我连续监听添加到缓冲区的消息的设计。下面是我目前的实现:

代码语言:javascript
复制
public IObservable<Unit> Receive(InternalMessage message)
        {
            var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable
            return observable;
        }

下面是我订阅observable的方式:

代码语言:javascript
复制
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
            receiverObservable.Subscribe(
                x => Console.WriteLine("On next"),
                ex => //TODO,
                () => // Completed);

我希望每次调用Receive方法时都调用订阅者。然而,一旦这个方法被调用,observable就会完成,序列也会终止,因此以后对Receive的调用将不会被监听。

谁能推荐一种方法来使用Rx.Net库来实现我正在寻找的这个可观察的模式,即如何保持序列打开并将异步方法的结果提供给它?

EN

回答 1

Stack Overflow用户

发布于 2017-03-21 01:20:01

按照您的编码,Receive将返回IObservable<Unit>,表示单个任务的完成。您希望订阅返回表示任务完成流的IObservable<IObservable<Unit>>的内容。

有许多方法可以做到这一点,其中最好的可能取决于你的类是如何设置的,以及你是如何调用它的。

下面是最懒的一条:

声明一个表示调用流的类级变量subject

代码语言:javascript
复制
Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>();
subject.Merge().Subscribe(
    x => Console.WriteLine("On next"),
    ex => { },  //TODO
    () => { }   // Completed
);

然后,当你有一个新的调用时,你只需将它添加到主题中。

代码语言:javascript
复制
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
subject.OnNext(receiverObservable);

这真的很懒惰的原因是,Rx的核心是函数式的,它倾向于看不到可变状态变量。Subjects基本上是可变状态。

更好的方法是找出调用Receive的时间和原因,并将其构造为可观察对象。一旦完成,你就可以解决这个问题了:

代码语言:javascript
复制
IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from event

sourceReasonsToCallReceive.SelectMany(_ => batchHandler.Receive(message))
    .SubScribe(
    x => Console.WriteLine("On next"),
    ex => { },  //TODO
    () => { }   // Completed
);

希望这能有所帮助。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42885822

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档