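I tried to do this by wrapping a ConcurrentDictionary in a BlockingCollection, but it did not seem to work.
I know that a single declaration works for BlockingCollection with collections such as ConcurrentBag<T>, ConcurrentQueue<T>, and so on.
So, to create a ConcurrentBag wrapped in a BlockingCollection, I would declare and instantiate it like this:
BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());
But how do I do this for a ConcurrentDictionary? I need the blocking behavior of BlockingCollection on both the producer and the consumer side.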
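For reference, here is a minimal sketch of the two-sided blocking the question refers to, using a ConcurrentQueue as the backing store. The class name `BoundedQueueDemo`, the method `Run`, and its parameters are hypothetical names chosen for illustration; they are not part of the original question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BoundedQueueDemo
{
    // Hypothetical helper: pushes the integers 1..count through a bounded
    // BlockingCollection and returns their sum as seen by the consumer.
    public static int Run(int count, int capacity)
    {
        // The capacity bound is what makes Add block on the producer side.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), capacity))
        {
            Task producer = Task.Run(() =>
            {
                for (int i = 1; i <= count; i++)
                {
                    queue.Add(i); // blocks while the collection already holds 'capacity' items
                }
                queue.CompleteAdding(); // lets the consuming loop below finish
            });

            int sum = 0;
            // Blocks while the collection is empty; ends once CompleteAdding
            // has been called and the remaining items are drained.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                sum += item;
            }

            producer.Wait();
            return sum;
        }
    }
}
```

Calling `BoundedQueueDemo.Run(10, 2)` should push ten items through a collection that never holds more than two at a time.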
Posted on 2013-01-30 22:48:14
Maybe what you need is a concurrent dictionary of BlockingCollections:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
int maxBoxes = 5;
CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
CancellationToken cancelationToken = cancelationTokenSource.Token;
Random rnd = new Random();
// System.Random is not thread-safe, so serialize access to the shared instance.
Func<int> nextIndex = () => { lock (rnd) { return rnd.Next(0, maxBoxes); } };

// Producer
Task.Factory.StartNew(() =>
{
    while (true)
    {
        int index = nextIndex();
        // Put the letter in mailbox 'index'; the factory overload only
        // creates a new collection when the key is missing.
        var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
        box.Add("some message " + index, cancelationToken);
        Console.WriteLine("Produced a letter to put in box " + index);
        // Wait, simulating an item that is expensive to produce.
        Thread.Sleep(1000);
    }
});

// Consumer 1
Task.Factory.StartNew(() =>
{
    while (true)
    {
        int index = nextIndex();
        // Get the letter from mailbox 'index'.
        var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
        // Take blocks until a letter arrives and throws
        // OperationCanceledException once the token is cancelled.
        var message = box.Take(cancelationToken);
        Console.WriteLine("Consumed 1: " + message);
        // Consuming an item costs less than producing it.
        Thread.Sleep(50);
    }
});

// Consumer 2
Task.Factory.StartNew(() =>
{
    while (true)
    {
        int index = nextIndex();
        // Get the letter from mailbox 'index'.
        var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
        var message = box.Take(cancelationToken);
        Console.WriteLine("Consumed 2: " + message);
        // Consuming an item costs less than producing it.
        Thread.Sleep(50);
    }
});

Console.ReadLine();
cancelationTokenSource.Cancel();
This way, a consumer expecting something in a given mailbox (say, mailbox 4) waits until the producer puts a letter into that mailbox.
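The loops above only stop when the token is cancelled. As a rough sketch of one alternative, assuming a fixed number of mailboxes and one consumer per mailbox, production can be ended with CompleteAdding so that each consumer drains its box and exits on its own. The names `MailboxDemo`, `Run`, `boxCount`, and `lettersPerBox` are hypothetical and were not in the original answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class MailboxDemo
{
    // Hypothetical helper: delivers 'lettersPerBox' letters to each of
    // 'boxCount' mailboxes and returns how many each consumer received.
    public static int[] Run(int boxCount, int lettersPerBox)
    {
        var mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        var received = new int[boxCount];

        // One consumer per mailbox; each blocks until its own box has a letter.
        Task[] consumers = Enumerable.Range(0, boxCount).Select(index => Task.Run(() =>
        {
            var inbox = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
            foreach (string letter in inbox.GetConsumingEnumerable())
            {
                received[index]++; // only this consumer writes this slot
            }
        })).ToArray();

        // Producer: round-robin over the mailboxes.
        for (int n = 0; n < lettersPerBox; n++)
        {
            for (int i = 0; i < boxCount; i++)
            {
                var target = mailBoxes.GetOrAdd(i, _ => new BlockingCollection<string>());
                target.Add("some message " + i);
            }
        }

        // Mark every mailbox as complete so the consuming loops end
        // after draining whatever is still queued.
        for (int i = 0; i < boxCount; i++)
        {
            mailBoxes.GetOrAdd(i, _ => new BlockingCollection<string>()).CompleteAdding();
        }

        Task.WaitAll(consumers);
        return received;
    }
}
```

GetOrAdd may invoke the factory more than once under contention, but every caller gets back the single collection that was actually stored, so the producer and the consumer for a given index always share the same mailbox.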
Posted on 2012-05-24 21:19:06
You need to write your own adapter class, something like:
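using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Adapter that lets a BlockingCollection use a ConcurrentDictionary as its backing store.
public class ConcurrentDictionaryWrapper<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    // Backing store.
    private readonly ConcurrentDictionary<TKey, TValue> dictionary = new ConcurrentDictionary<TKey, TValue>();

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return dictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        ((ICollection)dictionary).CopyTo(array, index);
    }

    public int Count
    {
        get { return dictionary.Count; }
    }

    // Like the built-in concurrent collections, this type does not
    // support lock-based synchronization through SyncRoot.
    public object SyncRoot
    {
        get { throw new NotSupportedException(); }
    }

    public bool IsSynchronized
    {
        get { return false; }
    }

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        ((ICollection<KeyValuePair<TKey, TValue>>)dictionary).CopyTo(array, index);
    }

    // Returns false when the key already exists; BlockingCollection.Add
    // then throws InvalidOperationException.
    public bool TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return dictionary.TryAdd(item.Key, item.Value);
    }

    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Another consumer may remove the same pair first, so keep
        // trying entries until one removal succeeds.
        foreach (KeyValuePair<TKey, TValue> pair in dictionary)
        {
            TValue value;
            if (dictionary.TryRemove(pair.Key, out value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }
        item = default(KeyValuePair<TKey, TValue>);
        return false;
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return dictionary.ToArray();
    }
}
https://stackoverflow.com/questions/10736209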