我多次使用BlockingCollection来实现生产者/消费者模式,但是由于相关的开销,我在极细粒度的数据方面的性能很差。这通常迫使我通过分组/分区数据来即兴发挥作用,换句话说,使用BlockingCollection<T[]>而不是BlockingCollection<T>。这是一个resent example。这是可行的,但它是丑陋和容易出错。最后,我在生产者和使用者处使用嵌套循环,我必须记住在生产者的工作负载结束时还剩下什么。因此,我有了实现一个块状BlockingCollection的想法,它将在内部处理所有这些复杂的问题,并将与现有BlockingCollection的相同简单接口具体化。我的问题是,我还没有设法达到复杂手动分区的性能。对于极细粒度的数据(基本上只是整数值),我最好的尝试仍然支付大约+100%的性能税。因此,我想在此介绍我迄今所做的工作,希望能得到一些建议,帮助我缩小业绩差距。
我最好的尝试是使用ThreadLocal>,这样每个线程都可以在一个专用块上工作,从而消除了对锁的任何需求。
public class ChunkyBlockingCollection1<T>
{
private readonly BlockingCollection<T[]> _blockingCollection;
public readonly int _chunkSize;
private readonly ThreadLocal<List<T>> _chunk;
public ChunkyBlockingCollection1(int chunkSize)
{
_blockingCollection = new BlockingCollection<T[]>();
_chunkSize = chunkSize;
_chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
}
public void Add(T item)
{
var chunk = _chunk.Value;
chunk.Add(item);
if (chunk.Count >= _chunkSize)
{
_blockingCollection.Add(chunk.ToArray());
chunk.Clear();
}
}
public void CompleteAdding()
{
var chunks = _chunk.Values.ToArray();
foreach (var chunk in chunks)
{
_blockingCollection.Add(chunk.ToArray());
chunk.Clear();
}
_blockingCollection.CompleteAdding();
}
public IEnumerable<T> GetConsumingEnumerable()
{
foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
{
for (int i = 0; i < chunk.Length; i++)
{
yield return chunk[i];
}
}
}
}我的第二个最佳尝试是使用单个List<T>作为块,所有线程都可以使用锁以线程安全的方式访问该块。令人惊讶的是,这只是稍微慢于ThreadLocal<List<T>>解决方案。
public class ChunkyBlockingCollection2<T>
{
private readonly BlockingCollection<T[]> _blockingCollection;
public readonly int _chunkSize;
private readonly List<T> _chunk;
private readonly object _locker = new object();
public ChunkyBlockingCollection2(int chunkSize)
{
_blockingCollection = new BlockingCollection<T[]>();
_chunkSize = chunkSize;
_chunk = new List<T>(chunkSize);
}
public void Add(T item)
{
lock (_locker)
{
_chunk.Add(item);
if (_chunk.Count >= _chunkSize)
{
_blockingCollection.Add(_chunk.ToArray());
_chunk.Clear();
}
}
}
public void CompleteAdding()
{
lock (_locker)
{
_blockingCollection.Add(_chunk.ToArray());
_chunk.Clear();
}
_blockingCollection.CompleteAdding();
}
public IEnumerable<T> GetConsumingEnumerable()
{
foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
{
for (int i = 0; i < chunk.Length; i++)
{
yield return chunk[i];
}
}
}
}我还尝试将ConcurrentBag<T>作为块使用,这会导致性能差和正确性问题(因为我没有使用锁)。另一个尝试是将lock (_locker)替换为SpinLock,性能甚至更差。锁定显然是我的问题的根源,因为如果我完全删除它,那么我的类就会获得最佳的性能。当然,对于不止一个生产者来说,拆除锁是很不幸的。
发布于 2019-07-09 13:03:23
您可以尝试使用用于_chunk的数组,而不是使用List<T>。然后,您可以使用Interlocked.Increment增加Add上填充的下一个索引,当计数超过块大小时,将其全部移动到阻塞集合中,当然,还可以在锁中重置索引。
https://stackoverflow.com/questions/56911067
复制相似问题