首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何避免InvalidOperationException抛出时ChannelReader.WaitToReadAsync?

如何避免InvalidOperationException抛出时ChannelReader.WaitToReadAsync?
EN

Stack Overflow用户
提问于 2019-04-25 02:03:47
回答 1查看 1.2K关注 0票数 2

我使用System.Threading.Channels编写了异步队列。但是当我运行测试程序时,随后的异常会被随机抛出并停止工作线程。

代码语言:javascript
复制
System.InvalidOperationException: The asynchronous operation has not completed.
   at System.Threading.Channels.AsyncOperation.ThrowIncompleteOperationException()
   at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
   at AsyncChannels.Worker() in g:\src\gitrepos\dotnet-sandbox\channelstest\AsyncChannelsTest.cs:line 26

如果异常被捕获并忽略,则代码正在工作。但我想消除那些原因不明的错误。

这是我的环境和最少的代码。

  • TargetFramework = netcoreapp2.1
  • System.Threading.Channels版本= 4.5.0
代码语言:javascript
复制
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using System;
using System.Linq;

class AsyncChannels : IDisposable
{
    Channel<TaskCompletionSource<bool>> _Channel;
    Thread _Thread;
    CancellationTokenSource _Cancellation;
    public AsyncChannels()
    {
        _Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
        _Thread = new Thread(Worker);
        _Thread.Start();
        _Cancellation = new CancellationTokenSource();
    }
    private void Worker()
    {
        while (!_Cancellation.IsCancellationRequested)
        {
            // System.InvalidOperationException is thrown
            if (!_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result)
            {
                break;
            }
            while (_Channel.Reader.TryRead(out var item))
            {
                item.TrySetResult(true);
            }
        }
    }
    public void Dispose()
    {
        _Cancellation.Cancel();
        _Channel.Writer.TryComplete();
        _Thread.Join();
    }
    public Task<bool> Enqueue()
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _Channel.Writer.TryWrite(tcs);
        return tcs.Task;
    }
    public static async Task Test()
    {
        using (var queue = new AsyncChannels())
        {
            for (int i = 0; i < 100000; i++)
            {
                await queue.Enqueue().ConfigureAwait(false);
            }
        }
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-25 13:29:43

你这样ValueTask

_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result

您只能使用ValueTask<T>做两件事:await it (仅一次),或者通过调用AsTask()将其转换为Task<T>。如果您需要做任何复杂的事情,比如多次使用await或阻塞它,那么您就需要使用AsTask()

或者,在本例中,只需在标准通道消费模式中使用await

代码语言:javascript
复制
class AsyncChannels : IDisposable
{
  Channel<TaskCompletionSource<bool>> _Channel;
  Task _Thread;
  CancellationTokenSource _Cancellation;
  public AsyncChannels()
  {
    _Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
    _Thread = Task.Run(() => WorkerAsync());
    _Cancellation = new CancellationTokenSource();
  }
  private async Task WorkerAsync()
  {
    try
    {
      while (await _Channel.Reader.WaitToReadAsync(_Cancellation.Token))
        while (_Channel.Reader.TryRead(out var item))
          item.TrySetResult(true);
    }
    catch (OperationCanceledException)
    {
    }
  }
  public void Dispose()
  {
    _Cancellation.Cancel();
    _Channel.Writer.TryComplete();
  }
  ...
}
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55840824

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档