首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >C /C++ -无锁(或非阻塞)环形缓冲区,覆盖最旧的数据?

C /C++ -无锁(或非阻塞)环形缓冲区,覆盖最旧的数据?
EN

Stack Overflow用户
提问于 2010-12-10 12:42:42
回答 3查看 5.7K关注 0票数 9

我正在试图找到一种方法,使锁自由或非阻塞的方式,使一个单一的消费者/单一消费者的环形缓冲区,将覆盖缓冲区中最旧的数据。我读过很多无锁算法,它们在缓冲区已满时“返回false”时有效--也就是说,不要添加;但我甚至找不到伪代码来讨论如何在需要覆盖最旧的数据时这样做。

我使用的是GCC 4.1.2 (工作限制,无法升级版本...)我有Boost库,在过去我做了我自己的Atomic< T>变量类型,它非常符合升级规范(它不是完美的,但它是线程安全的,并且做了我需要的事情)。

当我想到这一点时,我认为使用这些原子应该能真正解决这个问题。一些关于我的想法的粗略的伪代码:

代码语言:javascript
复制
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

据我所知,这里没有死锁的情况,所以我们是安全的(如果我上面的实现是错误的,即使是在伪代码级别,建设性的批评总是值得感谢的)。然而,我能找到的最大的竞态条件是:

让我们假设缓冲区已满。也就是说,writeIndex +1 = readIndex;(1)在调用时发生。和为true (4)为false,因此我们移动到从缓冲区读取(5)发生,并且readIndex超前1(因此,实际上,在缓冲区(2)中有空间发生,再次超前readIndex,从而丢失该值。

基本上,这是一个典型的问题,写入者必须修改读取器,从而导致竞争条件。如果不在每次访问整个列表时阻塞整个列表,我想不出一种方法来防止这种情况发生。我遗漏了什么??

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2010-12-10 13:18:50

  1. 从单个生产者/多个消费者队列开始,并提供适当的进度保证。
  2. 如果队列已满且推送失败,则弹出一个值。然后将有空间来推动新值。
票数 7
EN

Stack Overflow用户

发布于 2010-12-10 13:12:02

我错过了什么??

Lots:

  • 说你消费了一个t,而它正在被生产者覆盖--你是如何检测/处理它的?
    • 有很多选项--例如,do {复制值;使用修改序列num检查拷贝的完整性等等。

使用原子数的屏障是不够的-您还需要使用CAS式循环来影响索引增量(尽管我假设您知道这一点,因为您已经阅读了大量关于already)

  • memory

的文章

但是,让我们将其写为低于您的伪代码级别,并考虑您明确的问题:

  • 点(5)实际上需要CAS操作。如果readIndex在consume()之上被正确采样/复制-在复制(可能损坏的) t之前-那么如果生产者已经递增了CAS指令,那么它将失败。而不是通常的重新采样和重试CAS,只需继续。
票数 1
EN

Stack Overflow用户

发布于 2016-06-17 22:31:25

这是我最近创建的关于原子变量的循环缓冲区代码。我已经将其修改为“覆盖”数据,而不是返回false。免责声明-它还没有经过生产级测试。

代码语言:javascript
复制
    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/4405721

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档