说明:每个生产者在自己的线程上运行,并将使用Push()将其项目输入到RingBuffer中。
每个使用者都在自己的线程上运行,并且有自己的唯一号码。当消费者TryRead()时,它将输入其唯一的号码。
RingBuffer将跟踪每个消费者在std::vector<long> mReadIdxVec中读取的最后一个位置。
如果mReadIdxVec[consumer unique number] >= mMaxReadIdx,TryRead()将返回一个nullptr。
问:在Push()方法中,如果不同线程上的多个生产者同时调用这一行mVector[index] = pMsg;会不会成为一个问题?
这个代码还能进一步改进吗?欢迎任何意见!
RingBuffer.h
#ifndef __RING_BUFFER_H_
#define __RING_BUFFER_H_
#include <atomic>
class RingBuffer
{
private:
std::vector<Foo *> mVector;
std::atomic<long> mWriteIdx{0}; //mWriteIdx will not exceed 2,000,000,000
std::atomic<long> mMaxReadIdx{0}; //mMaxReadIdx will not exceed 2,000,000,000
std::vector<long> mReadIdxVec;
public:
RingBuffer(int pNumOfConsumers);
void Push(Foo *);
Foo * TryRead(const int&);
};
#endifRingBuffer.cc
#include "RingBuffer.h"
#define MAX_SIZE 1000
RingBuffer::RingBuffer(int pNumOfConsumers)
{
mVector.reserve(MAX_SIZE);
mVector.assign(MAX_SIZE, nullptr);
mReadIdxVec.assign(pNumOfConsumers, 0);
}
void RingBuffer::Push(Foo * pMsg)
{
if(!pMsg) return;
long writeIdx = mWriteIdx++;
long index = writeIdx % MAX_SIZE;
if(writeIdx >= MAX_SIZE) delete mVector[index];
mVector[index] = pMsg;
mMaxReadIdx++;
}
Foo * RingBuffer::TryRead(const int& pConsumer)
{
if(mReadIdxVec[pConsumer] >= mMaxReadIdx) return nullptr;
long index = mReadIdxVec[pConsumer] % MAX_SIZE;
mReadIdxVec[pConsumer]++;
return mVector[index];
}发布于 2018-08-16 03:47:18
对于某些成员的值限制,在RingBuffer的定义中有注释,但是代码中没有任何东西强制执行或检查是否超过了这些限制。
为什么TryRead用const int&来取它的参数?这一提法是不必要的。只需输入int。
为什么MAX_SIZE是一个宏?它应该是一个constexpr (如果您的编译器支持它)或static const int (如果不是)。
在RingBuffer构造函数中,可以将reserve和assign调用组合为一个,并将其包含在成员初始化程序列表中。
Push将愉快地、无声地覆盖添加到环形缓冲区但未被消耗的内容。
按照设置Push的方式,您可以让多个线程写入相同的内存位置(例如,如果对Push有足够的其他调用,那么为索引计算的值就会返回)。
mMaxReadIdx也有可能在没有读槽的情况下表示它是可用的,例如,线程A启动一个推送,并获得一个写索引。然后线程B启动另一个推送,获取一个写索引,存储它的值,并在线程A存储它的消息之前增加mMaxReaddx。线程A使用的插槽将被TryRead读取,但其中还没有数据。
TryRead可以向多个使用者返回相同的消息,因为每个使用者都对缓冲区底部的位置有自己的想法。
RingBuffer::RingBuffer(int pNumOfConsumers): mVector(MAX_SIZE, nullptr)发布于 2018-08-24 04:32:33
为了解决mMaxReadIdx在线程A存储消息之前被线程B递增的问题,我们可以考虑使用以下原子布尔值吗?
Ringbuffer.h
#ifndef __RING_BUFFER_H_
#define __RING_BUFFER_H_
#include <atomic>
class RingBuffer
{
private:
std::vector<Foo> mVector;
std::atomic<long> mWriteIdx{0}; //mWriteIdx will not exceed 2,000,000,000
std::atomic<long> mMaxReadIdx{0}; //mMaxReadIdx will not exceed 2,000,000,000
std::vector<long> mReadIdxVec;
std::atomic<bool> mIsPushing{false};
public:
RingBuffer(int pNumOfConsumers);
void Push(Foo);
Foo * TryRead(int);
};
#endif#include "RingBuffer.h"
static constexpr int const& MAX_SIZE = 5000;
RingBuffer::RingBuffer(int pNumOfConsumers)
{
for(int i=0; i < MAX_SIZE; i++)
{
Foo f;
mVector.push_back(f);
}
mReadIdxVec.assign(pNumOfConsumers, 0);
}
void RingBuffer::Push(Foo pMsg)
{
long writeIdx = mWriteIdx++;
long index = writeIdx % MAX_SIZE;
while(mIsPushing.load(std::memory_order_seq_cst)){};
mIsPushing.store(true,std::memory_order_seq_cst);
if(writeIdx >= MAX_SIZE) delete mVector[index];
mVector[index] = pMsg;
mMaxReadIdx++;
mIsPushing.store(false,std::memory_order_seq_cst);
}
Foo * RingBuffer::TryRead(const int& pConsumer)
{
if(mReadIdxVec[pConsumer] >= mMaxReadIdx) return nullptr;
long index = mReadIdxVec[pConsumer] % MAX_SIZE;
mReadIdxVec[pConsumer]++;
return mVector[index];
}环缓冲h(无锁)
#ifndef __RING_BUFFER_H_
#define __RING_BUFFER_H_
#include <atomic>
class Foo
{
public:
long bar = 0;
double bar2 = 0;
std::atomic<bool> isSetting{false};
void set(long pBar, double pBar2)
{
isSetting = true;
bar = pBar;
bar2 = pBar2;
isSetting = false;
}
Foo(){};
Foo(const Foo &m2) //copy constructor required as they are default-deleted for atomics
{
set(m2.bar, m2.bar2);
}
};
class RingBuffer
{
private:
std::vector<Foo> mVector;
std::atomic<long> mWriteIdx{0}; //mWriteIdx will not exceed 2,000,000,000
std::atomic<long> mMaxReadIdx{0}; //mMaxReadIdx will not exceed 2,000,000,000
std::vector<long> mReadIdxVec;
public:
RingBuffer(int pNumOfConsumers);
void Push(Foo);
Foo * TryRead(int);
};
#endif#include "RingBuffer.h"
static constexpr int const& MAX_SIZE = 5000;
RingBuffer::RingBuffer(int pNumOfConsumers)
{
for(int i=0; i < MAX_SIZE; i++)
{
Foo f;
mVector.push_back(f);
}
mReadIdxVec.assign(pNumOfConsumers, 0);
}
void RingBuffer::Push(Foo pMsg)
{
long writeIdx = mWriteIdx++;
long index = writeIdx % MAX_SIZE;
mVector[index].set(pMsg.bar, pMsg.bar2);
}
Foo * RingBuffer::TryRead(const int& pConsumer)
{
if(mReadIdxVec[pConsumer] >= mWriteIdx) return nullptr;
if(mVector[index].isSetting) return nullptr;
long index = mReadIdxVec[pConsumer] % MAX_SIZE;
mReadIdxVec[pConsumer]++;
return &mVector[index];
}https://codereview.stackexchange.com/questions/201770
复制相似问题