首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >生产者-消费者模型队列中的不确定堕胎条件

生产者-消费者模型队列中的不确定堕胎条件
EN

Code Review用户
提问于 2021-12-16 00:35:17
回答 1查看 52关注 0票数 0

这是这里讨论的主题的后续文章。

描述

我试图实现一个健壮的、多线程的生产者-消费者模型,主线程中只有一个生产者,其他线程中有几个使用者。

在研究了这个主题之后,我还添加了一个复制构造函数和一个赋值操作符(或者更好的是删除它),以满足三条规则。我这样做了,因为能够复制线程管理类实例是没有意义的。

操作模式与以前相同,例如,生成器线程遍历数据,通过某种条件识别它正在处理的数据类型,并继续将示例委托给相应的工作线程(通过pushBack(数据))。当到达源数据的末尾时,它会将一个特殊值排队到工作线程,并等待它们完成。然后,它清理干净,然后离开。

队列实现(已发现的这里)提供了阻塞脱队列功能,例如不需要轮询。缺点是,使用者不一定要注册主线程设置了m_running (指示是否应该停止执行的标志),因为他们一直在等待。正如前面提到的,可以通过排队设置一个特殊值来绕过这一点。在提供的示例中,我假设队列只管理指针类型(例如int*)。在这种情况下,特殊值被确定为nullptr

归功于@G.Sliepen,他在前面的主题中提供了许多建议。

代码语言:javascript
复制
#include <chrono>
#include <iostream>
#include <thread>
#include <functional>
#include "readerwriterqueue.h"

template<typename T>
class Consumer
{
public:
    Consumer(int id, std::function<void(int, T&)> func) : m_id(id), m_func(func) {}

    ~Consumer(){
        m_running = false;
        this->pushBack(nullptr);
        m_thread.join();
    }

    Consumer(const Consumer&) = delete;
    Consumer& operator=(const Consumer&) = delete;
    Consumer() : m_id(0) {};

    void pushBack(const T& t){
        m_BufferQueue.enqueue(t);
    }

private:

    void work() {
        m_running = true;

        while(m_running || m_BufferQueue.peek())
        {
            T t;
            m_BufferQueue.wait_dequeue(t);

            if (t == nullptr)
                break;

            m_func(m_id, t);
        }

        std::cout << "EXIT thread " << m_id << std::endl;
    }

    int m_id;
    std::function<void(int, T&)> m_func;
    moodycamel::BlockingReaderWriterQueue<T> m_BufferQueue{64};
    std::thread m_thread{&Consumer::work, this};
    std::atomic_bool m_running;
};

int main() {

    auto func = [](int id, int* val){
        std::cout << "Thread " << id << " received value " << *val << std::endl;
        delete val;
    };

    Consumer<int*> c1(1, func);
    Consumer<int*> c2(2, func);

    // data generator
    for(int i = 0; i < 10; i++)
    {
        int* val = new int(i);
        if (i % 2 == 0)
            c1.pushBack(val);
        else
            c2.pushBack(val);
    }

    std::cout << "EXIT" << std::endl;
    return 0;
}

问题

当在模板中使用非指针数据类型时,此实现会中断。据我所知,引用总是必须引用非空对象,因此检查“空引用”是没有意义的。检查每一种可能的类型似乎都很容易出错。我想加入一个参数化的成员变量来保存堕胎值,但我不确定这是否是最好的方法。

简而言之:是否有一种优雅的方法来定义不可知的堕胎值?

EN

回答 1

Code Review用户

回答已采纳

发布于 2021-12-17 12:01:30

避免让T成为指针

我强烈建议避免模板参数是指针,而让它们成为值类型。在需要的地方,您可以很容易地将它们转换为指针。首先,将指针作为模板参数传递给类似容器的类型并不是很习惯的做法,而Consumer只是一个容器(队列)和一个相关的线程。所以作为你们班的一个用户,我的第一反应是:

代码语言:javascript
复制
Consumer<int> c1(1, func);

但这将无法编译。还要考虑的是,如果让用户传递指针类型,他们可以尝试执行以下操作:

代码语言:javascript
复制
Consumer<const int*> c1(1, func);

那会是什么语义学?另一个问题是,糟糕的事情会发生,打电话的人会做这样的事情:

代码语言:javascript
复制
c1.pushBack(nullptr);
c1.pushBack(new int(42));

pushBack()的第一次调用将导致工作人员进一步停止处理队列,这意味着第二个调用将导致内存泄漏。当然,不太可能有人会故意推一个nullptr,但是考虑一下,您可能有如下代码:

代码语言:javascript
复制
c1.pushBack(get_the_thing(...));

调用的函数有时可能返回一个nullptr。这将导致很难调试问题。最后,这个设计鼓励使用对newdelete的原始调用,这应该避免:

Consumer管理队列项的生存期(

)

在前面版本的代码中,Consumer负责管理队列中项的存储。您仍然可以通过使用std::unique_ptr类型的队列存储项来避免原始指针。您仍然可以有一个空的std::unique_ptr,您可以使用它作为员工应该停止的信号:

代码语言:javascript
复制
template<typename T>
class Consumer {
public:
    ...
    ~Consumer() {
         ...
         m_BufferQueue.enqueue(nullptr);
         ...
    }
    ...   
    void pushBack(const T& t) {
        m_BufferQueue.enqueue(std::make_unique<T>(t));
    }

private:
    void work() {
        ...
        while(...) {
            std::unique_ptr<T> ptr;
            m_BufferQueue.wait_dequeue(ptr);

            if (!ptr)
                break;

            m_func(m_id, *ptr);
        }
        ...
    }
    ...
    moodycamel::BlockingReaderWriterQueue<std::unique_ptr<T>> m_BufferQueue{64};
    ...
};

使用指针(无论是原始指针还是智能指针)的缺点是,将内存分配给您推送到队列的每个元素,这对性能影响不大。或者,考虑使用std::optional代替。您可以在上面的代码中使用它作为std::unique_ptr<T>的插入替代。

考虑添加emplaceBack()函数以及

就像STL容器有push_back()emplace_back()一样,也可以考虑添加后者。这避免了首先必须构造一个类型为T的对象,然后必须将该对象移动或复制到队列中。当然,这需要您的BlockingReaderWriterQueue()也有一个emplace()函数。如果您想要将不可复制和不可移动的对象添加到队列中,那么拥有类似于嵌入的函数尤其重要。

m_running不再是必要的

由于您保证在运行Consumer的析构函数时始终会将一个特殊值推送到队列中,所以根本不需要m_running。你可以安全地把它移走。

请注意,在工作线程开始时将m_running设置为true也有点危险;请考虑在创建Consumer对象之后立即销毁它,然后在析构函数中的m_running = falsework()中的m_running = true之间发生争用条件。

替代解决方案

所有具有指针或选项的方法都有一个缺点,即它们增加了队列中每个元素所需的存储量。对于具有小元素的大型队列,这可能是一个问题。您唯一想知道的是是否应该在work()中脱离循环,理想情况下,只需要一个(原子)布尔值,比如m_running。但是问题是,如果work()在一个空队列上等待,它就会被唤醒。相反,您可以将这个问题推到BlockingReadeWriterQueue上,这样您就可以向队列本身发出信号,表明它应该终止,然后wait_dequeue()应该在队列终止且没有剩下的项时返回false。这样,您的Consumer就可以做到:

代码语言:javascript
复制
~Consumer() {
    m_BUfferQueue.terminate();
    m_thread.join();
}

void work() {
    for (T t; m_BufferQueue.wait_dequeue(t);)
        m_func(m_id, t);
}
票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/271047

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档