首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >中断Boost Asio线程池中的所有线程

中断Boost Asio线程池中的所有线程
EN

Stack Overflow用户
提问于 2022-11-02 07:34:13
回答 1查看 47关注 0票数 2

我通常使用boost::thread来运行线程。我需要线程容易被打断,所以我使用boost::this_thread::interruption_point()。代码如下所示:

代码语言:javascript
复制
void do_long_calculations()
{
  for (...)
  {
    boost::this_thread::interruption_point();
    do_some_work();
  }
}

auto t = boost::thread(do_long_calculations);
...
t.interrupt();
t.join();

现在我需要一个线程池,我尝试使用boost::asio::thread_pool。似乎没有一种标准的方式来中断这样的线程。那么,我如何中断池中所有正在运行的线程呢?在销毁线程池之前,我需要中断所有正在运行的作业。

我一个人能打断别人吗?类似的事情(或以任何其他方式):

代码语言:javascript
复制
boost::asio::thread_pool p;
std::set<boost::thread::id> thread_ids;
post(p, []()
{
  thread_ids.insert(boost::this_thread::get_id());
  do_long_calculations();
});
...
for (auto id : thread_ids)
  SOME_INTERRUPTION_FUNCTION(id); // fire an event for boost::this_thread::interruption_point();
p.stop();

这样做是行不通的:

代码语言:javascript
复制
boost::asio::thread_pool p;
std::set<boost::detail::thread_data_ptr> threads;
post(p, []()
{
  if (auto d = boost::detail::get_current_thread_data())
    threads.insert(d); // NEVER GOT HERE
  do_long_calculations();
});
...
for (auto &d : threads)
  d->interrupt();
p.stop();

或者我可能需要使用与线程池兼容的其他中断检查,而不是boost::this_thread::interruption_point()调用?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-02 13:14:29

我认为在现实中,您不希望以这种方式控制线程。我认为当线程池被看作是一个“团队”,一种总是可用的计算资源,“随时准备”时,线程池就会发光。

但是,如果要中断任务(而不是线程),则可以。

不适用于实现细节(detail::thread_data_base*,或者它将与asio的线程池一起工作的想法;您不知道数据成员的锁定所需的或无文档化的语义;可能需要get_or_make_current_thread_datamake_external_thread_data等等)。

取而代之的是thread_pool的文档化的界面:附加

看吧,住在Coliru

代码语言:javascript
复制
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
#include <list>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable

static auto const now   = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
    using namespace std::chrono_literals;
    static constexpr std::hash<std::thread::id> hash{};
    static std::mutex                           mx;

    std::lock_guard lk(mx);
    std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
              << (now() - start) / 1.0s << "ms " //
              << std::hex << std::showbase << std::setw(2) << std::setfill('0')
              << hash(std::this_thread::get_id()) % 256 << std::dec << " ";
    (std::cout << ... << msg) << std::endl;
}

void long_calculation() {
    try {
        trace("start long_calculation");
        sleep_for(seconds(5));
        trace("complete long_calculation");
    } catch (boost::thread_interrupted) {
        trace("interrupted long_calculation");
    }
}

int main() {
    trace("Start");
    boost::asio::thread_pool tp(0);
    std::list<boost::thread> threads;

    for (int i = 0; i < 4; ++i)
        threads.emplace_back([&] { tp.attach(); });

    post(tp, long_calculation);
        post(tp, long_calculation);

    sleep_for(seconds(2));

    trace("Interrupt");
    for (auto& th : threads)
        th.interrupt();

    tp.join(); // in case any asio native threads
    trace("Waiting...");
    tp.stop();
    for (auto& th : threads)
        if (th.joinable())
            th.join();

    trace("Bye");
}

打印

代码语言:javascript
复制
   0.000ms 0xac Start
   0.002ms 0x9b start long_calculation
   0.002ms 0x8e start long_calculation
   2.002ms 0xac Interrupt
   2.002ms 0xac Waiting...
   2.003ms 0x9b interrupted long_calculation
   2.004ms 0x8e interrupted long_calculation
   2.004ms 0xac Bye

推荐

我仍然不相信线程中断--我只听说过一些不好的事情,它们会导致所有Boost thread内部设备的相当大的膨胀(参见例如:)。

此外,我看到一些非平凡的问题与控制关闭,即使使用attach方法刚才显示.

我想说的是,在代码中使用原子标志或某些weak_ptr可能允许您中断计算。如果您需要维护对中断点的支持,那么我建议您不要使用Asio的thread_pool,因为它在这一点上并没有增加任何价值:

住在Coliru

代码语言:javascript
复制
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable

static auto const now   = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
    using namespace std::chrono_literals;
    static constexpr std::hash<std::thread::id> hash{};
    static std::mutex                           mx;

    std::lock_guard lk(mx);
    std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
              << (now() - start) / 1.0s << "ms " //
              << std::hex << std::showbase << std::setw(2) << std::setfill('0')
              << hash(std::this_thread::get_id()) % 256 << std::dec << " ";
    (std::cout << ... << msg) << std::endl;
}

void long_calculation() {
    try {
        trace("start long_calculation");
        sleep_for(seconds(5));
        trace("complete long_calculation");
    } catch (boost::thread_interrupted) {
        trace("interrupted long_calculation");
    }
}

struct my_pool : boost::asio::io_context {
    my_pool(unsigned nthreads) {
        while (nthreads--)
            threads_.create_thread([this] { worker(); });
    }

    void join() {
        work_.reset();
        threads_.join_all();
    }

    void interrupt() {
        threads_.interrupt_all();
    }

    ~my_pool() {
        join();
    }

  private:
    void worker() {
        // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) {
            try {
                this->run();
                break; // exited normally
            } catch (std::exception const& e) {
                trace("pool_worker exception: ", e.what());
            } catch (...) {
                trace("pool_worker exception: unhandled");
            }
        }
    }

    boost::thread_group                             threads_;
    boost::asio::executor_work_guard<executor_type> work_{get_executor()};
};

int main() {
    trace("Start");
    my_pool tp(4);

    for (int i = 0; i < 5; ++i)
        post(tp, long_calculation);

    sleep_for(seconds(2));

    trace("Interrupt");
    tp.interrupt();

    trace("Waiting...");
    tp.join();

    trace("Bye");
}

印刷品。

代码语言:javascript
复制
   0.000ms 0x18 Start
   0.000ms 0x88 start long_calculation
   0.000ms 0x88 start long_calculation
   0.000ms 0xbe start long_calculation
   0.000ms 0x68 start long_calculation
   2.000ms 0x18 Interrupt
   2.000ms 0x18 Waiting...
   2.001ms 0x68 interrupted long_calculation
   2.001ms 0x68 start long_calculation
   2.001ms 0x88 interrupted long_calculation
   2.001ms 0xbe interrupted long_calculation
   2.001ms 0x88 interrupted long_calculation
   7.001ms 0x68 complete long_calculation
   7.001ms 0x18 Bye

注意,即使在未中断的任务挂起时,受控关闭也是如何正确工作的。

(不需要thread_grouphttp://coliru.stacked-crooked.com/a/261986d2975e1da4,就可以轻松地做到这一点)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74285737

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档