上面的代码是一个无锁队列的实现,它假设只有一个使用者线程和一个生产者线程。这是按计划工作吗?记忆屏障是在正确的地方使用吗?代码有比赛分数吗?
PS:我知道在.NET 4.0+中存在无锁队列实现。
using System;
using System.Threading;
using System.Collections.Generic;
using Nohros.Resources;
namespace Nohros.Concurrent
{
/// <summary>
/// <see cref="YQueue{T}"/> is an efficient queue implementation ported from
/// the zeromq library. <see cref="YQueue{T}"/> allows one thread to use the
/// <see cref="Enqueue"/> function while another one use the
/// <see cref="Dequeue(out T)"/> function without locking.
/// <typeparam name="T">
/// The type of the objects in the queue.
/// </typeparam>
/// </summary>
public class YQueue<T>
{
class Chunk
{
public long distance;
public volatile int head_pos;
public volatile Chunk next;
public volatile int tail_pos;
public T[] values;
#region .ctor
/// <summary>
/// Initializes a new instance of the <see cref="Chunk"/> class by using
/// the specified capacity.
/// </summary>
/// <param name="capacity">
/// The number of elements that the chunk can hold.
/// </param>
public Chunk(int capacity) {
values = new T[capacity];
head_pos = 0;
tail_pos = 0;
next = null;
distance = 0;
}
#endregion
}
const int kDefaultCapacity = 32;
volatile Chunk divider_;
readonly int granularity_;
volatile Chunk tail_chunk_;
#region .ctor
/// <summary>
/// Initializes a new instance of the <see cref="YQueue{T}"/> class
/// that has the default capacity.
/// </summary>
public YQueue() : this(kDefaultCapacity) {
}
/// <summary>
/// Initializes a new instance of the <see cref="YQueue{T}"/> class
/// by using the specified granularity.
/// </summary>
/// <param name="granularity">
/// A number that defines the granularity of the queue(how many pushes
/// have to be done till actual memory allocation is required).
/// </param>
public YQueue(int granularity) {
granularity_ = granularity;
divider_ = new Chunk(granularity);
// make sure that the divider will not be used to store elements.
divider_.tail_pos = granularity - 1;
divider_.head_pos = granularity;
tail_chunk_ = divider_;
}
#endregion
/// <summary>
/// Adds an element to the back end of the queue.
/// </summary>
/// <param name="element">
/// The element to be added to the back end of the queue.
/// </param>
public void Enqueue(T element) {
int tail_pos = tail_chunk_.tail_pos;
// If either the queue is not empty or the tail chunk is not full, adds
// the specified element to the back end of the current tail chunk.
if (tail_chunk_ != divider_ && ++tail_pos < granularity_) {
tail_chunk_.values[tail_pos] = element;
// Prevents any kind of instruction reorderring or caching.
Thread.MemoryBarrier();
// "Commit" the newly added item and "publish" it atomically
// to the consumer thread.
tail_chunk_.tail_pos = tail_pos;
return;
}
// Create a new chunk if a cached one does not exists and links it
// to the current last node.
Chunk chunk = new Chunk(granularity_);
tail_chunk_.next = chunk;
// Reset the chunk and append the specified element to the first slot.
chunk.tail_pos = 0; // An unconsumed element is added to the first slot.
chunk.head_pos = 0;
chunk.next = null;
chunk.values[0] = element;
chunk.distance = tail_chunk_.distance + 1;
// Make sure that the new chunk is fully initialized before it is
// assigned to the tail chunk.
Thread.MemoryBarrier();
// At this point the newly created chunk(or the last cached chunk) is
// not yet shared, but still private to the producer; the consumer will
// not follow the linked chunk unless the value of |tail_chunk_| says
// it may follow. The line above "commit" the update and publish it
// atomically to the consumer thread.
tail_chunk_ = tail_chunk_.next;
}
/// <summary>
/// Removes all elements from the <see cref="YQueue{T}"/>.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="Clear"/> removes the elements that are not currently
/// present in the queue. Elements added to the queue after
/// <see cref="Clear"/> is called and while <see cref="Clear"/> is running,
/// will not be cleared.
/// </para>
/// This operation should be sychronized with the <see cref="Dequeue()"/>
/// and <see cref="Dequeue(out T)"/> operations.
/// </remarks>
public void Clear() {
// Save the current tail chunk to ensure that the future elements are
// not cleared.
Chunk current_tail_chunk = tail_chunk_;
while (divider_ != current_tail_chunk) {
divider_ = divider_.next;
}
}
/// <summary>
/// Removes and returns the object at the beginning of the
/// <see cref="YQueue{T}"/>.
/// </summary>
/// <returns><typeparamref name="T"/> The object that is removed from the
/// <see cref="YQueue{T}"/></returns>
/// <exception cref="InvalidOperationException">The
/// <see cref="YQueue{T}"/> is empty.</exception>
public T Dequeue() {
T t;
bool ok = Dequeue(out t);
if (!ok) {
throw new InvalidOperationException(
StringResources.InvalidOperation_EmptyQueue);
}
return t;
}
/// <summary>
/// Removes and returns the object at the beginning of the
/// <see cref="YQueue<T>"/>.
/// </summary>
/// <param name="t">When this method returns, contains the object that was
/// removed from the beginning of the <see cref="YQueue<T>"/>, if
/// the object was successfully removed; otherwise the default value
/// of the type <typeparamref name="T"/>.</param>
/// <returns><c>true</c> if the queue is not empty and the object at the
/// beginning of it was successfully removed; otherwise, false.
/// </returns>
public bool Dequeue(out T t) {
// checks if the queue is empty
while (divider_ != tail_chunk_) {
// The chunks that sits between the |divider_| and the |tail_chunk_|,
// excluding the |divider_| and including the |tail_chunk_|, are
// unconsumed.
Chunk current_chunk = divider_.next;
// We need to compare the current chunk |tail_pos| with the |head_pos|
// and |granularity|. Since, the |tail_pos| can be modified by the
// producer thread we need to cache it instantaneous value.
int tail_pos;
tail_pos = current_chunk.tail_pos;
if (current_chunk.head_pos > tail_pos) {
if (tail_pos == granularity_ - 1) {
// we have reached the end of the chunk, go to the next chunk and
// frees the unused chunk.
divider_ = current_chunk;
//head_chunk_ = head_chunk_.next;
} else {
// we already consume all the available itens.
t = default(T);
return false;
}
} else {
// Ensure that we are reading the freshness value from the chunk
// values array.
Thread.MemoryBarrier();
// Here the |head_pos| is less than or equals to |tail_pos|, get
// the first unconsumed element and increments |head_pos| to publish
// the queue item removal.
t = current_chunk.values[current_chunk.head_pos];
// keep the order between assignment and publish operations.
Thread.MemoryBarrier();
current_chunk.head_pos++;
return true;
}
}
t = default(T);
return false;
}
/// <summary>
/// Gets a value indicating whether the <see cref="YQueue{T}"/> is empty.
/// </summary>
/// <remarks>
/// Since this collection is intended to be accessed concurrently by two
/// threads in a producer/consumer pattern, it may be the case that another
/// thread will modify the collection after <see cref="IsEmpty"/> returns,
/// thus invalidatind the result.
/// </remarks>
public bool IsEmpty {
get {
Chunk divider = divider_;
Chunk tail = tail_chunk_;
return (divider.next == tail || divider == tail) &&
tail.head_pos > tail.tail_pos;
}
}
}
}发布于 2012-12-22 13:02:32
_中为私有字段使用C#后缀的命名约定。最常用的约定使用m_、_作为前缀,或者根本不使用。类似地,k前缀对于C#中的常量来说并不常见。而且,在underscore_names中使用C#并不常见。++表达式中使用if。Chunk的字段设置两次为相同的值。特别是因为0和null无论如何都是默认值。divider_的存在。为什么在列表开始时需要一个空块?考虑到所有其他内容都得到了广泛的评论,这似乎是一个奇怪的疏忽。distance字段的原因是什么?你好像哪儿都没用过。Clear()可以轻松清除在执行过程中添加的项。您可以通过在Thread.Sleep(1)之前添加while并在另一个线程上调用Clear()时对某些项进行排队来看到这一点。(您还应该为此添加一些日志记录,这样您就可以看到事件的顺序。)t不是一个很好的变量名。像result这样的东西会更好。while在Dequeue()中是否必要,因为它最多只有两个迭代。我认为摆脱while会使您的代码更容易理解。否则,我没有发现任何线程问题,但这并不意味着没有任何问题。无锁编程可能非常棘手。
https://codereview.stackexchange.com/questions/19625
复制相似问题