概述
我正试图围绕一个IAsyncEnumerable<T>接口编写一个IObserver<T>包装器。起初,我使用BufferBlock<T>作为后台数据存储,但通过性能测试和研究发现,它实际上是一个相当慢的类型,所以我决定让System.Threading.Channels.Channel类型试一试。我的BufferBlock实现也遇到了类似的问题,但这一次我不知道如何解决它。
问题
如果我的GetAsyncEnumerator()方法尚未写入await _channel.Reader.WaitToRead(token)调用,则IObserver<T>.OnNext()循环会被_channel调用阻塞。在这种情况下,在不阻止程序执行的情况下,等待值可用的正确方法是什么?
Implementation
public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>,
IObserver<T>, IDisposable
{
private readonly IDisposable _unsubscriber;
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private bool _producerComplete;
public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
{
_unsubscriber = provider.Subscribe(this);
}
public async void OnNext(T value)
{
Log.Logger.Verbose("Adding value to Channel.");
await _channel.Writer.WriteAsync(value);
}
public void OnError(Exception error)
{
_channel.Writer.Complete(error);
}
public void OnCompleted()
{
_producerComplete = true;
}
public async IAsyncEnumerator<T> GetAsyncEnumerator(
[EnumeratorCancellation] CancellationToken token = new CancellationToken())
{
Log.Logger.Verbose("Starting async iteration...");
while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
{
Log.Logger.Verbose("Reading...");
while (_channel.Reader.TryRead(out var item))
{
Log.Logger.Verbose("Yielding item.");
yield return item;
}
Log.Logger.Verbose("Awaiting more items.");
}
Log.Logger.Verbose("Iteration Complete.");
_channel.Writer.Complete();
}
public void Dispose()
{
_channel.Writer.Complete();
_unsubscriber?.Dispose();
}
}

附加上下文
这不重要,但是在运行时,传递给构造函数的IObservable<T>实例是一个CimAsyncResult,它是从异步调用到Microsoft.Management.Infrastructure apis返回的。这些都使用了观察者设计模式,我试图用花哨的新异步枚举模式来包装它。
编辑
通过日志记录更新到调试器输出,并使我的OnNext()方法如一位评论者所建议的那样异步/等待。您可以看到它从未进入while()循环。
发布于 2020-08-04 02:01:25
在调用堆栈的后面,我通过GetAwaiter().GetResult()方法同步地调用异步方法。
是的,这是个问题。
我这样做是因为有一次我想从构造函数中获取数据。我将该实现更改为使用Task.Run()执行调用,现在迭代器在这两个实现中都运行得完美无缺。
有比阻止异步代码更好的解决方案。是避免死锁的一种方法。,但您的用户体验仍然不太理想(我假设您的应用程序是UI应用程序,因为有SynchronizationContext)。
如果异步枚举数用于加载数据以供显示,则更合适的解决方案是当数据异步加载时,该状态。。如果异步枚举器用于其他用途,您可能会在我的异步构造器博客文章中找到一些合适的替代模式。
https://stackoverflow.com/questions/63234041
复制相似问题