我遇到了需要从方法返回IObservable<T>的情况。当对象T发生更改时,它将返回一个新值。
对象更改的触发器将来自参与者系统,在我的示例中,我们使用的是Proto.Actor。
我已经得到了一个“工作”的例子,但是我觉得它没有正确地实现IObservable,并且不会允许正确的取消订阅等等。
我的解决方案:
public IObservable<T> Watch<T>(long id)
{
var subscriber = _actorFactory.GetActor<ObjectChangedActor>();
var request = new ObjectWatchRequest(id);
return Observable.Create<T>(async (observer, cancellationToken) =>
{
await Task.Run(async () =>
{
while (true)
{
var response = await subscriber.RequestAsync<ObjectUpdatedResponse>(request, cancellationToken);
// once it gets past the above line, we have gotten notice that the object has been updated
var updatedObj = await Get<T>(id);
observer.OnNext(updatedObj);
}
}, cancellationToken);
// will this ever be called?
return Disposable.Create(() =>
{
Console.WriteLine("Disposal called");
});
});
}我觉得,尽管它“有效”,因为所有更新都被推送到订阅服务器上,但它永远无法准确地处理,而且我不认为while (true)应该放在代码中,即使它位于一个单独的线程上。
如何最好地处理从参与者系统返回的消息列表,并将这些消息转换为用户可以释放的IObservable (取消订阅)?
发布于 2017-12-19 06:01:33
您还没有提供完整的代码工作示例,这使得您很难为您提供一个可以复制和粘贴的工作解决方案。然而,我已经做了一个削减版本,我认为你可能可以工作。
你是对的,你的解决方案不是一个好的实现。
下面是您的代码的基本版本,一旦您将对象放入其中,它就可以正常工作了:
public IObservable<T> Watch<T>(long id)
{
return
Observable
.Defer(() =>
from x in Observable.FromAsync(() => RequestAsync())
from y in Observable.FromAsync(() => Get<T>(id))
select y)
.Repeat();
}如果这符合你的要求,请告诉我。
https://stackoverflow.com/questions/47879581
复制相似问题