我正在读的一本书是“多处理器编程的艺术”,作者是Maurice Herlihy和Nir Shavit。在它中,有一个“无等待”队列,它(经过一点语言调整)在线程环境中完美地执行测试和逻辑功能,至少在分布在5个线程上的10,000,000个项目上也没有冲突,并且逻辑检查通过。
(我编辑了队列,使其在无法获取商品时返回false,而不是抛出异常。代码如下所示)。
但是,它有一个警告:队列不能增长。粗略的逻辑检查它不能在不锁定队列的情况下增长-这在某种程度上否定了拥有无锁队列的意义。
这样做的目的是创建一个可以增长的无锁队列(或者至少是无饥饿锁队列)。
因此:如果我们本质上给每个线程自己的共享队列,以一种不矛盾的方式(并接受这个问题已经解决的可能性很高,并且为了边做边学的目的更好):
WaitFreeQueue<Queue<int>> queues = new WaitFreeQueue<Queue<int>>(threadCount);
// Dequeue a queue, enqueue an item, enqueue the queue.
// Dequeue a queue, dequeue an item, enqueue the queue.和等待自由队列(以前的代码包含在注释中,以防我做出任何破坏性的更改):
/// <summary>
/// A wait-free queue for use in threaded environments.
/// Significantly adapted from "The Art of Multiprocessor Programming by Maurice Herlihy and Nir Shavit".
/// </summary>
/// <typeparam name="T">The type of item in the queue.</typeparam>
public class WaitFreeQueue<T>
{
/// <summary>
/// The index to dequeue from.
/// </summary>
protected int head;
/// <summary>
/// The index to queue to.
/// </summary>
protected int tail;
/// <summary>
/// The array to queue in.
/// </summary>
protected T[] items;
/// <summary>
/// The number of items queued.
/// </summary>
public int Count
{
get { return tail - head; }
}
/// <summary>
/// Creates a new wait-free queue.
/// </summary>
/// <param name="capacity">The capacity of the queue.</param>
public WaitFreeQueue(int capacity)
{
items = new T[capacity];
head = 0; tail = 0;
}
/// <summary>
/// Attempts to enqueue an item.
/// </summary>
/// <param name="value">The item to enqueue.</param>
/// <returns>Returns false if there was no room in the queue.</returns>
public bool Enqueue(T value)
{
if (tail - head == items.Length)
// throw new IndexOutOfRangeException();
return false;
items[tail % items.Length] = value;
System.Threading.Interlocked.Increment(ref tail);
return true;
// tail++;
}
/// <summary>
/// Attempts to dequeue an item.
/// </summary>
/// <param name="r">The variable to dequeue to.</param>
/// <returns>Returns true if there was an available item to dequeue.</returns>
public bool Dequeue(out T r)
{
if (tail - head == 0)
// throw new InvalidOperationException("No more items.");
{ r = default(T); return false; }
r = items[head % items.Length];
System.Threading.Interlocked.Increment(ref head);
// head++;
// return r;
return true;
}
}那么:这行得通吗?若否,原因为何?如果是这样的话,还有什么可以预见的问题吗?
谢谢。
发布于 2011-06-26 07:32:40
尝试编写无锁的多线程代码是困难的,如果可能的话,你应该把它留给比你我更了解它的人(并使用ConcurrentQueue<T>),或者根本不写它(并使用锁)。
话虽如此,但您的代码存在以下几个问题:
threadCount为2,您将项目1、2和3一个接一个地入队,然后出队,您将获得项目2!Interlocked.Increment()。想象一下这样的场景:1. On thread 1: `items[tail % items.Length] = value;`
2. On thread 2: `items[tail % items.Length] = value;`
3. On thread 2: `Interlocked.Increment(ref head);`
4. On thread 1: `Interlocked.Increment(ref head);`现在,两个线程排队到相同的位置,之后的位置没有改变。这是错误的。
发布于 2011-06-26 07:59:31
您发布的代码对于入队和出队都不是线程安全的。
入队
你有一个空的queue
head = tail = 0
Enqueue,在items[tail % items.Length] = value;之后,但是在执行互锁增量之前,被中断了。它现在已经将一个值写入到item[0]中,但没有递增tailEnqueue并执行该方法。现在tail仍然是0,因此前一个线程写入的值被替换,因为第二个线程也会向item[0]写入一个值。出列
假设您在items[1]
head = 0和tail = 2
Dequeue,并在r = items[head % items.Length];之后中断,但在执行互锁的increments.
Dequeue,并将返回相同的项目,因为head仍然为0<代码>H246<代码>H147之后,您的队列将为空,但您将已经读取了一个项目两次,而一个项目根本没有读取。<代码>H248<代码>F249https://stackoverflow.com/questions/6480977
复制相似问题