首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >publisher/使用者线程安全的无锁队列,只有一个发布者/使用者

publisher/使用者线程安全的无锁队列,只有一个发布者/使用者
EN

Code Review用户
提问于 2012-12-14 21:08:50
回答 1查看 2.5K关注 0票数 2

上面的代码是一个无锁队列的实现,它假设只有一个使用者线程和一个生产者线程。这是按计划工作吗?记忆屏障是在正确的地方使用吗?代码有比赛分数吗?

PS:我知道在.NET 4.0+中存在无锁队列实现。

代码语言:javascript
复制
using System;
using System.Threading;
using System.Collections.Generic;
using Nohros.Resources;

namespace Nohros.Concurrent
{
  /// <summary>
  /// <see cref="YQueue{T}"/> is an efficient queue implementation ported from
  /// the zeromq library. <see cref="YQueue{T}"/> allows one thread to use the
  /// <see cref="Enqueue"/> function while another one use the
  /// <see cref="Dequeue(out T)"/> function without locking.
  /// <typeparam name="T">
  /// The type of the objects in the queue.
  /// </typeparam>
  /// </summary>
  public class YQueue<T>
  {
    class Chunk
    {
      public long distance;
      public volatile int head_pos;
      public volatile Chunk next;
      public volatile int tail_pos;
      public T[] values;

      #region .ctor
      /// <summary>
      /// Initializes a new instance of the <see cref="Chunk"/> class by using
      /// the specified capacity.
      /// </summary>
      /// <param name="capacity">
      /// The number of elements that the chunk can hold.
      /// </param>
      public Chunk(int capacity) {
        values = new T[capacity];
        head_pos = 0;
        tail_pos = 0;
        next = null;
        distance = 0;
      }
      #endregion
    }

    const int kDefaultCapacity = 32;
    volatile Chunk divider_;
    readonly int granularity_;
    volatile Chunk tail_chunk_;

    #region .ctor
    /// <summary>
    /// Initializes a new instance of the <see cref="YQueue{T}"/> class
    /// that has the default capacity.
    /// </summary>
    public YQueue() : this(kDefaultCapacity) {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="YQueue{T}"/> class
    /// by using the specified granularity.
    /// </summary>
    /// <param name="granularity">
    /// A number that defines the granularity of the queue(how many pushes
    /// have to be done till actual memory allocation is required).
    /// </param>
    public YQueue(int granularity) {
      granularity_ = granularity;
      divider_ = new Chunk(granularity);

      // make sure that the divider will not be used to store elements.
      divider_.tail_pos = granularity - 1;
      divider_.head_pos = granularity;

      tail_chunk_ = divider_;
    }
    #endregion

    /// <summary>
    /// Adds an element to the back end of the queue.
    /// </summary>
    /// <param name="element">
    /// The element to be added to the back end of the queue.
    /// </param>
    public void Enqueue(T element) {
      int tail_pos = tail_chunk_.tail_pos;

      // If either the queue is not empty or the tail chunk is not full, adds
      // the specified element to the back end of the current tail chunk.
      if (tail_chunk_ != divider_ && ++tail_pos < granularity_) {
        tail_chunk_.values[tail_pos] = element;

        // Prevents any kind of instruction reorderring or caching.
        Thread.MemoryBarrier();

        // "Commit" the newly added item and "publish" it atomically
        // to the consumer thread.
        tail_chunk_.tail_pos = tail_pos;
        return;
      }

      // Create a new chunk if a cached one does not exists and links it
      // to the current last node.
      Chunk chunk = new Chunk(granularity_);
      tail_chunk_.next = chunk;

      // Reset the chunk and append the specified element to the first slot.
      chunk.tail_pos = 0; // An unconsumed element is added to the first slot.
      chunk.head_pos = 0;
      chunk.next = null;
      chunk.values[0] = element;
      chunk.distance = tail_chunk_.distance + 1;

      // Make sure that the new chunk is fully initialized before it is
      // assigned to the tail chunk.
      Thread.MemoryBarrier();

      // At this point the newly created chunk(or the last cached chunk) is
      // not yet shared, but still private to the producer; the consumer will
      // not follow the linked chunk unless the value of |tail_chunk_| says
      // it may follow. The line above "commit" the update and publish it
      // atomically to the consumer thread.
      tail_chunk_ = tail_chunk_.next;
    }

    /// <summary>
    /// Removes all elements from the <see cref="YQueue{T}"/>.
    /// </summary>
    /// <remarks>
    /// <para>
    /// <see cref="Clear"/> removes the elements that are not currently
    /// present in the queue. Elements added to the queue after
    /// <see cref="Clear"/> is called and while <see cref="Clear"/> is running,
    /// will not be cleared.
    /// </para>
    /// This operation should be sychronized with the <see cref="Dequeue()"/>
    /// and <see cref="Dequeue(out T)"/> operations.
    /// </remarks>
    public void Clear() {
      // Save the current tail chunk to ensure that the future elements are
      // not cleared.
      Chunk current_tail_chunk = tail_chunk_;
      while (divider_ != current_tail_chunk) {
        divider_ = divider_.next;
      }
    }

    /// <summary>
    /// Removes and returns the object at the beginning of the
    /// <see cref="YQueue{T}"/>.
    /// </summary>
    /// <returns><typeparamref name="T"/> The object that is removed from the
    /// <see cref="YQueue{T}"/></returns>
    /// <exception cref="InvalidOperationException">The
    /// <see cref="YQueue{T}"/> is empty.</exception>
    public T Dequeue() {
      T t;
      bool ok = Dequeue(out t);
      if (!ok) {
        throw new InvalidOperationException(
          StringResources.InvalidOperation_EmptyQueue);
      }
      return t;
    }

    /// <summary>
    /// Removes and returns the object at the beginning of the
    /// <see cref="YQueue<T>"/>.
    /// </summary>
    /// <param name="t">When this method returns, contains the object that was
    /// removed from the beginning of the <see cref="YQueue<T>"/>, if
    /// the object was successfully removed; otherwise the default value
    /// of the type <typeparamref name="T"/>.</param>
    /// <returns><c>true</c> if the queue is not empty and the object at the
    /// beginning of it was successfully removed; otherwise, false.
    /// </returns>
    public bool Dequeue(out T t) {
      // checks if the queue is empty
      while (divider_ != tail_chunk_) {
        // The chunks that sits between the |divider_| and the |tail_chunk_|,
        // excluding the |divider_| and including the |tail_chunk_|, are
        // unconsumed.
        Chunk current_chunk = divider_.next;

        // We need to compare the current chunk |tail_pos| with the |head_pos|
        // and |granularity|. Since, the |tail_pos| can be modified by the
        // producer thread we need to cache it instantaneous value.
        int tail_pos;
        tail_pos = current_chunk.tail_pos;

        if (current_chunk.head_pos > tail_pos) {
          if (tail_pos == granularity_ - 1) {
            // we have reached the end of the chunk, go to the next chunk and
            // frees the unused chunk.
            divider_ = current_chunk;
            //head_chunk_ = head_chunk_.next;
          } else {
            // we already consume all the available itens.
            t = default(T);
            return false;
          }
        } else {
          // Ensure that we are reading the freshness value from the chunk
          // values array.
          Thread.MemoryBarrier();

          // Here the |head_pos| is less than or equals to |tail_pos|, get
          // the first unconsumed element and increments |head_pos| to publish
          // the queue item removal.
          t = current_chunk.values[current_chunk.head_pos];

          // keep the order between assignment and publish operations.
          Thread.MemoryBarrier();

          current_chunk.head_pos++;
          return true;
        }
      }
      t = default(T);
      return false;
    }

    /// <summary>
    /// Gets a value indicating whether the <see cref="YQueue{T}"/> is empty.
    /// </summary>
    /// <remarks>
    /// Since this collection is intended to be accessed concurrently by two
    /// threads in a producer/consumer pattern, it may be the case that another
    /// thread will modify the collection after <see cref="IsEmpty"/> returns,
    /// thus invalidatind the result.
    /// </remarks>
    public bool IsEmpty {
      get {
        Chunk divider = divider_;
        Chunk tail = tail_chunk_;

        return (divider.next == tail || divider == tail) &&
          tail.head_pos > tail.tail_pos;
      }
    }
  }
}
EN

回答 1

Code Review用户

发布于 2012-12-22 13:02:32

  1. 我从未见过在_中为私有字段使用C#后缀的命名约定。最常用的约定使用m__作为前缀,或者根本不使用。类似地,k前缀对于C#中的常量来说并不常见。而且,在underscore_names中使用C#并不常见。
  2. 通常情况下,如果一个表达式用于其值或其副作用,而不是两者兼而有之,则更好。因此,您应该避免在++表达式中使用if
  3. 我看不出为什么要将新Chunk的字段设置两次为相同的值。特别是因为0null无论如何都是默认值。
  4. 关于“缓存块”的评论是令人困惑的,因为您似乎根本不缓存块。
  5. 我不明白divider_的存在。为什么在列表开始时需要一个空块?考虑到所有其他内容都得到了广泛的评论,这似乎是一个奇怪的疏忽。
  6. distance字段的原因是什么?你好像哪儿都没用过。
  7. 如果添加到最后一个块(而不是一个新的块),Clear()可以轻松清除在执行过程中添加的项。您可以通过在Thread.Sleep(1)之前添加while并在另一个线程上调用Clear()时对某些项进行排队来看到这一点。(您还应该为此添加一些日志记录,这样您就可以看到事件的顺序。)
  8. t不是一个很好的变量名。像result这样的东西会更好。
  9. 我不确定whileDequeue()中是否必要,因为它最多只有两个迭代。我认为摆脱while会使您的代码更容易理解。

否则,我没有发现任何线程问题,但这并不意味着没有任何问题。无锁编程可能非常棘手。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/19625

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档