首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如果X分钟内没有新的项目进入通道,如何读取Channel<T>中小于批处理大小的剩余项?

如果X分钟内没有新的项目进入通道,如何读取Channel<T>中小于批处理大小的剩余项?
EN

Stack Overflow用户
提问于 2020-09-14 09:27:26
回答 1查看 1.6K关注 0票数 3

我正在使用来自ChannelSystem.Threading.Channels,并希望批量阅读项目(5项),我有如下方法,

代码语言:javascript
复制
public class Batcher
{
    private readonly Channel<MeasurementViewModel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<MeasurementViewModel>();
    }
    public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
    {
        var result = new MeasurementViewModel[batchSize];

        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }

        return result;
    }
}

在ASP.NET核心后台服务中,我使用它的方式如下,

代码语言:javascript
复制
public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);

            var range = string.Join(',', batchOfItems.Select(item => item.Value));

            var x = range;
        }
    }
}

这是可行的,每当Channel中有5个条目时,我就会得到range

问题是,当Channel中只剩下2项时,由于最后10分钟没有项目进入Channel,那么如何读取Channel中的其余2项

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-09-14 12:26:23

您可以创建一个连系 CancellationTokenSource,以便可以同时查看外部取消请求和内部诱导的超时。下面是一个使用此技术的示例,方法是为ReadBatchAsync类创建一个ChannelReader扩展方法:

代码语言:javascript
复制
public static async ValueTask<T[]> ReadBatchAsync<T>(
    this ChannelReader<T> channelReader,
    int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var items = new List<T>(batchSize);
    using (var linkedCTS
        = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        linkedCTS.CancelAfter(timeout);
        while (true)
        {
            var token = items.Count == 0 ? cancellationToken : linkedCTS.Token;
            T item;
            try
            {
                item = await channelReader.ReadAsync(token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                break; // The cancellation was induced by timeout (ignore it)
            }
            catch (ChannelClosedException)
            {
                if (items.Count == 0) throw;
                break;
            }
            items.Add(item);
            if (items.Count >= batchSize) break;
        }
    }
    return items.ToArray();
}

此方法将在指定的timeout结束后立即生成批处理,如果已到达batchSize,则在此之前生成批处理,条件是该批处理至少包含一个项。否则,一旦收到第一个项目,它就会产生一个单项批。

如果通道已经通过调用channel.Writer.Complete()方法完成,并且它不包含更多的项,则ReadBatchAsync方法传播由本机ReadAsync方法引发的相同的ChannelClosedException

如果取消了外部CancellationToken,则通过抛出OperationCanceledException来传播取消。此时可能已经从ChannelReader<T>内部提取的任何项都会丢失。这使得取消功能成为破坏性的操作。建议在此之后丢弃整个Channel<T>

用法示例:

代码语言:javascript
复制
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (true)
    {
        MeasurementViewModel[] batch;
        try
        {
            batch = await _channel.Reader.ReadBatchAsync(
                5, TimeSpan.FromMinutes(10), stoppingToken);
        }
        catch (OperationCanceledException) { return; }
        catch (ChannelClosedException) { break; }

        Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
    }
    await _channel.Reader.Completion; // Propagate possible failure
}

警告:当前(.NET 6)在与这个问题和这个答案相关的条件下,ChannelReader.ReadAsync方法容易发生内存泄漏。特别是,当CancellationToken被取消时,关联的AsyncOperation仍然附加在通道的内部数据结构中,并且直到在通道中写入项时才会释放。因此,在循环中反复等待基于计时器的ChannelReader<T>.ReadAsync是不可取的。参考文献:

对于另一种方法,您可以查看这里

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63881607

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档