这是这里讨论的主题的后续文章。
我试图实现一个健壮的、多线程的生产者-消费者模型,主线程中只有一个生产者,其他线程中有几个使用者。
在研究了这个主题之后,我还添加了一个复制构造函数和一个赋值操作符(或者更好的是删除它),以满足三条规则。我这样做了,因为能够复制线程管理类实例是没有意义的。
操作模式与以前相同,例如,生成器线程遍历数据,通过某种条件识别它正在处理的数据类型,并继续将示例委托给相应的工作线程(通过pushBack(数据))。当到达源数据的末尾时,它会将一个特殊值排队到工作线程,并等待它们完成。然后,它清理干净,然后离开。
队列实现(已发现的这里)提供了阻塞脱队列功能,例如不需要轮询。缺点是,使用者不一定要注册主线程设置了m_running (指示是否应该停止执行的标志),因为他们一直在等待。正如前面提到的,可以通过排队设置一个特殊值来绕过这一点。在提供的示例中,我假设队列只管理指针类型(例如int*)。在这种情况下,特殊值被确定为nullptr。
归功于@G.Sliepen,他在前面的主题中提供了许多建议。
#include <chrono>
#include <iostream>
#include <thread>
#include <functional>
#include "readerwriterqueue.h"
template<typename T>
class Consumer
{
public:
Consumer(int id, std::function<void(int, T&)> func) : m_id(id), m_func(func) {}
~Consumer(){
m_running = false;
this->pushBack(nullptr);
m_thread.join();
}
Consumer(const Consumer&) = delete;
Consumer& operator=(const Consumer&) = delete;
Consumer() : m_id(0) {};
void pushBack(const T& t){
m_BufferQueue.enqueue(t);
}
private:
void work() {
m_running = true;
while(m_running || m_BufferQueue.peek())
{
T t;
m_BufferQueue.wait_dequeue(t);
if (t == nullptr)
break;
m_func(m_id, t);
}
std::cout << "EXIT thread " << m_id << std::endl;
}
int m_id;
std::function<void(int, T&)> m_func;
moodycamel::BlockingReaderWriterQueue<T> m_BufferQueue{64};
std::thread m_thread{&Consumer::work, this};
std::atomic_bool m_running;
};
int main() {
auto func = [](int id, int* val){
std::cout << "Thread " << id << " received value " << *val << std::endl;
delete val;
};
Consumer<int*> c1(1, func);
Consumer<int*> c2(2, func);
// data generator
for(int i = 0; i < 10; i++)
{
int* val = new int(i);
if (i % 2 == 0)
c1.pushBack(val);
else
c2.pushBack(val);
}
std::cout << "EXIT" << std::endl;
return 0;
}当在模板中使用非指针数据类型时,此实现会中断。据我所知,引用总是必须引用非空对象,因此检查“空引用”是没有意义的。检查每一种可能的类型似乎都很容易出错。我想加入一个参数化的成员变量来保存堕胎值,但我不确定这是否是最好的方法。
简而言之:是否有一种优雅的方法来定义不可知的堕胎值?
发布于 2021-12-17 12:01:30
T成为指针我强烈建议避免模板参数是指针,而让它们成为值类型。在需要的地方,您可以很容易地将它们转换为指针。首先,将指针作为模板参数传递给类似容器的类型并不是很习惯的做法,而Consumer只是一个容器(队列)和一个相关的线程。所以作为你们班的一个用户,我的第一反应是:
Consumer<int> c1(1, func);但这将无法编译。还要考虑的是,如果让用户传递指针类型,他们可以尝试执行以下操作:
Consumer<const int*> c1(1, func);那会是什么语义学?另一个问题是,糟糕的事情会发生,打电话的人会做这样的事情:
c1.pushBack(nullptr);
c1.pushBack(new int(42));对pushBack()的第一次调用将导致工作人员进一步停止处理队列,这意味着第二个调用将导致内存泄漏。当然,不太可能有人会故意推一个nullptr,但是考虑一下,您可能有如下代码:
c1.pushBack(get_the_thing(...));调用的函数有时可能返回一个nullptr。这将导致很难调试问题。最后,这个设计鼓励使用对new和delete的原始调用,这应该避免:
Consumer管理队列项的生存期()
在前面版本的代码中,Consumer负责管理队列中项的存储。您仍然可以通过使用std::unique_ptr类型的队列存储项来避免原始指针。您仍然可以有一个空的std::unique_ptr,您可以使用它作为员工应该停止的信号:
template<typename T>
class Consumer {
public:
...
~Consumer() {
...
m_BufferQueue.enqueue(nullptr);
...
}
...
void pushBack(const T& t) {
m_BufferQueue.enqueue(std::make_unique<T>(t));
}
private:
void work() {
...
while(...) {
std::unique_ptr<T> ptr;
m_BufferQueue.wait_dequeue(ptr);
if (!ptr)
break;
m_func(m_id, *ptr);
}
...
}
...
moodycamel::BlockingReaderWriterQueue<std::unique_ptr<T>> m_BufferQueue{64};
...
};使用指针(无论是原始指针还是智能指针)的缺点是,将内存分配给您推送到队列的每个元素,这对性能影响不大。或者,考虑使用std::optional代替。您可以在上面的代码中使用它作为std::unique_ptr<T>的插入替代。
emplaceBack()函数以及就像STL容器有push_back()和emplace_back()一样,也可以考虑添加后者。这避免了首先必须构造一个类型为T的对象,然后必须将该对象移动或复制到队列中。当然,这需要您的BlockingReaderWriterQueue()也有一个emplace()函数。如果您想要将不可复制和不可移动的对象添加到队列中,那么拥有类似于嵌入的函数尤其重要。
m_running不再是必要的由于您保证在运行Consumer的析构函数时始终会将一个特殊值推送到队列中,所以根本不需要m_running。你可以安全地把它移走。
请注意,在工作线程开始时将m_running设置为true也有点危险;请考虑在创建Consumer对象之后立即销毁它,然后在析构函数中的m_running = false和work()中的m_running = true之间发生争用条件。
所有具有指针或选项的方法都有一个缺点,即它们增加了队列中每个元素所需的存储量。对于具有小元素的大型队列,这可能是一个问题。您唯一想知道的是是否应该在work()中脱离循环,理想情况下,只需要一个(原子)布尔值,比如m_running。但是问题是,如果work()在一个空队列上等待,它就会被唤醒。相反,您可以将这个问题推到BlockingReadeWriterQueue上,这样您就可以向队列本身发出信号,表明它应该终止,然后wait_dequeue()应该在队列终止且没有剩下的项时返回false。这样,您的Consumer就可以做到:
~Consumer() {
m_BUfferQueue.terminate();
m_thread.join();
}
void work() {
for (T t; m_BufferQueue.wait_dequeue(t);)
m_func(m_id, t);
}https://codereview.stackexchange.com/questions/271047
复制相似问题