编辑:基于反馈的改进代码可用这里。
作为前一个问题的续集,这里有一个改进版本(有明确的命名)。想法是一样的:一个整数threads_ready被增加到threads.size(),直到所有线程都完成了有效负载,然后当所有线程准备好再次执行时,返回到0。
这个版本没有繁忙的等待,它更通用,因为模板。有没有可能进行更多的优化?我认为使用原子作为状态是过分的,因为它大多是在互斥项下修改的,但是如果不使用atomic,程序就无法工作。
我尝试过使用可变模板参数重新实现此操作,但失败了。我认为这主要是因为运行的论点是通过指针提供的。在这里使用lambda会是一个更好的解决方案吗?(主要涉及可移植性,但也涉及性能)
#include <iostream>
#include <functional>
#include <vector>
#include <thread>
#include <mutex>
#include <iomanip>
#include <cassert>
#include <numeric>
#include <atomic>
#include <condition_variable>
#include <chrono>
using std::atomic;
using std::vector;
using std::function;
using std::thread;
using std::mutex;
using std::unique_lock;
using std::lock_guard;
using std::condition_variable;
template<typename T>
class ThreadGroup{
public:
ThreadGroup(int number_of_threads, function<void(T&, int, int)> function)
: target_buffer(nullptr)
, worker_function(function)
, threads()
, threads_ready(0)
, state(IDLE_VALUE)
, state_mutex()
, synchroniser()
{
for(int i = 0; i < number_of_threads; ++i)
threads.push_back(thread(&ThreadGroup::worker, this, i));
}
~ThreadGroup(){
{ /* Signal to the worker threads that the show is over */
lock_guard<mutex> my_lock(state_mutex);
state.store(END_VALUE);
}
while(0 < threads.size()){
if(threads.back().joinable())
threads.back().join();
threads.pop_back();
}
}
void start_and_block(T& buffer){
{ /* initialize, start.. */
unique_lock<mutex> my_lock(state_mutex);
target_buffer = &buffer;
state.store(START_VALUE);
}
{ /* wait until the work is done */
unique_lock<mutex> my_lock(state_mutex);
if(threads.size() > threads_ready)synchroniser.wait(my_lock,[=](){
return (threads.size() <= threads_ready);
});
}
{ /* set appropriate state */
unique_lock<mutex> my_lock(state_mutex);
state.store(IDLE_VALUE);
}
synchroniser.notify_all(); /* Notify worker threads that the main thread is finished */
{ /* wait until all threads are notified */
unique_lock<mutex> my_lock(state_mutex);
if(0 < threads_ready)synchroniser.wait(my_lock,[=](){
return (0 >= threads_ready); /* All threads are notified once the @threads_ready variable is zero again */
});
}
}
private:
static const int IDLE_VALUE = 0;
static const int START_VALUE = 1;
static const int END_VALUE = 2;
T* target_buffer;
function<void(T&, int, int)> worker_function; /* buffer, start, length */
vector<thread> threads;
int threads_ready;
atomic<int> state;
mutex state_mutex;
condition_variable synchroniser;
void worker(int thread_index){
while(END_VALUE != state.load()){ /* Until the pool is stopped */
while(START_VALUE == state.load()){ /* Wait until start signal is provided */
worker_function(
(*target_buffer),
(thread_index * (target_buffer->size()/threads.size())),
(target_buffer->size()/threads.size())
);/* do the work */
{ /* signal that work is done! */
unique_lock<mutex> my_lock(state_mutex);
++threads_ready; /* increase "done counter" */
}
synchroniser.notify_all(); /* Notify main thread that this thread is finsished */
{ /* Wait until main thread is closing the iteration */
unique_lock<mutex> my_lock(state_mutex);
if(START_VALUE == state.load())synchroniser.wait(my_lock,[=](){
return (START_VALUE != state.load());
});
}
{ /* signal that this thread is notified! */
unique_lock<mutex> my_lock(state_mutex);
--threads_ready; /* decrease the "done counter" to do so */
}
synchroniser.notify_all(); /* Notify main thread that this thread is finsished */
} /*while(START_VALUE == state)*/
} /*while(END_VALUE != state)*/
}
};
int main(int argc, char** agrs){
int result = 0;
mutex cout_mutex;
ThreadGroup<vector<double>> pool(5,[&](vector<double>& buffer, int start, int length){
double sum = 0;
for(int i = 0; i < length; ++i){
sum += buffer[i];
}
lock_guard<mutex> my_lock(cout_mutex);
std::cout << "Partial sum: " << std::setw(4) << sum << " \t\t |" << "\r";
result += sum;
//std::this_thread::sleep_for(std::chrono::milliseconds(200)); //to test with some payload
});
result = 0;
for(int i = 0; i< 10000; ++i){
vector<double> test_buffer(500, rand()%10);
result = 0;
pool.start_and_block(test_buffer);
{
lock_guard<mutex> my_lock(cout_mutex);
std::cout << "result["<< i << "]: " << std::setw(4) << result << "\t\t " << std::endl;
}
assert(std::accumulate(test_buffer.begin(),test_buffer.end(), 0) == result);
}
std::cout << "All assertions passed! "<< std::endl;
return 0;
}发布于 2021-06-15 21:35:15
启用编译器警告并尝试修复所有这些警告。有几个未使用的参数。您可以通过不给参数命名来避免这些警告。例如,您可以从start中的lambda中省略名称main():
[&](std::vector<double>& buffer, int, int length){...}对于main()本身的参数,您可以使用相同的技巧,也可以使用不接受参数的其他允许的main()形式:
int main() {剩下的警告是关于有符号整数和无符号整数之间的比较。使用std::size_t作为threads_ready的类型。
wait()如果要传递谓词,则不需要在对条件变量调用wait()之前检查条件。wait()将做的第一件事是执行谓词,看看它是否需要等待。
ThreadGroup的构造函数有一个大的初始化程序列表。其中一些是冗余的,有些可以被默认的初始化器替换。在初始化程序列表中,唯一需要做的事情通常是依赖于传递给构造函数的参数。所以:
class ThreadGroup {
public:
ThreadGroup(std::size_t number_of_threads, std::function<void(T&, int, int)> function)
: worker_function(function)
{
for(std::size_t i = 0; i < number_of_threads; ++i)
threads.emplace_back(&ThreadGroup::worker, this, i);
}
...
private:
T* target_buffer = nullptr;
std::function<void(T&, int, int)> worker_function;
std::vector<std::thread> threads;
std::size_t threads_ready = 0;
std::atomic<int> state = {IDLE_VALUE};
std::mutex state_mutex;
std::condition_variable synchroniser;
...
};请注意,我们必须在这里使用聚合初始化进行std::atomic<int>,否则将选择已删除的复制构造函数。
在构造函数中启动工作线程,但是state被初始化为IDLE_VALUE。这将导致worker()进入忙环,直到state更改。使用一个条件变量,这样工作线程就可以为工作到达wait()。
join()线程我不明白您为什么要检查线程是否可连接。一旦将它们添加到threads中,它们就始终处于可连接状态。因此,我会将析构函数改为:
~ThreadGroup() {
{
std::lock_guard<mutex> my_lock(state_mutex);
state.store(END_VALUE);
// signal a condition variable to ensure idle threads get woken up
}
for (auto &thread: threads)
thread.join();
}中避免using namespace std或using std::
虽然在源文件中使用using namespace std通常是安全的,更好的做法是像您做的那样将std::中的单个元素引入全局命名空间,只要您将class ThreadGroup的定义移到头文件中,就不应该再这样做了,因为这将导致源文件出现意外的行为,这些源文件不想使用该元素,而是执行#include头文件。输入std::并不是那么多额外的工作,而且作为一项额外的工作,当您拥有诸如function<...> function这样的变量时,您可以避免可能出现的混淆,在这种情况下,变量开始跟踪类型。
发布于 2021-06-16 00:07:40
尝试类似于enum state_t { Idle, Start, End };的方法,并使用state_t类型而不是普通的int来表示这种类型的值。
第一个答案已经详细介绍了构造函数的初始化器,我在上一篇文章的答复中已经指出了这一点。
threads.push_back(thread(&ThreadGroup::worker, this, i));
您正在构建一个临时std::thread实例,将其复制到vector中,然后销毁该临时实例。这是对emplace_back的完美使用,您可以为它提供构造函数参数,而不是临时构造的对象。(虽然我看到push_back有一个rvalue表单,而thread有一个move构造函数,所以工作量减少了,而不是简单地消除了。)
while(0 < threads.size()){
这在(while !threads.empty())中得到了更明确的表述。
unique_lock<mutex> my_lock(state_mutex);
在创建lock_guard或unique_lock时,我总是觉得必须指定确切的互斥类型是很烦人的,因为类型名称通常更冗长、更具体。现在,您可以使用CTAD,只需让模板自动确定参数类型:unique_lock my_lock(state_mutex);
if(threads.size() > threads_ready)synchroniser.wait(my_lock,[=](){
return (threads.size() <= threads_ready);您是否知道您正在捕获this,而不是单个成员?我认为是这样的,因为您是通过值捕获的,这对于指针来说是有意义的,但对于向量本身则没有意义。列出捕获信息会更好(也更安全),而不是使用默认值。也就是说,简单地说是[this]。事实上,
默认捕获为
*this时,不建议使用=的隐式捕获。(自C++20以来)
https://codereview.stackexchange.com/questions/263059
复制相似问题