首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >boost::asio、线程池和线程监控

boost::asio、线程池和线程监控
EN

Stack Overflow用户
提问于 2012-08-29 03:49:33
回答 2查看 4.3K关注 0票数 7

我已经使用boost::asio实现了一个线程池,一些boost::thread对象调用了boost::asio::io_service::run()。然而,我得到的一个要求是有一种方法来监控所有线程的“健康”。我的目的是创建一个简单的前哨对象,它可以通过线程池传递--如果它成功通过,那么我们可以假定线程仍在处理工作。

然而,考虑到我的实现,我不确定如何(如果)我可以可靠地监视池中的所有线程。我只是将线程函数委托给了boost::asio::io_service::run(),因此将一个标记对象发布到io_service实例中并不能保证哪个线程将实际获得该标记并完成工作。

一种选择可能是周期性地插入前哨,并希望每个线程在一段合理的时间内至少获得它一次,但这显然不是理想的。

请看下面的例子。由于处理程序的编码方式,在这种情况下,我们可以看到每个线程都会做相同数量的工作,但实际上我无法控制处理程序的实现,其中一些可能是长期运行的,而另一些几乎是即时的。

代码语言:javascript
复制
#include <iostream>
#include <boost/asio.hpp>
#include <vector>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

void handler()
{
   std::cout << boost::this_thread::get_id() << "\n";
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

int main(int argc, char **argv)
{
   boost::asio::io_service svc(3);

   std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc));

   boost::thread one(boost::bind(&boost::asio::io_service::run, &svc));
   boost::thread two(boost::bind(&boost::asio::io_service::run, &svc));
   boost::thread three(boost::bind(&boost::asio::io_service::run, &svc));

   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);

   work.reset();

   three.join();
   two.join();
   one.join();

   return 0;
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2012-09-01 04:05:34

我使用的解决方案依赖于我拥有踏板池对象的实现这一事实。我创建了一个包装器类型,它将更新统计信息,并复制发布到线程池的用户定义的处理程序。只有这种包装器类型才会发送到底层io_service。此方法允许我跟踪已发布/执行的处理程序,而不必干扰用户代码。

下面是一个精简和简化的示例:

代码语言:javascript
复制
#include <iostream>
#include <memory>
#include <vector>
#include <boost/thread.hpp>
#include <boost/asio.hpp>

// Supports scheduling anonymous jobs that are
// executable as returning nothing and taking
// no arguments
typedef std::function<void(void)> functor_type;

// some way to store per-thread statistics
typedef std::map<boost::thread::id, int> thread_jobcount_map;

// only this type is actually posted to
// the asio proactor, this delegates to
// the user functor in operator()
struct handler_wrapper
{
   handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics)
      : user_functor_(user_functor)
      , statistics_(statistics)
   {
   }

   void operator()()
   {
      user_functor_();

      // just for illustration purposes, assume a long running job
      boost::this_thread::sleep(boost::posix_time::milliseconds(100));

      // increment executed jobs
      ++statistics_[boost::this_thread::get_id()];
   }

   functor_type         user_functor_;
   thread_jobcount_map& statistics_;
};

// anonymous thread function, just runs the proactor
void thread_func(boost::asio::io_service& proactor)
{
   proactor.run();
}

class ThreadPool
{
public:
   ThreadPool(size_t thread_count)
   {
      threads_.reserve(thread_count);

      work_.reset(new boost::asio::io_service::work(proactor_));

      for(size_t curr = 0; curr < thread_count; ++curr)
      {
         boost::thread th(thread_func, boost::ref(proactor_));

         // inserting into this map before any work can be scheduled
         // on it, means that we don't have to look it for lookups
         // since we don't dynamically add threads
         thread_jobcount_.insert(std::make_pair(th.get_id(), 0));

         threads_.emplace_back(std::move(th));
      }
   }

   // the only way for a user to get work into 
   // the pool is to use this function, which ensures
   // that the handler_wrapper type is used
   void schedule(const functor_type& user_functor)
   {
      handler_wrapper to_execute(user_functor, thread_jobcount_);
      proactor_.post(to_execute);
   }

   void join()
   {
      // join all threads in pool:
      work_.reset();
      proactor_.stop();

      std::for_each(
         threads_.begin(),
         threads_.end(),
         [] (boost::thread& t)
      {
         t.join();
      });
   }

   // just an example showing statistics
   void log()
   {
      std::for_each(
         thread_jobcount_.begin(),
         thread_jobcount_.end(),
         [] (const thread_jobcount_map::value_type& it)
      {
         std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n";
      });
   }

private:
   std::vector<boost::thread> threads_;
   std::unique_ptr<boost::asio::io_service::work> work_;
   boost::asio::io_service    proactor_;
   thread_jobcount_map        thread_jobcount_;
};

struct add
{
   add(int lhs, int rhs, int* result)
      : lhs_(lhs)
      , rhs_(rhs)
      , result_(result)
   {
   }

   void operator()()
   {
      *result_ = lhs_ + rhs_;
   }

   int lhs_,rhs_;
   int* result_;
};

int main(int argc, char **argv)
{
   // some "state objects" that are 
   // manipulated by the user functors
   int x = 0, y = 0, z = 0;

   // pool of three threads
   ThreadPool pool(3);

   // schedule some handlers to do some work
   pool.schedule(add(5, 4, &x));
   pool.schedule(add(2, 2, &y));
   pool.schedule(add(7, 8, &z));

   // give all the handlers time to execute
   boost::this_thread::sleep(boost::posix_time::milliseconds(1000));

   std::cout
      << "x = " << x << "\n"
      << "y = " << y << "\n"
      << "z = " << z << "\n";

   pool.join();

   pool.log();
}

输出:

代码语言:javascript
复制
x = 9
y = 4
z = 15
Thread: 0000000000B25430 executed 1 jobs
Thread: 0000000000B274F0 executed 1 jobs
Thread: 0000000000B27990 executed 1 jobs
票数 2
EN

Stack Overflow用户

发布于 2012-08-29 04:19:56

您可以在所有线程之间使用一个公共io_service实例,并为每个线程使用一个私有io_service实例。每个线程都会执行如下所示的方法:

代码语言:javascript
复制
void Mythread::threadLoop()
{
    while(/* termination condition */)
    {
        commonIoService.run_one();
        privateIoService.run_one();

        commonConditionVariable.timed_wait(time);
    }
}

通过这种方式,如果您希望确保某个任务在线程中执行,则只需在其拥有的io_service中发布此任务。

要在线程池中发布任务,您可以执行以下操作:

代码语言:javascript
复制
void MyThreadPool::post(Hander handler)
{
    commonIoService.post(handler);
    commonConditionVariable.notify_all();
}
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/12166513

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档