我已经创建了一个线程池,它将根据您需要的数量动态地创建线程。
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>
class thread_pool {
public:
thread_pool() : stop(false), busy_workers(0) { }
~thread_pool() {
stop = true;
task_available.notify_all();
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
void run_task(std::function<void()> task) {
{
std::lock_guard<std::mutex> task_lock{ task_mu };
current_task = std::move(task);
}
{
std::lock_guard<std::mutex> workers_lock{ workers_mu };
if (workers.size() == busy_workers++) {
workers.emplace_back(work);
return;
}
}
task_available.notify_one();
}
private:
std::atomic_bool stop;
std::atomic_size_t busy_workers;
std::vector<std::thread> workers;
std::function<void()> current_task;
std::condition_variable task_available;
std::mutex task_mu;
std::mutex workers_mu;
std::function<void()> work = [&]() {
while (true) {
std::unique_lock<std::mutex> task_lock{ task_mu };
task_available.wait(task_lock, [&]() { return current_task || stop; });
if (!current_task && stop) return;
auto task = std::move(current_task);
task_lock.unlock();
task();
busy_workers--;
}
};
};唯一的问题是,即使我加入了析构函数中的所有线程,在完成所有任务之前,线程都将被销毁。
发布于 2018-01-08 20:16:01
你有几个比赛条件:
不能保证一名工人在这两次呼叫之间保持联系。
if (worker.joinable()) {
worker.join();这是一个多线程环境,您不能假设只从一个线程调用run_task。您必须假设可以从任意数量的线程调用它。
{
std::lock_guard<std::mutex> task_lock{ task_mu };
// There is no guarantee that `current_task` stays the same
// between here and when you create a new worker thread.
// As soon as the scope is exited another thread can enter
// and overwrite `current_task` before the first thread
// pushes `work` onto the thread queue or an existing worker
// picks it up
current_task = std::move(task);
}通常,线程池是用固定数量的线程创建的。作业被添加到队列中。当线程可用时,它会从队列中提取作业或等待条件变量,直到作业可用为止。
我不喜欢增加线程的设计。因为这可能会产生大量线程,当出现突然的高工作负载时,这些线程从未被释放过。线程是创建的昂贵资源。另外,创建许多线程并不意味着更多的并行性(它只是意味着更多的交换)。机器通常有一个可用并行性的上限,分配比这更多的线程,这通常是反作用的)。
https://codereview.stackexchange.com/questions/184538
复制相似问题