我有一个助手类,可以将文本消息保存到本地文件系统。此方法返回一个Task对象,并且根据定义是异步的。
我希望能够观察到这个方法何时被调用,这样我就可以持续监控缓冲区的大小和长度,并基于此做出决定。
我正在尝试使用.NET的Reactive扩展来实现这一点,但是,我想不出一个允许我连续监听添加到缓冲区的消息的设计。下面是我目前的实现:
public IObservable<Unit> Receive(InternalMessage message)
{
var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable
return observable;
}下面是我订阅observable的方式:
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
receiverObservable.Subscribe(
x => Console.WriteLine("On next"),
ex => //TODO,
() => // Completed);我希望每次调用Receive方法时都调用订阅者。然而,一旦这个方法被调用,observable就会完成,序列也会终止,因此以后对Receive的调用将不会被监听。
谁能推荐一种方法来使用Rx.Net库来实现我正在寻找的这个可观察的模式,即如何保持序列打开并将异步方法的结果提供给它?
发布于 2017-03-21 01:20:01
按照您的编码,Receive将返回IObservable<Unit>,表示单个任务的完成。您希望订阅返回表示任务完成流的IObservable<IObservable<Unit>>的内容。
有许多方法可以做到这一点,其中最好的可能取决于你的类是如何设置的,以及你是如何调用它的。
下面是最懒的一条:
声明一个表示调用流的类级变量subject:
Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>();
subject.Merge().Subscribe(
x => Console.WriteLine("On next"),
ex => { }, //TODO
() => { } // Completed
);然后,当你有一个新的调用时,你只需将它添加到主题中。
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
subject.OnNext(receiverObservable);这真的很懒惰的原因是,Rx的核心是函数式的,它倾向于看不到可变状态变量。Subjects基本上是可变状态。
更好的方法是找出调用Receive的时间和原因,并将其构造为可观察对象。一旦完成,你就可以解决这个问题了:
IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from event
sourceReasonsToCallReceive.SelectMany(_ => batchHandler.Receive(message))
.SubScribe(
x => Console.WriteLine("On next"),
ex => { }, //TODO
() => { } // Completed
);希望这能有所帮助。
https://stackoverflow.com/questions/42885822
复制相似问题