我有一个ConcurrentQueue,它用来自一个线程的对象填充,另一个线程从其中获取对象并处理它们。
如果队列变得很大,我可以通过删除重复项来“压缩”它。压缩把队列变成一个列表,遍历它,然后创建一个只有不同值的新队列。所以我替换了队列,由于我这样做了,所以我不能将对象插入到被覆盖的队列中,所以我将释放它们。
我的问题是,如果我添加一个LockHandle (Obj) {}或者某种锁,我会失去很多性能。有很多事务,但处理时间非常短,所以锁定看起来像是扼杀我的性能的原因。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
namespace A
{
public abstract class Base
{
private ConcurrentQueue<Data> unProcessed = new ConcurrentQueue<Data>();
private const int MIN_COLLAPSETIME = 30;
private const int MIN_COLLAPSECOUNT = 1000;
private QueueCollapser Collapser;
private ManualResetEventSlim waitForCollapsing = new ManualResetEventSlim(true);
private ManualResetEventSlim waitForWrite = new ManualResetEventSlim();
// Thread signal.
public AutoResetEvent unProcessedEvent = new AutoResetEvent(false);
// exiting
public volatile bool Exiting = false;
private Task task;
public BasePusher()
{
// initiate Collapser
Collapser = new QueueCollapser();
// set up thread
task = new Task(
() =>
{
consumerTask();
}, TaskCreationOptions.LongRunning
);
}
public void Start()
{
task.Start();
}
private void consumerTask()
{
Data data = null;
while (!Exiting)
{
try
{
// do we try to collapse
if (unProcessed.Count > MIN_COLLAPSECOUNT && (DateTime.Now - Collapser.LastCollapse).TotalSeconds > MIN_COLLAPSETIME)
{
waitForCollapsing.Reset();
waitForWrite.Wait();
unProcessed = Collapser.Collapse(unProcessed);
waitForCollapsing.Set();
// tried this aswell instead of using my own locking, this is like Monitor.Enter
lock(this) {
unProcessed = Collapser.Collapse(unProcessed);
}
}
if (sum == 0)
{
// we wake the thread after 20 seconds, if nothing is in queue it will just go back and wait
unProcessedEvent.WaitOne(20000);
}
var gotOne = unProcessed.TryDequeue(out data);
if (gotOne)
{
ProcessTime(data);
}
}
}
catch (Exception e)
{
}
}
}
protected abstract void ProcessTime(Data d);
public void AddToQueue(Data d)
{
waitForCollapsing.Wait();
waitForWrite.Reset();
unProcessed.Enqueue(d);
waitForWrite.Set();
unProcessedEvent.Set();
}
// tried this aswell instead of using my own locking, this is like Monitor.Enter
public void AddToQueueAlternate(Data d)
{
lock(this) {
unProcessed.Enqueue(d);
waitForWrite.Set();
unProcessedEvent.Set();
}
}
}
}这可以在没有锁定的情况下完成吗?我可以使用更轻量级的锁吗?到目前为止,只有一个添加数据的线程和一个读取数据的线程。如果我能得到更好的锁,我可以保持这种状态。
发布于 2014-05-17 19:13:55
如果您想要并发且没有重复,则应使用ConcurrentDictionary
所以重新声明你的队列:
private ConcurrentDictionary<Data, Data> unProcessed =
new ConcurrentDictionary<Data, Data>();这将大大简化您的代码,同时保持非常好的性能。
发布于 2014-05-17 19:05:19
为什么你会有副本呢?
如果发布者真的可以添加重复项,那么您需要某种类型的并发散列对象(Dictionary或HashSet)来检测并防止这种情况在发布者中发生。
您可能还想研究一下ReaderWriterLockSlim。
https://stackoverflow.com/questions/23710400
复制相似问题