由于某种原因,它似乎在使用者或生产者任务中执行代码。我哪里出问题了?
using System.Threading.Channels;
namespace TEST.CHANNELS
{
public class Program
{
public static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var cancel = new CancellationToken();
await Consumer(channel, cancel);
await Producer(channel, cancel);
Console.ReadKey();
}
private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await ch.Writer.WriteAsync(i, cancellationToken);
}
}
private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
{
await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
}
}发布于 2022-01-11 16:50:03
如果您是新手,我建议您阅读教程:学习如何使用Visual调试C#代码。您应该知道如何放置断点来看到代码一步一步地运行。
但是,由于这一项涉及异步/任务,它可能看起来很混乱,但是当您进入Consumer时,您将看到它在await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))行停止。
原因是消费者在等待生产者从来不投入的东西。原因是您的第一个await停止了您的代码,所以第2行永远不会被执行。
await Consumer(channel, cancel);
await Producer(channel, cancel);这应能解决以下问题:
var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);
await Task.WhenAll(consumerTask, producerTask);上面的代码是,
consumerTask中跟踪它。producerTask中跟踪它。consumerTask和producerTask都使用了Task.WhenAll。请注意,消费者似乎仍然有一个逻辑问题,因为它永远不会退出,所以您的ReadKey()可能不会被击中(您的应用程序会卡在WhenAll线上)。我认为,如果您打算修复它,如果它是一个bug,对您来说“练习”要容易得多。
发布于 2022-01-11 17:08:43
在生成任何消息之前,您的代码正在尝试使用通道中的所有消息。虽然您可以存储生产者/消费者任务,而不是等待它们,但最好使用特定于频道的习惯用法和模式。
与使用通道作为某种容器不同,它只向使用者创建和拥有的通道公开和共享读取器。这就是围棋中使用频道的方式。
这就是为什么您也只能使用ChannelReader和ChannelWriter:
ch ->,是从通道读取的唯一方法。ch <-,是编写的唯一方法。使用自有通道的
如果需要异步处理数据,请在生产者/使用者方法内的任务中执行此操作。这使得控制通道和知道什么时候处理完成或取消要容易得多。它还允许您很容易地从通道构建管道。
就你的情况而言,制片人可能是:
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(()=>{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await writer.WriteAsync(i, cancellationToken);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}如果一个人懒惰,消费者可以:
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}将其作为扩展方法可以与以下两种方法相结合:
await Producer(cancel)
.ConsumeNumbers(cancel);在更一般的情况下,管道块从通道读取并返回通道:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}这将允许创建一系列步骤(如:
await Producer(cancel)
.RaiseTo(0.3,cancel)
.RaiseTo(3,cancel)
.ConsumeNumbers(cancel);并行处理
每个块也可以使用多个任务,以加快处理速度。在.NET 6中,使用Parallel.ForEachAsync可以很容易地做到这一点:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Parallel.ForEachAsync(
reader.ReadAllAsync(cancellationToken),
cancellationToken,
async item=>
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
})
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}小心订单
通道保留项目顺序和读取请求。这意味着单个任务步骤总是按照顺序使用和生成消息。然而,Parallel.ForEachAsync并没有这样的保证。如果订单很重要,您必须添加代码以确保消息按顺序发出,或者尝试用另一步重新排序。
https://stackoverflow.com/questions/70670368
复制相似问题