我通常使用boost::thread来运行线程。我需要线程容易被打断,所以我使用boost::this_thread::interruption_point()。代码如下所示:
void do_long_calculations()
{
for (...)
{
boost::this_thread::interruption_point();
do_some_work();
}
}
auto t = boost::thread(do_long_calculations);
...
t.interrupt();
t.join();现在我需要一个线程池,我尝试使用boost::asio::thread_pool。似乎没有一种标准的方式来中断这样的线程。那么,我如何中断池中所有正在运行的线程呢?在销毁线程池之前,我需要中断所有正在运行的作业。
我一个人能打断别人吗?类似的事情(或以任何其他方式):
boost::asio::thread_pool p;
std::set<boost::thread::id> thread_ids;
post(p, []()
{
thread_ids.insert(boost::this_thread::get_id());
do_long_calculations();
});
...
for (auto id : thread_ids)
SOME_INTERRUPTION_FUNCTION(id); // fire an event for boost::this_thread::interruption_point();
p.stop();这样做是行不通的:
boost::asio::thread_pool p;
std::set<boost::detail::thread_data_ptr> threads;
post(p, []()
{
if (auto d = boost::detail::get_current_thread_data())
threads.insert(d); // NEVER GOT HERE
do_long_calculations();
});
...
for (auto &d : threads)
d->interrupt();
p.stop();或者我可能需要使用与线程池兼容的其他中断检查,而不是boost::this_thread::interruption_point()调用?
发布于 2022-11-02 13:14:29
我认为在现实中,您不希望以这种方式控制线程。我认为当线程池被看作是一个“团队”,一种总是可用的计算资源,“随时准备”时,线程池就会发光。
但是,如果要中断任务(而不是线程),则可以。
不适用于实现细节(detail::thread_data_base*,或者它将与asio的线程池一起工作的想法;您不知道数据成员的锁定所需的或无文档化的语义;可能需要get_or_make_current_thread_data或make_external_thread_data等等)。
取而代之的是thread_pool的文档化的界面:附加。
看吧,住在Coliru
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
#include <list>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable
static auto const now = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
using namespace std::chrono_literals;
static constexpr std::hash<std::thread::id> hash{};
static std::mutex mx;
std::lock_guard lk(mx);
std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
<< (now() - start) / 1.0s << "ms " //
<< std::hex << std::showbase << std::setw(2) << std::setfill('0')
<< hash(std::this_thread::get_id()) % 256 << std::dec << " ";
(std::cout << ... << msg) << std::endl;
}
void long_calculation() {
try {
trace("start long_calculation");
sleep_for(seconds(5));
trace("complete long_calculation");
} catch (boost::thread_interrupted) {
trace("interrupted long_calculation");
}
}
int main() {
trace("Start");
boost::asio::thread_pool tp(0);
std::list<boost::thread> threads;
for (int i = 0; i < 4; ++i)
threads.emplace_back([&] { tp.attach(); });
post(tp, long_calculation);
post(tp, long_calculation);
sleep_for(seconds(2));
trace("Interrupt");
for (auto& th : threads)
th.interrupt();
tp.join(); // in case any asio native threads
trace("Waiting...");
tp.stop();
for (auto& th : threads)
if (th.joinable())
th.join();
trace("Bye");
}打印
0.000ms 0xac Start
0.002ms 0x9b start long_calculation
0.002ms 0x8e start long_calculation
2.002ms 0xac Interrupt
2.002ms 0xac Waiting...
2.003ms 0x9b interrupted long_calculation
2.004ms 0x8e interrupted long_calculation
2.004ms 0xac Bye推荐
我仍然不相信线程中断--我只听说过一些不好的事情,它们会导致所有Boost thread内部设备的相当大的膨胀(参见例如:)。
此外,我看到一些非平凡的问题与控制关闭,即使使用attach方法刚才显示.
我想说的是,在代码中使用原子标志或某些weak_ptr可能允许您中断计算。如果您需要维护对中断点的支持,那么我建议您不要使用Asio的thread_pool,因为它在这一点上并没有增加任何价值:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable
static auto const now = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
using namespace std::chrono_literals;
static constexpr std::hash<std::thread::id> hash{};
static std::mutex mx;
std::lock_guard lk(mx);
std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
<< (now() - start) / 1.0s << "ms " //
<< std::hex << std::showbase << std::setw(2) << std::setfill('0')
<< hash(std::this_thread::get_id()) % 256 << std::dec << " ";
(std::cout << ... << msg) << std::endl;
}
void long_calculation() {
try {
trace("start long_calculation");
sleep_for(seconds(5));
trace("complete long_calculation");
} catch (boost::thread_interrupted) {
trace("interrupted long_calculation");
}
}
struct my_pool : boost::asio::io_context {
my_pool(unsigned nthreads) {
while (nthreads--)
threads_.create_thread([this] { worker(); });
}
void join() {
work_.reset();
threads_.join_all();
}
void interrupt() {
threads_.interrupt_all();
}
~my_pool() {
join();
}
private:
void worker() {
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
try {
this->run();
break; // exited normally
} catch (std::exception const& e) {
trace("pool_worker exception: ", e.what());
} catch (...) {
trace("pool_worker exception: unhandled");
}
}
}
boost::thread_group threads_;
boost::asio::executor_work_guard<executor_type> work_{get_executor()};
};
int main() {
trace("Start");
my_pool tp(4);
for (int i = 0; i < 5; ++i)
post(tp, long_calculation);
sleep_for(seconds(2));
trace("Interrupt");
tp.interrupt();
trace("Waiting...");
tp.join();
trace("Bye");
}印刷品。
0.000ms 0x18 Start
0.000ms 0x88 start long_calculation
0.000ms 0x88 start long_calculation
0.000ms 0xbe start long_calculation
0.000ms 0x68 start long_calculation
2.000ms 0x18 Interrupt
2.000ms 0x18 Waiting...
2.001ms 0x68 interrupted long_calculation
2.001ms 0x68 start long_calculation
2.001ms 0x88 interrupted long_calculation
2.001ms 0xbe interrupted long_calculation
2.001ms 0x88 interrupted long_calculation
7.001ms 0x68 complete long_calculation
7.001ms 0x18 Bye注意,即使在未中断的任务挂起时,受控关闭也是如何正确工作的。
(不需要thread_group:http://coliru.stacked-crooked.com/a/261986d2975e1da4,就可以轻松地做到这一点)
https://stackoverflow.com/questions/74285737
复制相似问题