我是System.Threading.Channels的新手。我有以下的消费者代码:
await foreach (var thing in this.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
await this.HandleThingAsync(thing, cancellationToken).ConfigureAwait(false);
}当消费由这样一个生产者生产的东西时,这种方法似乎很好:
var things = await this.GetThingsAsync(cancellationToken).ConfigureAwait(false);
await foreach (var thing in things.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
await this.Writer.WriteAsync(thing, cancellationToken).ConfigureAwait(false);
}
this.Writer.Complete();但是,当我尝试添加同一个通用形式的第二个生产者时,一旦两个生产者中的一个完成(并调用this.Writer.Complete()),其他生产者仍然需要添加的任何内容都将被拒绝,因为通道已经关闭。这是个问题,因为我想让读者阅读所有的东西,而不仅仅是所有的东西,直到任何一个制片人都没有更多的作品。
如何处理这种情况?有没有什么内置的或者其他的“标准”方式?例如,可能是一个“凝汽器”通道,它公开了多个Channel.Writer对象(每个“真实”生产者一个)和一个Channel.Reader (用于单个“真实”消费者)?
发布于 2022-01-29 02:11:37
我不认为有什么办法你可以称之为“标准”。Channel<T>是一种可以以许多不同的方式使用的工具,很像Task或SemaphoreSlim。在这种情况下,可以使用如下计数器传播所有生产者的完成:
int producersCount = X;
//...
await foreach (var thing in things)
await channel.Writer.WriteAsync(thing);
if (Interlocked.Decrement(ref producersCount) == 0) channel.Writer.Complete();或者,如果每个生产者都是一个Task,则可以将一个延续附加到所有这些任务的组合中,如下所示:
var producers = new List<Task>();
//...
_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete(),
default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);上面的丢弃(_)被用来传达ContinueWith延续是以一种失忆的方式启动的。如果您不喜欢像我这样随心所欲地抛出未观察到的任务,您可以在这样的async void方法中处理生产者的完成:
var producers = new List<Task>();
//...
HandleProducersCompletion();
//...
async void HandleProducersCompletion()
{
try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }
}这样,channel.Writer.Complete();调用引发的异常将被不处理,并将导致进程崩溃。这可以说是一件好事,考虑到另一种选择--这是一个没有明显理由而陷入僵局的过程。
发布于 2022-01-29 05:54:36
最后,我根据我在最初的问题中提到的“频道冷凝器”的想法,上了这门课。它可能是可怕的,也可能不是充满错误的,但至少到目前为止,它似乎以一种对我来说相当自然和不显眼的方式来完成这项工作:
using Nito.AsyncEx;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Rwv37.System.Threading.Channels
{
public class ChannelCondenser<T>
{
private bool IsGoing { get; set; }
private AsyncLock IsGoingLock { get; init; }
private ConcurrentBag<Channel<T>> IncomingChannel { get; init; }
private Channel<T> OutgoingChannel { get; init; }
public ChannelCondenser()
{
this.IsGoingLock = new AsyncLock();
this.IncomingChannel = new();
this.OutgoingChannel = Channel.CreateUnbounded<T>();
}
public async Task GoAsync(CancellationToken cancellationToken = default)
{
using (await this.IsGoingLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (this.IsGoing)
{
throw new System.InvalidOperationException("Cannot go - already going!");
}
this.IsGoing = true;
}
List<Task> tasks = new();
foreach (var incomingChannel in this.IncomingChannel)
{
tasks.Add(this.HandleIncomingChannelAsync(incomingChannel, cancellationToken));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
this.OutgoingChannel.Writer.Complete();
}
public ChannelWriter<T> AddIncomingChannel()
{
using (this.IsGoingLock.Lock())
{
if (this.IsGoing)
{
throw new System.InvalidOperationException("New incoming channels cannot be added while going!");
}
}
Channel<T> incomingChannel = Channel.CreateUnbounded<T>();
this.IncomingChannel.Add(incomingChannel);
return incomingChannel.Writer;
}
public ChannelReader<T> GetOutgoingChannel()
{
return this.OutgoingChannel.Reader;
}
private async Task HandleIncomingChannelAsync(Channel<T> incomingChannel, CancellationToken cancellationToken)
{
await foreach (var item in incomingChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
await this.OutgoingChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}
}
}消费者和生产者内部的使用与我原来的问题中所显示的完全相同。
我唯一需要改变的是使用它们的类是如何构造的。消费者结构从..。
private Channel<Thing> WantedThingsChannel { get; init; }
(...)
this.WantedThingsChannel = Channel.CreateUnbounded<Thing>();
this.WantedThingsHandler = new(this.WantedThingsChannel.Reader);..。为了..。
private ChannelCondenser<Thing> WantedThingsCondenser { get; init; }
(...)
this.WantedThingsCondenser = new();
this.WantedThingsHandler = new(this.WantedThingsCondenser.GetOutgoingChannel());同样的,制片人的结构也从.
this.WantedThingsRetriever = new(this.WantedThingsChannel.Writer);..。为了..。
this.WantedThingsRetriever = new(this.WantedThingsCondenser.AddIncomingChannel());哦,不,等等,我撒谎了。它们之外的另一个变化是:我的程序的主Task.WhenAll被更改了,因此它还在ChannelCondenser上等待。所以,从..。
List<Task> tasks = new()
{
this.WantedThingsHandler.GoAsync(cancellationToken),
this.WantedThingsRetriever.GoAsync(cancellationToken),
};..。为了..。
List<Task> tasks = new()
{
this.WantedThingsCondenser.GoAsync(cancellationToken),
this.WantedThingsHandler.GoAsync(cancellationToken),
this.WantedThingsRetriever.GoAsync(cancellationToken),
};https://stackoverflow.com/questions/70900668
复制相似问题