描述
我试图实现一个健壮的、多线程的生产者-消费者模型,主线程中只有一个生产者,其他线程中有几个使用者。
生成器线程遍历数据,通过某种条件识别它正在处理的数据类型,并继续将示例委托给相应的工作线程(通过pushBack(data))。当到达源数据的末尾时,它会向工作线程发送一个信号,并等待它们完成。然后,它清理干净,然后离开。
对于使用者线程,我编写了一个类,它在实例化时启动一个线程。每个类实例都有自己的FIFO队列,生产者线程将样本推入其中。接收到数据后,它可以立即开始处理,因为它不依赖于其他线程。当队列为空时,它将无限期地等待直到接收到一个样本或接收到一个停止信号(例如,调用方法stopThread )。如果在队列尚未空时收到停止信号,则继续完成队列中的其余元素,然后退出。
我使用的队列实现并不复杂(https://github.com/cameron314/readerwriterqueue,但我将简要介绍每种使用的方法所做的事情:
try_dequeue或pop删除该指针)。如果在调用方法时队列显示为空,则返回nullptr。代码
#include
#include
#include
#include "readerwriterqueue.h"
class Consumer
{
public:
Consumer(int id) : m_BufferQueue(64) {
m_id = id;
m_thread = std::thread([this] {work();});
}
void pushBack(int* num){
while (!m_BufferQueue.try_enqueue(num)){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
void stopThread(){
m_running = false;
if (m_thread.joinable())
m_thread.join();
}
void work() {
m_running = true;
while(m_running || m_BufferQueue.peek())
{
int* received;
bool succeeded = m_BufferQueue.try_dequeue(received);
if(succeeded)
{
std::cout << "Thread " << m_id << " received value " << *received << std::endl;
delete received;
}
else
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
private:
int m_id;
moodycamel::ReaderWriterQueue m_BufferQueue;
std::thread m_thread;
std::atomic_bool m_running;
};
int main() {
Consumer* c1 = new Consumer(1);
Consumer* c2 = new Consumer(2);
// data generator
for(int i = 0; i < 10; i++)
{
int* val = new int(i);
if (i % 2 == 0)
c1->pushBack(val);
else
c2->pushBack(val);
}
c1->stopThread();
c2->stopThread();
delete c1;
delete c2;
std::cout << "EXIT" << std::endl;
return 0;
}关注的Points
链接到文件"readerwriterqueue.h“
发布于 2021-12-12 23:14:54
https://codereview.stackexchange.com/questions/270927
复制相似问题