我试图用多个生产者和一个消费者实现一个相当简单的生产者/消费者风格的应用程序。
研究让我进入了BlockingCollection<T>,它很有用,并且允许我实现一个长期运行的消费者任务,如下所示:
var c1 = Task.Factory.StartNew(() =>
{
var buffer = new List<int>(BATCH_BUFFER_SIZE);
foreach (var value in blockingCollection.GetConsumingEnumerable())
{
buffer.Add(value);
if (buffer.Count == BATCH_BUFFER_SIZE)
{
ProcessItems(buffer);
buffer.Clear();
}
}
});ProcessItems函数将缓冲区提交给数据库,并且它确实是批量工作的。然而,这个解是次优的。在男爵生产期间,可能需要一段时间才能填充缓冲区,这意味着数据库已过时。
一个更理想的解决方案是要么在30秒计时器上运行任务,要么用超时来短路foreach。
我拿出计时器的主意,想出了这样的点子:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);
private static void TimerElapsed(object state)
{
var buffer = new List<int>();
var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();
foreach (var value in collection)
{
buffer.Add(value);
}
ProcessItems(buffer);
buffer.Clear();
}这有一个明显的问题,就是foreach会一直阻塞到结束,从而破坏计时器的用途。
有人能给出方向吗?本质上,我需要定期对BlockingCollection进行快照,并处理清除它的内容。也许BlockingCollection是错误的类型?
发布于 2014-06-27 09:11:39
不要在计时器回调中使用GetConsumingEnumerable,而是使用以下方法之一,将结果添加到列表中,直到返回false或达到满意的批处理大小为止。
BlockingCollection.TryTake方法(T) --也许你需要什么,你根本不想做进一步的等待。
BlockingCollection.TryTake方法(T,Int32)
BlockingCollection.TryTake方法(T,TimeSpan)
您可以轻松地将其提取到扩展中(未经测试):
public static IList<T> Flush<T>
(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
{
// Argument checking.
T next;
var result = new List<T>();
while(result.Count < maxSize && collection.TryTake(out next))
{
result.Add(next);
}
return result;
}https://stackoverflow.com/questions/24447985
复制相似问题