首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >C++ ThreadGroup实现

C++ ThreadGroup实现
EN

Code Review用户
提问于 2021-06-15 06:55:01
回答 2查看 230关注 0票数 3

编辑:基于反馈的改进代码可用这里

作为前一个问题的续集,这里有一个改进版本(有明确的命名)。想法是一样的:一个整数threads_ready被增加到threads.size(),直到所有线程都完成了有效负载,然后当所有线程准备好再次执行时,返回到0。

这个版本没有繁忙的等待,它更通用,因为模板。有没有可能进行更多的优化?我认为使用原子作为状态是过分的,因为它大多是在互斥项下修改的,但是如果不使用atomic,程序就无法工作。

我尝试过使用可变模板参数重新实现此操作,但失败了。我认为这主要是因为运行的论点是通过指针提供的。在这里使用lambda会是一个更好的解决方案吗?(主要涉及可移植性,但也涉及性能)

代码语言:javascript
复制
#include <iostream>
#include <functional>
#include <vector>
#include <thread>
#include <mutex>
#include <iomanip>
#include <cassert>
#include <numeric>
#include <atomic>
#include <condition_variable>
#include <chrono>

using std::atomic;
using std::vector;
using std::function;
using std::thread;
using std::mutex;
using std::unique_lock;
using std::lock_guard;
using std::condition_variable;

template<typename T>
class ThreadGroup{
public:
    ThreadGroup(int number_of_threads, function<void(T&, int, int)> function)
    :  target_buffer(nullptr)
    ,  worker_function(function)
    ,  threads()
    ,  threads_ready(0)
    ,  state(IDLE_VALUE)
    ,  state_mutex()
    ,  synchroniser()
    {
      for(int i = 0; i < number_of_threads; ++i)
        threads.push_back(thread(&ThreadGroup::worker, this, i));
    }

    ~ThreadGroup(){
      { /* Signal to the worker threads that the show is over */
        lock_guard<mutex> my_lock(state_mutex);
        state.store(END_VALUE);
      }
      while(0 < threads.size()){
        if(threads.back().joinable())
          threads.back().join();
        threads.pop_back();
      }
    }

    void start_and_block(T& buffer){
      { /* initialize, start.. */
        unique_lock<mutex> my_lock(state_mutex);
        target_buffer = &buffer;
        state.store(START_VALUE);
      }

      { /* wait until the work is done */
        unique_lock<mutex> my_lock(state_mutex);
        if(threads.size() > threads_ready)synchroniser.wait(my_lock,[=](){
          return (threads.size() <= threads_ready);
        });
      }
      { /* set appropriate state */
        unique_lock<mutex> my_lock(state_mutex);
        state.store(IDLE_VALUE);
      }
      synchroniser.notify_all(); /* Notify worker threads that the main thread is finished */

      { /* wait until all threads are notified */
        unique_lock<mutex> my_lock(state_mutex);
        if(0 < threads_ready)synchroniser.wait(my_lock,[=](){
          return (0 >= threads_ready); /* All threads are notified once the @threads_ready variable is zero again */
        });
      }
    }

private:
    static const int IDLE_VALUE = 0;
    static const int START_VALUE = 1;
    static const int END_VALUE = 2;

    T* target_buffer;
    function<void(T&, int, int)> worker_function; /* buffer, start, length */
    vector<thread> threads;
    int threads_ready;
    atomic<int> state;
    mutex state_mutex;
    condition_variable synchroniser;

    void worker(int thread_index){
      while(END_VALUE != state.load()){ /* Until the pool is stopped */
        while(START_VALUE == state.load()){ /* Wait until start signal is provided */
          worker_function(
            (*target_buffer),
            (thread_index * (target_buffer->size()/threads.size())),
            (target_buffer->size()/threads.size())
          );/* do the work */

          { /* signal that work is done! */
            unique_lock<mutex> my_lock(state_mutex);
            ++threads_ready; /* increase "done counter" */
          }
          synchroniser.notify_all(); /* Notify main thread that this thread  is finsished */

          { /* Wait until main thread is closing the iteration */
            unique_lock<mutex> my_lock(state_mutex);
            if(START_VALUE == state.load())synchroniser.wait(my_lock,[=](){
              return (START_VALUE != state.load());
            });
          }

          { /* signal that this thread is notified! */
            unique_lock<mutex> my_lock(state_mutex);
            --threads_ready; /* decrease the "done counter" to do so */
          }
          synchroniser.notify_all(); /* Notify main thread that this thread  is finsished */
        } /*while(START_VALUE == state)*/
      } /*while(END_VALUE != state)*/
    }
};

int main(int argc, char** agrs){
  int result = 0;
  mutex cout_mutex;

  ThreadGroup<vector<double>> pool(5,[&](vector<double>& buffer, int start, int length){
    double sum = 0;
    for(int i = 0; i < length; ++i){
      sum += buffer[i];
    }
    lock_guard<mutex> my_lock(cout_mutex);
    std::cout << "Partial sum: " << std::setw(4) << sum << " \t\t    |" << "\r";
    result += sum;
    //std::this_thread::sleep_for(std::chrono::milliseconds(200)); //to test with some payload
  });

  result = 0;
  for(int i = 0; i< 10000; ++i){
    vector<double> test_buffer(500, rand()%10);
    result = 0;
    pool.start_and_block(test_buffer);
    {
      lock_guard<mutex> my_lock(cout_mutex);
      std::cout << "result["<< i << "]: " << std::setw(4) << result << "\t\t    " << std::endl;
    }
    assert(std::accumulate(test_buffer.begin(),test_buffer.end(), 0) == result);
  }
  std::cout << "All assertions passed!   "<< std::endl;
  return 0;
}
EN

回答 2

Code Review用户

回答已采纳

发布于 2021-06-15 21:35:15

修复编译器警告

启用编译器警告并尝试修复所有这些警告。有几个未使用的参数。您可以通过不给参数命名来避免这些警告。例如,您可以从start中的lambda中省略名称main()

代码语言:javascript
复制
[&](std::vector<double>& buffer, int, int length){...}

对于main()本身的参数,您可以使用相同的技巧,也可以使用不接受参数的其他允许的main()形式:

代码语言:javascript
复制
int main() {

剩下的警告是关于有符号整数和无符号整数之间的比较。使用std::size_t作为threads_ready的类型。

无条件调用wait()

如果要传递谓词,则不需要在对条件变量调用wait()之前检查条件。wait()将做的第一件事是执行谓词,看看它是否需要等待。

更喜欢默认的初始化器而不是初始化程序列表

ThreadGroup的构造函数有一个大的初始化程序列表。其中一些是冗余的,有些可以被默认的初始化器替换。在初始化程序列表中,唯一需要做的事情通常是依赖于传递给构造函数的参数。所以:

代码语言:javascript
复制
class ThreadGroup {
public:
    ThreadGroup(std::size_t number_of_threads, std::function<void(T&, int, int)> function)
    : worker_function(function)
    {
        for(std::size_t i = 0; i < number_of_threads; ++i)
            threads.emplace_back(&ThreadGroup::worker, this, i);
    }

    ...

private:
    T* target_buffer = nullptr;
    std::function<void(T&, int, int)> worker_function;
    std::vector<std::thread> threads;
    std::size_t threads_ready = 0;
    std::atomic<int> state = {IDLE_VALUE};
    std::mutex state_mutex;
    std::condition_variable synchroniser;
    ...
};

请注意,我们必须在这里使用聚合初始化进行std::atomic<int>,否则将选择已删除的复制构造函数。

不要忙-循环

在构造函数中启动工作线程,但是state被初始化为IDLE_VALUE。这将导致worker()进入忙环,直到state更改。使用一个条件变量,这样工作线程就可以为工作到达wait()

无条件join()线程

我不明白您为什么要检查线程是否可连接。一旦将它们添加到threads中,它们就始终处于可连接状态。因此,我会将析构函数改为:

代码语言:javascript
复制
~ThreadGroup() {
    {
        std::lock_guard<mutex> my_lock(state_mutex);
        state.store(END_VALUE);
        // signal a condition variable to ensure idle threads get woken up
    }

    for (auto &thread: threads)
        thread.join();
}

在headers

中避免using namespace stdusing std::

虽然在源文件中使用using namespace std通常是安全的,更好的做法是像您做的那样将std::中的单个元素引入全局命名空间,只要您将class ThreadGroup的定义移到头文件中,就不应该再这样做了,因为这将导致源文件出现意外的行为,这些源文件不想使用该元素,而是执行#include头文件。输入std::并不是那么多额外的工作,而且作为一项额外的工作,当您拥有诸如function<...> function这样的变量时,您可以避免可能出现的混淆,在这种情况下,变量开始跟踪类型。

票数 5
EN

Code Review用户

发布于 2021-06-16 00:07:40

尝试类似于enum state_t { Idle, Start, End };的方法,并使用state_t类型而不是普通的int来表示这种类型的值。

第一个答案已经详细介绍了构造函数的初始化器,我在上一篇文章的答复中已经指出了这一点。

threads.push_back(thread(&ThreadGroup::worker, this, i));

您正在构建一个临时std::thread实例,将其复制到vector中,然后销毁该临时实例。这是对emplace_back的完美使用,您可以为它提供构造函数参数,而不是临时构造的对象。(虽然我看到push_back有一个rvalue表单,而thread有一个move构造函数,所以工作量减少了,而不是简单地消除了。)

while(0 < threads.size()){

这在(while !threads.empty())中得到了更明确的表述。

unique_lock<mutex> my_lock(state_mutex);

在创建lock_guardunique_lock时,我总是觉得必须指定确切的互斥类型是很烦人的,因为类型名称通常更冗长、更具体。现在,您可以使用CTAD,只需让模板自动确定参数类型:unique_lock my_lock(state_mutex);

代码语言:javascript
复制
if(threads.size() > threads_ready)synchroniser.wait(my_lock,[=](){
          return (threads.size() <= threads_ready);

您是否知道您正在捕获this,而不是单个成员?我认为是这样的,因为您是通过值捕获的,这对于指针来说是有意义的,但对于向量本身则没有意义。列出捕获信息会更好(也更安全),而不是使用默认值。也就是说,简单地说是[this]事实上

默认捕获为*this时,不建议使用=的隐式捕获。(自C++20以来)

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/263059

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档