我使用的是Rx .NET的Subject.OnError,它似乎是抛出而不是传播异常。我的场景是,主题是在一个单独的线程上输入数据,调用线程在返回数据时需要做一些事情,还需要等待可观察到的所有数据的完成,以及传播发生的任何异常。
下面是一个简化的示例:
class Program
{
static async Task Main(string[] args)
{
var subject = new Subject<bool>();
Task.Run(async () =>
{
await Task.Delay(5000);
subject.OnError(new Exception()); //This call is throwing!
});
subject.Subscribe(e =>
{
//Do some data processing here
});
try
{
//Need to wait for observable to complete before returning to the caller
await subject.LastOrDefaultAsync();
}
catch
{
//Do some logging, clean up resources
throw;
}
}
}如果我删除对subject.Subscribe()的调用,代码就会像您预期的那样工作,并且异常将在subject.LastOrDefaultAsync()上重新抛出。但是,有了Subscribe,对subject.OnError()的调用马上就会重新抛出异常(而不是将它传递给可观察的),这对我来说是完全奇怪的。
我如何解决这个问题?
(FYI,大量的代码已经使用Subject编写了,所以建议我根本不使用它是一个不可接受的解决方案)
发布于 2019-01-11 20:35:41
下面是一个简单的例子:
void Main()
{
var subject = new Subject<bool>();
subject.Subscribe(b => {/* bool handling code */});
subject.OnError(new Exception()); //This call is throwing!
}Subscribe重载重新抛出它接收到的异常。如果要忽略这些异常,请执行以下操作:
void Main()
{
var subject = new Subject<bool>();
// subject.Subscribe();
subject.Subscribe(b => {/* bool handling code */}, e => { });
subject.OnError(new Exception()); //This call is throwing!
}如果您想看到这个源,请在这里查看:https://github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs (第63行)。它会抛出任何捕获的异常。
编辑:
如果您想跳进兔子洞,下面的异常处理代码(实际上)会被.Subscribe(onNextHandlerOnly)重载调用:
void Main()
{
var subject = new Subject<bool>();
subject.Subscribe(b => b.Dump(), e => { e.Throw(); }, () => {});
subject.OnError(new Exception()); //This call is throwing!
}
public static class X
{
public static void Throw(this Exception exception)
{
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
}
}EDI.Capture调用使它看起来像是异常的“源”是OnError调用,而不是Subscribe。
https://stackoverflow.com/questions/54151477
复制相似问题