
Improved ThreadGroup implementation in C++

Code Review user
Asked on 2021-06-16 08:17:33
1 answer · 101 views · 0 followers · score 1

Following the amazing feedback on my previous questions, I have prepared a third version of the originally posted code.

The idea is the same: a std::size_t variable threads_ready is incremented up to threads.size() until all threads have finished the payload, and then brought back to 0 once all threads are ready to execute again.

  • I eliminated all the busy waiting I could find.
  • I made the class more generic by templating it.
    • The best solution would be to have the lambda capture the required context, but unfortunately I could not find a way to easily derive the thread-related arguments from a generic lambda. The best I could manage was to reduce the arguments to a "thread index", so that each thread knows which region of the input is relevant to it.

  • To test the implementation better, I made a better use case.
#include <iostream>
#include <functional>
#include <tuple>
#include <vector>
#include <thread>
#include <mutex>
#include <iomanip>
#include <numeric>
#include <atomic>
#include <condition_variable>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cmath>

using std::atomic;
using std::vector;
using std::function;
using std::tuple;
using std::thread;
using std::mutex;
using std::unique_lock;
using std::lock_guard;
using std::condition_variable;
using std::size_t;

template<typename First, typename ...T>
class ThreadGroup{
public:
    ThreadGroup(int number_of_threads, function<void(tuple<First, T...>&, int)> function)
    :  worker_function(function)
    ,  state(Idle)
    {
      for(int i = 0; i < number_of_threads; ++i)
        threads.emplace_back(&ThreadGroup::worker, this, i);
    }

    ~ThreadGroup(){
      { /* Signal to the worker threads that the show is over */
        lock_guard<mutex> my_lock(state_mutex);
        state.store(End);
      }
      synchroniser.notify_all();
      for(thread& worker_thread : threads) worker_thread.join();
    }

    void start_and_block(tuple<First, T...>& buffer){
      { /* initialize, start.. */
        unique_lock<mutex> my_lock(state_mutex);
        target_buffers = &buffer;
        state.store(Start);
      }
      synchroniser.notify_all(); /* Whip the peons */

      { /* wait until the work is done */
        unique_lock<mutex> my_lock(state_mutex);
        synchroniser.wait(my_lock,[this](){
          return (threads.size() <= threads_ready);
        });
      }
      { /* set appropriate state */
        unique_lock<mutex> my_lock(state_mutex);
        state.store(Idle);
      }
      synchroniser.notify_all(); /* Notify worker threads that the main thread is finished */

      { /* wait until all threads are notified */
        unique_lock<mutex> my_lock(state_mutex);
        synchroniser.wait(my_lock,[this](){
          return (0 >= threads_ready); /* All threads are notified once the @threads_ready variable is zero again */
        });
      }
    }

private:
    enum state_t{Idle, Start, End};

    tuple<First, T...>* target_buffers = nullptr;
    function<void(tuple<First, T...>&, int)> worker_function; /* buffer, thread index */
    vector<thread> threads;
    size_t threads_ready = 0;
    atomic<state_t> state;
    mutex state_mutex;
    condition_variable synchroniser;

    void worker(int thread_index){
      while(End != state.load()){ /* Until the pool is stopped */
        { /* Wait until main thread triggers a task */
          unique_lock<mutex> my_lock(state_mutex);
          synchroniser.wait(my_lock,[this](){
            return (Idle != state.load());
          });
        }
        if(End != state.load()){
          worker_function((*target_buffers), thread_index);/* do the work */

          { /* signal that work is done! */
            unique_lock<mutex> my_lock(state_mutex);
            ++threads_ready; /* increase "done counter" */
          }
          synchroniser.notify_all(); /* Notify main thread that this thread is finished */

          { /* Wait until main thread is closing the iteration */
            unique_lock<mutex> my_lock(state_mutex);
            synchroniser.wait(my_lock,[this](){
              return (Start != state.load());
            });
          }

          { /* signal that this thread is notified! */
            unique_lock<mutex> my_lock(state_mutex);
            --threads_ready; /* decrease the "done counter" to do so */
          }
          synchroniser.notify_all(); /* Notify main thread that this thread is notified */
        } /* Avoid segfault at destruction */
      } /*while(END_VALUE != state)*/
    }
};

int main(int argc, char** args){
  const int number_of_threads = 5;
  vector<double> test_buffer;
  double expected;
  double result = 0;
  mutex cout_mutex;

  ThreadGroup<vector<double>&> pool(number_of_threads,[&](tuple<vector<double>&>& inputs, int thread_index){
    double sum = 0;
    vector<double>& used_buffer = std::get<vector<double>&>(inputs);
    size_t length = (used_buffer.size() / number_of_threads) + 1u;
    size_t start = length * thread_index;
    length = std::min(length, (used_buffer.size() - start));
    if(start < used_buffer.size()) /* More threads could be available, than needed */
      for(size_t i = 0; i < length; ++i) sum += used_buffer[start + i];
    //std::this_thread::sleep_for(std::chrono::milliseconds(200)); //to test with some payload
    { /* Print partial results and accumulate the full results */
      lock_guard<mutex> my_lock(cout_mutex);
      std::cout << "Partial sum[" << thread_index << "]: " << std::setw(4) << sum << " \t\t    \r";
      result += sum;
    }
  });

  for(int i = 0; i< 1000; ++i){
    test_buffer = vector<double>(rand()%500);
    std::for_each(test_buffer.begin(),test_buffer.end(),[](double& element){
      element = rand()%10;
    });
    expected = std::accumulate(test_buffer.begin(),test_buffer.end(), 0.0);
    result = 0;
    auto tpl = std::forward_as_tuple(test_buffer);
    pool.start_and_block(tpl);
    std::cout << "result["<< i << "]: " << std::setw(4) << result << "\t\t    \r";
    assert(expected == result);
  }
  std::cout << "All assertions passed!   "<< std::endl;
  return 0;
}

Is there anything else that could be optimized or improved in this implementation?


1 Answer

Code Review user

Accepted answer

Answered on 2021-06-23 17:08:26

Lambdas are not inefficient. You need to package up those values somehow, and the way lambda capture works is just like constructor arguments. That is, it copies the required values into a storage location, which is exactly the work needed when passing parameters (storing them into local variables in the called function). Your tuple packages the values into a tuple, which is morally the same as a struct with unnamed members; again, assuming no extra copies, that is the same amount of work.
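The "struct with unnamed members" point can be illustrated with a small sketch (the names closures_agree and Closure are made up for this example):

```cpp
// A capturing lambda is morally equivalent to a hand-written struct whose
// members hold the captured values and whose operator() is the lambda body.
bool closures_agree(int x) {
    int offset = 10;
    auto lambda = [offset](int v) { return v + offset; }; // captures offset by value

    struct Closure {                                      // roughly what the compiler generates
        int offset;                                       // the "unnamed member" holding the capture
        int operator()(int v) const { return v + offset; }
    };
    Closure closure{offset};                              // capture == constructor-style init

    return lambda(x) == closure(x) && closure(x) == x + offset;
}
```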

The simplest solution is to declare your function to take only the thread ID, and pass in a lambda that captures whatever it needs to perform the actual work.
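That suggestion could look roughly like this. This is a minimal sketch, not the original class: parallel_sum is a made-up name, and the pool-reuse machinery is omitted, but it shows how a worker that receives only a thread index can still reach its data through the lambda's captures:

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// The worker signature is just void(int); everything else (buffer, result,
// mutex) travels in through the lambda's captures.
double parallel_sum(const std::vector<double>& buffer, int number_of_threads) {
    double result = 0;
    std::mutex result_mutex;

    std::function<void(int)> worker = [&](int thread_index) {
        std::size_t length = buffer.size() / number_of_threads + 1u;
        std::size_t start = length * static_cast<std::size_t>(thread_index);
        if (start >= buffer.size()) return;              // more threads than work
        length = std::min(length, buffer.size() - start);
        double sum = 0;
        for (std::size_t i = 0; i < length; ++i) sum += buffer[start + i];
        std::lock_guard<std::mutex> lock(result_mutex);  // accumulate the partial sum
        result += sum;
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < number_of_threads; ++i) threads.emplace_back(worker, i);
    for (std::thread& t : threads) t.join();
    return result;
}
```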

We hope that (in release builds, anyway) the constructor argument gets optimized away. However, if you are capturing something like a vector or a string, you will want move semantics. For that, the constructor parameter should be a "sink" parameter: that is, declare it to take its value by value, and use std::move in the member initializer. In any case, you can play around in Compiler Explorer to make sure the extra copy really is optimized away. In practice, copying a few bytes is nothing compared to the cost of starting a thread or synchronizing anything! As long as you are not deep-copying something expensive like a vector, it will not be inefficient.

Clarification:

ThreadGroup x { 5, [=](int id){ } }; will initialize the lambda's body (its captures) directly in the constructor's function parameter.

Then, in the constructor, : worker_function{std::move(function)} will initialize the class member. This extra copy is the one we hope gets optimized away: storing the captures directly into their final resting place, inside the lambda inside the std::function inside the ThreadGroup. Using move semantics ensures that even if the copy is not completely elided, it will not do an expensive deep copy.
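A minimal sketch of that sink-parameter idiom (Task is a made-up name standing in for ThreadGroup):

```cpp
#include <functional>
#include <utility>

// Sink parameter: take the callable by value, then std::move it into the
// member, so an rvalue argument (such as a lambda written at the call site)
// is moved rather than deep-copied into its final resting place.
class Task {
public:
    explicit Task(std::function<void(int)> function)  // by-value "sink" parameter
    : worker_function(std::move(function))            // move into the member
    {}

    void run(int id) { worker_function(id); }

private:
    std::function<void(int)> worker_function;
};
```

A caller would then capture any expensive state in the lambda itself, e.g. Task t{[buffer = std::move(buffer)](int id){ /* ... */ }};, so the buffer is moved all the way into the stored callable.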

Score 1
Original content provided by Code Review; translation supported by Tencent Cloud's IT-domain translation engine.
Original link:

https://codereview.stackexchange.com/questions/263098