首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多线程生产者-消费者模型实现

多线程生产者-消费者模型实现
EN

Code Review用户
提问于 2021-12-12 14:36:36
回答 1查看 689关注 0票数 5

描述

我试图实现一个健壮的、多线程的生产者-消费者模型,主线程中只有一个生产者,其他线程中有几个使用者。

生成器线程遍历数据,通过某种条件识别它正在处理的数据类型,并继续将示例委托给相应的工作线程(通过pushBack(data))。当到达源数据的末尾时,它会向工作线程发送一个信号,并等待它们完成。然后,它清理干净,然后离开。

对于使用者线程,我编写了一个类,它在实例化时启动一个线程。每个类实例都有自己的FIFO队列,生产者线程将样本推入其中。接收到数据后,它可以立即开始处理,因为它不依赖于其他线程。当队列为空时,它将无限期地等待直到接收到一个样本或接收到一个停止信号(例如,调用方法stopThread )。如果在队列尚未空时收到停止信号,则继续完成队列中的其余元素,然后退出。

我使用的队列实现并不复杂(https://github.com/cameron314/readerwriterqueue,但我将简要介绍每种使用的方法所做的事情:

  • try_enqueue:如果队列中有空间,则对元素副本进行排队。如果元素已排队,则返回true,否则返回false。
  • try_dequeue:尝试将元素排出队列;如果队列为空,则返回false。
  • peek:返回一个指向队列中前端元素的指针(接下来将通过调用try_dequeuepop删除该指针)。如果在调用方法时队列显示为空,则返回nullptr。

代码

代码语言:javascript
复制
#include 
#include 
#include 
#include "readerwriterqueue.h"

class Consumer
{
public:
    Consumer(int id) : m_BufferQueue(64) {
        m_id = id;
        m_thread = std::thread([this] {work();});
    }
       
    void pushBack(int* num){
        while (!m_BufferQueue.try_enqueue(num)){
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    void stopThread(){
        m_running = false;

        if (m_thread.joinable())
            m_thread.join();
    }

    void work() {
        m_running = true;

        while(m_running || m_BufferQueue.peek())
        {
            int* received;

            bool succeeded = m_BufferQueue.try_dequeue(received);

            if(succeeded)
            {
                std::cout << "Thread " << m_id << " received value " << *received << std::endl;
                delete received;
            }
            else
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    int m_id;
    moodycamel::ReaderWriterQueue m_BufferQueue;
    std::thread m_thread;
    std::atomic_bool m_running;
};


int main() {

    Consumer* c1 = new Consumer(1);
    Consumer* c2 = new Consumer(2);

    // data generator

    for(int i = 0; i < 10; i++)
    {
        int* val = new int(i);

        if (i % 2 == 0)
            c1->pushBack(val);
        else
            c2->pushBack(val);
    }

    c1->stopThread();
    c2->stopThread();

    delete c1;
    delete c2;

    std::cout << "EXIT" << std::endl;
    return 0;
}

关注的Points

  1. 代码是否以这样一种方式封装各个线程,使它们真正相互独立?我担心我可能忽略了这里的一些东西。
  2. 当停止线程时,在加入线程之前检查它是否可连接是一个很好的实践吗?据我所知,可连接只是检查线程是否已经启动,但在那个时候应该总是这样。所以这可能是不必要的。
  3. 还有其他我可能错过的疏忽吗?

链接到文件"readerwriterqueue.h

EN

回答 1

Code Review用户

发布于 2021-12-12 23:14:54

票数 3
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/270927

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档