首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用System.Threading.Channels时不执行的异步方法

使用System.Threading.Channels时不执行的异步方法
EN

Stack Overflow用户
提问于 2022-01-11 16:42:35
回答 2查看 506关注 0票数 1

由于某种原因,它似乎在使用者或生产者任务中执行代码。我哪里出问题了?

代码语言:javascript
复制
using System.Threading.Channels;

namespace TEST.CHANNELS
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var channel = Channel.CreateUnbounded<int>();
            var cancel = new CancellationToken();
            await Consumer(channel, cancel);
            await Producer(channel, cancel);

            Console.ReadKey();
        }

        private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
        {
            for (int i = 0; i < 59; i++)
            {
                await Task.Delay(1000, cancellationToken);
                await ch.Writer.WriteAsync(i, cancellationToken);
            }
        }
        
        private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
        {
            await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
            {
                Console.WriteLine(item);
            }
        }
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-01-11 16:50:03

如果您是新手,我建议您阅读教程:学习如何使用Visual调试C#代码。您应该知道如何放置断点来看到代码一步一步地运行。

但是,由于这一项涉及异步/任务,它可能看起来很混乱,但是当您进入Consumer时,您将看到它在await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))行停止。

原因是消费者在等待生产者从来不投入的东西。原因是您的第一个await停止了您的代码,所以第2行永远不会被执行。

代码语言:javascript
复制
await Consumer(channel, cancel);
await Producer(channel, cancel);

这应能解决以下问题:

代码语言:javascript
复制
var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);

await Task.WhenAll(consumerTask, producerTask);

上面的代码是,

  1. 运行Consumer,不要等待它,而是在consumerTask中跟踪它。
  2. 运行生产者任务,不要等待它,但是要在producerTask中跟踪它。
  3. 等待直到consumerTaskproducerTask都使用了Task.WhenAll

请注意,消费者似乎仍然有一个逻辑问题,因为它永远不会退出,所以您的ReadKey()可能不会被击中(您的应用程序会卡在WhenAll线上)。我认为,如果您打算修复它,如果它是一个bug,对您来说“练习”要容易得多。

票数 3
EN

Stack Overflow用户

发布于 2022-01-11 17:08:43

在生成任何消息之前,您的代码正在尝试使用通道中的所有消息。虽然您可以存储生产者/消费者任务,而不是等待它们,但最好使用特定于频道的习惯用法和模式。

与使用通道作为某种容器不同,它只向使用者创建和拥有的通道公开和共享读取器。这就是围棋中使用频道的方式。

这就是为什么您也只能使用ChannelReader和ChannelWriter:

  • ChannelReader是Go中的ch ->,是从通道读取的唯一方法。
  • ChannelWriter是Go中的ch <-,是编写的唯一方法。

使用自有通道的

如果需要异步处理数据,请在生产者/使用者方法内的任务中执行此操作。这使得控制通道和知道什么时候处理完成或取消要容易得多。它还允许您很容易地从通道构建管道。

就你的情况而言,制片人可能是:

代码语言:javascript
复制
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        for (int i = 0; i < 59; i++)
        {
            await Task.Delay(1000, cancellationToken);
            await writer.WriteAsync(i, cancellationToken);
        }
    },cancellationToken)
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

如果一个人懒惰,消费者可以:

代码语言:javascript
复制
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
    {
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine(item);
        }
    }

将其作为扩展方法可以与以下两种方法相结合:

代码语言:javascript
复制
await Producer(cancel)
     .ConsumeNumbers(cancel);

在更一般的情况下,管道块从通道读取并返回通道:

代码语言:javascript
复制
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            var newItem=Math.Pow(item,pow);
            await writer.WriteAsync(newItem);
        }
    },cancellationToken)
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

这将允许创建一系列步骤(如:

代码语言:javascript
复制
await Producer(cancel)
      .RaiseTo(0.3,cancel)
      .RaiseTo(3,cancel)
      .ConsumeNumbers(cancel);

并行处理

每个块也可以使用多个任务,以加快处理速度。在.NET 6中,使用Parallel.ForEachAsync可以很容易地做到这一点:

代码语言:javascript
复制
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;

    _ = Parallel.ForEachAsync(
            reader.ReadAllAsync(cancellationToken),
            cancellationToken,
            async item=>
            {
                var newItem=Math.Pow(item,pow);
                await writer.WriteAsync(newItem);
            })
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

小心订单

通道保留项目顺序和读取请求。这意味着单个任务步骤总是按照顺序使用和生成消息。然而,Parallel.ForEachAsync并没有这样的保证。如果订单很重要,您必须添加代码以确保消息按顺序发出,或者尝试用另一步重新排序。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70670368

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档