首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何加快庞大的BlockingCollection实现

如何加快庞大的BlockingCollection实现
EN

Stack Overflow用户
提问于 2019-07-06 03:42:45
回答 1查看 357关注 0票数 0

我多次使用BlockingCollection来实现生产者/消费者模式,但是由于相关的开销,我在极细粒度的数据方面的性能很差。这通常迫使我通过分组/分区数据来即兴发挥作用,换句话说,使用BlockingCollection<T[]>而不是BlockingCollection<T>。这是一个resent example。这是可行的,但它是丑陋和容易出错。最后,我在生产者和使用者处使用嵌套循环,我必须记住在生产者的工作负载结束时还剩下什么。因此,我有了实现一个块状BlockingCollection的想法,它将在内部处理所有这些复杂的问题,并将与现有BlockingCollection的相同简单接口具体化。我的问题是,我还没有设法达到复杂手动分区的性能。对于极细粒度的数据(基本上只是整数值),我最好的尝试仍然支付大约+100%的性能税。因此,我想在此介绍我迄今所做的工作,希望能得到一些建议,帮助我缩小业绩差距。

我最好的尝试是使用ThreadLocal>,这样每个线程都可以在一个专用块上工作,从而消除了对锁的任何需求。

代码语言:javascript
复制
public class ChunkyBlockingCollection1<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly ThreadLocal<List<T>> _chunk;

    public ChunkyBlockingCollection1(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
    }

    public void Add(T item)
    {
        var chunk = _chunk.Value;
        chunk.Add(item);
        if (chunk.Count >= _chunkSize)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
    }

    public void CompleteAdding()
    {
        var chunks = _chunk.Values.ToArray();
        foreach (var chunk in chunks)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}

我的第二个最佳尝试是使用单个List<T>作为块,所有线程都可以使用锁以线程安全的方式访问该块。令人惊讶的是,这只是稍微慢于ThreadLocal<List<T>>解决方案。

代码语言:javascript
复制
public class ChunkyBlockingCollection2<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly List<T> _chunk;
    private readonly object _locker = new object();

    public ChunkyBlockingCollection2(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new List<T>(chunkSize);
    }

    public void Add(T item)
    {
        lock (_locker)
        {
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            {
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
    }

    public void CompleteAdding()
    {
        lock (_locker)
        {
            _blockingCollection.Add(_chunk.ToArray());
            _chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}

我还尝试将ConcurrentBag<T>作为块使用,这会导致性能差和正确性问题(因为我没有使用锁)。另一个尝试是将lock (_locker)替换为SpinLock,性能甚至更差。锁定显然是我的问题的根源,因为如果我完全删除它,那么我的类就会获得最佳的性能。当然,对于不止一个生产者来说,拆除锁是很不幸的。

EN

回答 1

Stack Overflow用户

发布于 2019-07-09 13:03:23

您可以尝试使用用于_chunk的数组,而不是使用List<T>。然后,您可以使用Interlocked.Increment增加Add上填充的下一个索引,当计数超过块大小时,将其全部移动到阻塞集合中,当然,还可以在锁中重置索引。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56911067

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档