首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >System.Threading.Channels ReadAsync()方法正在阻止执行

System.Threading.Channels ReadAsync()方法正在阻止执行
EN

Stack Overflow用户
提问于 2020-08-03 17:17:06
回答 1查看 1.5K关注 0票数 1

概述

我正试图围绕一个IAsyncEnumerable<T>接口编写一个IObserver<T>包装器。起初,我使用BufferBlock<T>作为后台数据存储,但通过性能测试和研究发现,它实际上是一个相当慢的类型,所以我决定让System.Threading.Channels.Channel类型试一试。我的BufferBlock实现也遇到了类似的问题,但这一次我不知道如何解决它。

问题

如果我的GetAsyncEnumerator()方法尚未写入await _channel.Reader.WaitToRead(token)调用,则IObserver<T>.OnNext()循环会被_channel调用阻塞。在这种情况下,在不阻止程序执行的情况下,等待值可用的正确方法是什么?

Implementation

代码语言:javascript
复制
public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>,
    IObserver<T>, IDisposable
{
    private readonly IDisposable _unsubscriber;
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    private bool _producerComplete;

    public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public async void OnNext(T value)
    {
        Log.Logger.Verbose("Adding value to Channel.");
        await _channel.Writer.WriteAsync(value);
    }

    public void OnError(Exception error)
    {
        _channel.Writer.Complete(error);
    }

    public void OnCompleted()
    {
        _producerComplete = true;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator(
        [EnumeratorCancellation] CancellationToken token = new CancellationToken())
    {
        Log.Logger.Verbose("Starting async iteration...");
        while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
        {
            Log.Logger.Verbose("Reading...");
            while (_channel.Reader.TryRead(out var item))
            {
                Log.Logger.Verbose("Yielding item.");
                yield return item;
            }
            Log.Logger.Verbose("Awaiting more items.");
        }
        Log.Logger.Verbose("Iteration Complete.");
        _channel.Writer.Complete();
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
        _unsubscriber?.Dispose();
    }
}

附加上下文

这不重要,但是在运行时,传递给构造函数的IObservable<T>实例是一个CimAsyncResult,它是从异步调用到Microsoft.Management.Infrastructure apis返回的。这些都使用了观察者设计模式,我试图用花哨的新异步枚举模式来包装它。

编辑

通过日志记录更新到调试器输出,并使我的OnNext()方法如一位评论者所建议的那样异步/等待。您可以看到它从未进入while()循环。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-04 02:01:25

在调用堆栈的后面,我通过GetAwaiter().GetResult()方法同步地调用异步方法。

是的,这是个问题

我这样做是因为有一次我想从构造函数中获取数据。我将该实现更改为使用Task.Run()执行调用,现在迭代器在这两个实现中都运行得完美无缺。

有比阻止异步代码更好的解决方案。是避免死锁的一种方法。,但您的用户体验仍然不太理想(我假设您的应用程序是UI应用程序,因为有SynchronizationContext)。

如果异步枚举数用于加载数据以供显示,则更合适的解决方案是当数据异步加载时,该状态。。如果异步枚举器用于其他用途,您可能会在我的异步构造器博客文章中找到一些合适的替代模式。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63234041

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档