首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当任何生产者可以在任何时候关闭整个频道时,渠道使用者如何从多个频道生产者那里获得所有信息?

当任何生产者可以在任何时候关闭整个频道时,渠道使用者如何从多个频道生产者那里获得所有信息?
EN

Stack Overflow用户
提问于 2022-01-28 21:49:47
回答 2查看 441关注 0票数 1

我是System.Threading.Channels的新手。我有以下的消费者代码:

代码语言:javascript
复制
await foreach (var thing in this.Reader.ReadAllAsync(cancellationToken)
    .ConfigureAwait(false))
{
    await this.HandleThingAsync(thing, cancellationToken).ConfigureAwait(false);
}

当消费由这样一个生产者生产的东西时,这种方法似乎很好:

代码语言:javascript
复制
var things = await this.GetThingsAsync(cancellationToken).ConfigureAwait(false);
await foreach (var thing in things.WithCancellation(cancellationToken)
    .ConfigureAwait(false))
{
    await this.Writer.WriteAsync(thing, cancellationToken).ConfigureAwait(false);
}

this.Writer.Complete();

但是,当我尝试添加同一个通用形式的第二个生产者时,一旦两个生产者中的一个完成(并调用this.Writer.Complete()),其他生产者仍然需要添加的任何内容都将被拒绝,因为通道已经关闭。这是个问题,因为我想让读者阅读所有的东西,而不仅仅是所有的东西,直到任何一个制片人都没有更多的作品。

如何处理这种情况?有没有什么内置的或者其他的“标准”方式?例如,可能是一个“凝汽器”通道,它公开了多个Channel.Writer对象(每个“真实”生产者一个)和一个Channel.Reader (用于单个“真实”消费者)?

EN

回答 2

Stack Overflow用户

发布于 2022-01-29 02:11:37

我不认为有什么办法你可以称之为“标准”。Channel<T>是一种可以以许多不同的方式使用的工具,很像TaskSemaphoreSlim。在这种情况下,可以使用如下计数器传播所有生产者的完成:

代码语言:javascript
复制
int producersCount = X;
//...
await foreach (var thing in things)
    await channel.Writer.WriteAsync(thing);
if (Interlocked.Decrement(ref producersCount) == 0) channel.Writer.Complete();

或者,如果每个生产者都是一个Task,则可以将一个延续附加到所有这些任务的组合中,如下所示:

代码语言:javascript
复制
var producers = new List<Task>();
//...
_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete(),
    default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

上面的丢弃(_)被用来传达ContinueWith延续是以一种失忆的方式启动的。如果您不喜欢像我这样随心所欲地抛出未观察到的任务,您可以在这样的async void方法中处理生产者的完成:

代码语言:javascript
复制
var producers = new List<Task>();
//...
HandleProducersCompletion();
//...
async void HandleProducersCompletion()
{
    try { await Task.WhenAll(producers); }
    finally { channel.Writer.Complete(); }
}

这样,channel.Writer.Complete();调用引发的异常将被不处理,并将导致进程崩溃。这可以说是一件好事,考虑到另一种选择--这是一个没有明显理由而陷入僵局的过程。

票数 3
EN

Stack Overflow用户

发布于 2022-01-29 05:54:36

最后,我根据我在最初的问题中提到的“频道冷凝器”的想法,上了这门课。它可能是可怕的,也可能不是充满错误的,但至少到目前为止,它似乎以一种对我来说相当自然和不显眼的方式来完成这项工作:

代码语言:javascript
复制
using Nito.AsyncEx;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Rwv37.System.Threading.Channels
{
    public class ChannelCondenser<T>
    {
        private bool IsGoing { get; set; }
        private AsyncLock IsGoingLock { get; init; }
        private ConcurrentBag<Channel<T>> IncomingChannel { get; init; }
        private Channel<T> OutgoingChannel { get; init; }

        public ChannelCondenser()
        {
            this.IsGoingLock = new AsyncLock();
            this.IncomingChannel = new();
            this.OutgoingChannel = Channel.CreateUnbounded<T>();
        }

        public async Task GoAsync(CancellationToken cancellationToken = default)
        {
            using (await this.IsGoingLock.LockAsync(cancellationToken).ConfigureAwait(false))
            {
                if (this.IsGoing)
                {
                    throw new System.InvalidOperationException("Cannot go - already going!");
                }

                this.IsGoing = true;
            }

            List<Task> tasks = new();
            foreach (var incomingChannel in this.IncomingChannel)
            {
                tasks.Add(this.HandleIncomingChannelAsync(incomingChannel, cancellationToken));
            }

            await Task.WhenAll(tasks).ConfigureAwait(false);

            this.OutgoingChannel.Writer.Complete();
        }

        public ChannelWriter<T> AddIncomingChannel()
        {
            using (this.IsGoingLock.Lock())
            {
                if (this.IsGoing)
                {
                    throw new System.InvalidOperationException("New incoming channels cannot be added while going!");
                }
            }

            Channel<T> incomingChannel = Channel.CreateUnbounded<T>();
            this.IncomingChannel.Add(incomingChannel);

            return incomingChannel.Writer;
        }

        public ChannelReader<T> GetOutgoingChannel()
        {
            return this.OutgoingChannel.Reader;
        }

        private async Task HandleIncomingChannelAsync(Channel<T> incomingChannel, CancellationToken cancellationToken)
        {
            await foreach (var item in incomingChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                await this.OutgoingChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}

消费者和生产者内部的使用与我原来的问题中所显示的完全相同。

我唯一需要改变的是使用它们的类是如何构造的。消费者结构从..。

代码语言:javascript
复制
private Channel<Thing> WantedThingsChannel { get; init; }

(...)

this.WantedThingsChannel = Channel.CreateUnbounded<Thing>();
this.WantedThingsHandler = new(this.WantedThingsChannel.Reader);

..。为了..。

代码语言:javascript
复制
private ChannelCondenser<Thing> WantedThingsCondenser { get; init; }

(...)

this.WantedThingsCondenser = new();
this.WantedThingsHandler = new(this.WantedThingsCondenser.GetOutgoingChannel());

同样的,制片人的结构也从.

代码语言:javascript
复制
this.WantedThingsRetriever = new(this.WantedThingsChannel.Writer);

..。为了..。

代码语言:javascript
复制
 this.WantedThingsRetriever = new(this.WantedThingsCondenser.AddIncomingChannel());

哦,不,等等,我撒谎了。它们之外的另一个变化是:我的程序的主Task.WhenAll被更改了,因此它还在ChannelCondenser上等待。所以,从..。

代码语言:javascript
复制
List<Task> tasks = new()
{
    this.WantedThingsHandler.GoAsync(cancellationToken),
    this.WantedThingsRetriever.GoAsync(cancellationToken),
};

..。为了..。

代码语言:javascript
复制
List<Task> tasks = new()
{
    this.WantedThingsCondenser.GoAsync(cancellationToken),
    this.WantedThingsHandler.GoAsync(cancellationToken),
    this.WantedThingsRetriever.GoAsync(cancellationToken),
};
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70900668

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档