首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【Linux系统编程】(四十六)线程池原理与实现:从固定线程池到线程安全单例模式

【Linux系统编程】(四十六)线程池原理与实现:从固定线程池到线程安全单例模式

作者头像
_OP_CHEN
发布2026-03-12 08:19:09
发布2026-03-12 08:19:09
420
举报
文章被收录于专栏:C++C++

目录

前言

一、线程池核心原理:为什么需要线程池?

1.1 线程的 “创建 - 销毁” 开销有多高?

1.2 线程池的核心思想:“线程复用”

1.3 线程池的核心组件

1.4 线程池的工作流程

二、从零实现固定线程池:核心代码拆解

2.1 前置依赖:已封装的工具类

2.1.1 互斥量封装(Lock.hpp)

2.1.2 条件变量封装(Cond.hpp)

2.1.3 日志类简化版(Log.hpp)

2.2 线程池核心实现(ThreadPool.hpp)

2.3 核心代码关键解析

2.3.1 线程函数的设计:静态成员函数

2.3.2 任务处理主循环:HandlerTask

2.3.3 任务提交:Enqueue接口

2.3.4 线程池停止:Stop与Wait

三、线程池测试:验证功能正确性

3.1 测试代码(main.cpp)

3.2 编译与运行

3.3 预期运行结果

四、升级线程池:线程安全的单例模式

4.1 单例模式核心原理

4.2 线程安全的单例模式实现:双重检查锁定

4.3 单例式线程池实现(ThreadPool_Singleton.hpp)

4.4 单例线程池关键改进

4.5 单例线程池测试

五、线程池的进阶优化方向

总结


前言

在高并发编程中,线程池绝对是 “性能优化神器”—— 它能避免频繁创建销毁线程的开销,合理利用系统资源,还能快速响应任务请求。但你知道线程池的核心原理是什么吗?如何从零实现一个稳定的固定线程池?又如何将线程池设计为线程安全的单例模式,确保全局唯一实例? 今天这篇文章,就带大家从原理到实战,一步步吃透线程池的设计与实现。我们先拆解线程池的核心组件和工作流程,再动手实现一个支持任务队列、线程管理、条件变量同步的固定线程池,最后升级为线程安全的单例模式,全程干货拉满,建议收藏跟着敲代码!下面就让我们正式开始吧!


一、线程池核心原理:为什么需要线程池?

在讲实现之前,我们先搞懂线程池的 “底层逻辑”—— 为什么高并发场景下必须用线程池,而不是直接创建线程?

1.1 线程的 “创建 - 销毁” 开销有多高?

线程是操作系统的宝贵资源,创建一个线程需要:

  • 为线程分配栈空间(默认通常是 2MB);
  • 内核创建线程控制块(TCB),维护线程状态、优先级等信息;
  • 将线程加入操作系统的调度队列。

而销毁线程时,需要:

  • 回收栈空间和 TCB 资源;
  • 从调度队列中移除线程。

这些操作都需要内核参与,开销巨大。如果面对的是短时间、高频率的任务(比如 Web 服务器的 HTTP 请求),线程的创建销毁开销可能远超任务本身的执行时间,导致系统性能急剧下降。

1.2 线程池的核心思想:“线程复用”

线程池的本质是“线程复用 + 任务队列”

  1. 程序启动时,提前创建固定数量的线程,让它们处于休眠状态,等待任务到来;
  2. 当有新任务时,将任务加入队列,唤醒一个休眠的线程来执行任务;
  3. 任务执行完毕后,线程不销毁,而是回到休眠状态,等待下一个任务;
  4. 程序退出时,关闭线程池,等待所有线程执行完任务后再销毁。

这样一来,就避免了频繁创建销毁线程的开销,让线程资源得到重复利用,同时任务队列起到 “缓冲” 作用,平衡任务提交和线程处理的速度。

1.3 线程池的核心组件

一个完整的线程池,必须包含以下 4 个核心组件,缺一不可:

  1. 线程数组 / 容器:存储所有工作线程,管理线程的创建、启动、停止;
  2. 任务队列:存储待执行的任务,通常是阻塞队列(无任务时线程阻塞,有任务时唤醒);
  3. 同步机制:互斥量(保护任务队列的线程安全)+ 条件变量(实现线程的阻塞与唤醒);
  4. 控制变量:标记线程池的运行状态(是否正在运行)、等待任务的线程数等。

1.4 线程池的工作流程

用一张图就能看懂线程池的完整工作流程:

具体步骤:

  1. 线程池初始化时,创建 N 个工作线程,每个线程启动后进入循环,等待任务;
  2. 外部线程通过Enqueue接口提交任务到任务队列;
  3. 提交任务后,通过条件变量唤醒一个正在等待的工作线程;
  4. 工作线程被唤醒后,从任务队列中取出任务执行;
  5. 任务执行完毕,工作线程再次进入等待状态;
  6. 当调用Stop接口关闭线程池时,设置运行状态为 “停止”,唤醒所有工作线程,等待它们执行完剩余任务后退出。

二、从零实现固定线程池:核心代码拆解

了解了原理,我们就动手实现一个固定线程数的线程池(线程数初始化后固定,不支持动态扩容)。实现基于 C++11 及以上,使用 POSIX 线程库(pthread),结合之前封装的互斥量(Mutex)、条件变量(Cond)和日志系统(Log),保证线程安全和可调试性。

2.1 前置依赖:已封装的工具类

首先,我们需要之前实现的 3 个基础工具类(如果还没有实现,可以直接用下面的简化版本):

  1. 互斥量类(Lock.hpp):保证临界资源(任务队列)的线程安全;
  2. 条件变量类(Cond.hpp):实现线程的阻塞与唤醒;
  3. 日志类(Log.hpp):打印线程池运行日志,方便调试。
2.1.1 互斥量封装(Lock.hpp)
代码语言:javascript
复制
// Lock.hpp 互斥量与RAII锁守卫封装
#pragma once
#include <iostream>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        Mutex& operator=(const Mutex&) = delete;

        Mutex()
        {
            if (pthread_mutex_init(&_mutex, nullptr) != 0)
            {
                std::cerr << "Mutex init failed!" << std::endl;
                exit(EXIT_FAILURE);
            }
        }

        void Lock()
        {
            pthread_mutex_lock(&_mutex);
        }

        void Unlock()
        {
            pthread_mutex_unlock(&_mutex);
        }

        pthread_mutex_t* GetMutex()
        {
            return &_mutex;
        }

        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

    private:
        pthread_mutex_t _mutex;
    };

    // RAII锁守卫:自动加锁解锁
    class LockGuard
    {
    public:
        LockGuard(const LockGuard&) = delete;
        LockGuard& operator=(const LockGuard&) = delete;

        explicit LockGuard(Mutex& mutex) : _mutex(mutex)
        {
            _mutex.Lock();
        }

        ~LockGuard()
        {
            _mutex.Unlock();
        }

    private:
        Mutex& _mutex;
    };
}
2.1.2 条件变量封装(Cond.hpp)
代码语言:javascript
复制
// Cond.hpp 条件变量封装
#pragma once
#include <pthread.h>
#include "Lock.hpp"

namespace CondModule
{
    using namespace LockModule;

    class Cond
    {
    public:
        Cond(const Cond&) = delete;
        Cond& operator=(const Cond&) = delete;

        Cond()
        {
            if (pthread_cond_init(&_cond, nullptr) != 0)
            {
                std::cerr << "Cond init failed!" << std::endl;
                exit(EXIT_FAILURE);
            }
        }

        void Wait(Mutex& mutex)
        {
            pthread_cond_wait(&_cond, mutex.GetMutex());
        }

        void Notify()
        {
            pthread_cond_signal(&_cond);
        }

        void NotifyAll()
        {
            pthread_cond_broadcast(&_cond);
        }

        ~Cond()
        {
            pthread_cond_destroy(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
}
2.1.3 日志类简化版(Log.hpp)
代码语言:javascript
复制
// Log.hpp 简化版日志类
#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <sstream>
#include <unistd.h>

namespace LogModule
{
    enum class LogLevel
    {
        DEBUG,
        INFO,
        ERROR
    };

    std::string GetCurrentTime()
    {
        time_t now = time(nullptr);
        struct tm tm;
        localtime_r(&now, &tm);
        char buf[64];
        snprintf(buf, sizeof(buf), "%4d-%02d-%02d %02d:%02d:%02d",
                 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                 tm.tm_hour, tm.tm_min, tm.tm_sec);
        return buf;
    }

    std::string LevelToString(LogLevel level)
    {
        switch (level)
        {
            case LogLevel::DEBUG: return "DEBUG";
            case LogLevel::INFO: return "INFO";
            case LogLevel::ERROR: return "ERROR";
            default: return "UNKNOWN";
        }
    }

    #define LOG(level, msg) do { \
        std::cout << "[" << GetCurrentTime() << "] [" << LevelToString(level) << "] " \
                  << "[" << getpid() << "] " << msg << std::endl; \
    } while(0)
}

2.2 线程池核心实现(ThreadPool.hpp)

线程池的核心是ThreadPool类,模板设计支持任意类型的任务(通过std::function封装),核心接口包括:

  • 构造函数:初始化线程数和核心组件;
  • InitThreadPool:创建工作线程(不启动);
  • Start:启动所有工作线程;
  • Enqueue:提交任务到队列;
  • Stop:关闭线程池;
  • Wait:等待所有线程退出;
  • 内部函数HandlerTask:工作线程的任务处理逻辑。

完整代码如下:

代码语言:javascript
复制
// ThreadPool.hpp 固定线程池实现
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <pthread.h>
#include <unistd.h>
#include "Lock.hpp"
#include "Cond.hpp"
#include "Log.hpp"

using namespace LockModule;
using namespace CondModule;
using namespace LogModule;

// 任务类型:无参数无返回值的函数对象
using Task = std::function<void()>;

template <typename T = Task>
class ThreadPool
{
public:
    // 禁用拷贝和赋值(线程池不能被拷贝)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // 构造函数:指定线程数,默认10个
    explicit ThreadPool(int thread_num = 10)
        : _thread_num(thread_num), _is_running(false), _wait_thread_num(0)
    {
        if (_thread_num <= 0)
        {
            LOG(LogLevel::ERROR, "Thread number must be positive!");
            exit(EXIT_FAILURE);
        }
        LOG(LogLevel::INFO, "ThreadPool constructed, thread num: " + std::to_string(_thread_num));
    }

    // 初始化线程池:创建工作线程(不启动)
    void InitThreadPool()
    {
        // 预留线程容器空间,避免扩容
        _threads.reserve(_thread_num);
        for (int i = 0; i < _thread_num; ++i)
        {
            // 创建线程,绑定任务处理函数HandlerTask
            pthread_t tid;
            if (pthread_create(&tid, nullptr, ThreadFunc, this) != 0)
            {
                LOG(LogLevel::ERROR, "Create thread " + std::to_string(i) + " failed!");
                exit(EXIT_FAILURE);
            }
            _threads.emplace_back(tid);
            LOG(LogLevel::INFO, "Init thread " + std::to_string(i) + " success, tid: " + std::to_string(tid));
        }
    }

    // 启动线程池:设置运行状态为true,所有线程开始工作
    void Start()
    {
        _is_running = true;
        LOG(LogLevel::INFO, "ThreadPool start running...");
    }

    // 提交任务到队列
    bool Enqueue(const T& task)
    {
        LockGuard lock(_mutex); // 加锁保护任务队列

        // 如果线程池已停止,不接受新任务
        if (!_is_running)
        {
            LOG(LogLevel::ERROR, "ThreadPool has stopped, cannot enqueue task!");
            return false;
        }

        // 将任务加入队列
        _task_queue.push(task);
        LOG(LogLevel::DEBUG, "Enqueue task success, queue size: " + std::to_string(_task_queue.size()));

        // 如果有等待的线程,唤醒一个
        if (_wait_thread_num > 0)
        {
            _cond.Notify();
        }

        return true;
    }

    // 停止线程池:等待所有任务执行完毕后退出
    void Stop()
    {
        LockGuard lock(_mutex);
        if (!_is_running)
        {
            LOG(LogLevel::INFO, "ThreadPool has already stopped!");
            return;
        }

        // 设置运行状态为false,唤醒所有等待的线程
        _is_running = false;
        _cond.NotifyAll();
        LOG(LogLevel::INFO, "ThreadPool stopping, wake up all threads...");
    }

    // 等待所有线程退出
    void Wait()
    {
        for (pthread_t tid : _threads)
        {
            pthread_join(tid, nullptr);
            LOG(LogLevel::INFO, "Thread " + std::to_string(tid) + " exited!");
        }
        _threads.clear();
        LOG(LogLevel::INFO, "All threads exited, thread pool stopped!");
    }

    // 析构函数
    ~ThreadPool()
    {
        if (_is_running)
        {
            Stop();
            Wait();
        }
        LOG(LogLevel::INFO, "ThreadPool destructed!");
    }

private:
    // 线程函数:静态成员函数,通过this指针访问成员变量
    static void* ThreadFunc(void* arg)
    {
        ThreadPool* pool = static_cast<ThreadPool*>(arg);
        pool->HandlerTask(); // 调用成员函数处理任务
        return nullptr;
    }

    // 任务处理逻辑:工作线程的主循环
    void HandlerTask()
    {
        pthread_t tid = pthread_self();
        LOG(LogLevel::INFO, "Thread " + std::to_string(tid) + " started, waiting for tasks...");

        while (true)
        {
            LockGuard lock(_mutex); // 加锁

            // 1. 如果线程池运行中且任务队列为空,线程进入等待状态
            while (_is_running && _task_queue.empty())
            {
                _wait_thread_num++;
                LOG(LogLevel::DEBUG, "Thread " + std::to_string(tid) + " no tasks, wait...");
                _cond.Wait(_mutex); // 解锁并等待,被唤醒后重新加锁
                _wait_thread_num--;
            }

            // 2. 检查线程池是否已停止
            // 如果线程池停止且任务队列为空,线程退出
            if (!_is_running && _task_queue.empty())
            {
                LOG(LogLevel::INFO, "Thread " + std::to_string(tid) + " exit, pool stopped and queue empty!");
                break;
            }

            // 3. 取出任务并执行(此时已加锁,确保任务队列操作安全)
            if (!_task_queue.empty())
            {
                T task = _task_queue.front();
                _task_queue.pop();
                lock.~LockGuard(); // 手动解锁,让其他线程可以操作队列(提前解锁提升性能)

                LOG(LogLevel::DEBUG, "Thread " + std::to_string(tid) + " get task, queue size left: " + std::to_string(_task_queue.size()));
                task(); // 执行任务
            }
            else
            {
                // 极端情况:线程被唤醒但队列空且池未停止,继续循环
                continue;
            }
        }
    }

private:
    int _thread_num;                // 工作线程数
    std::vector<pthread_t> _threads;// 工作线程ID容器
    std::queue<T> _task_queue;      // 任务队列
    Mutex _mutex;                   // 保护任务队列的互斥量
    Cond _cond;                     // 线程阻塞与唤醒的条件变量
    bool _is_running;               // 线程池运行状态(true:运行中,false:已停止)
    int _wait_thread_num;           // 正在等待任务的线程数
};

2.3 核心代码关键解析

2.3.1 线程函数的设计:静态成员函数

由于 POSIX 的pthread_create要求线程函数是void* (*)(void*)类型,无法直接调用类的非静态成员函数(非静态成员函数隐含this指针)。因此,我们设计了静态成员函数ThreadFunc,通过void* arg参数传入线程池对象指针,再调用非静态成员函数HandlerTask处理任务。

2.3.2 任务处理主循环:HandlerTask

这是工作线程的核心逻辑,也是线程池的 “心脏”,重点关注 3 点:

  1. 循环等待任务:用while (_is_running && _task_queue.empty())循环判断,避免 “伪唤醒”(即使线程被意外唤醒,发现队列空或池已停止,会重新等待);
  2. 提前解锁优化:取出任务后,手动调用lock.~LockGuard()释放互斥量,让其他线程可以同时操作任务队列(任务执行可能耗时,提前解锁能提升并发性能);
  3. 线程退出条件:只有当线程池已停止(_is_running = false)且任务队列为空时,线程才退出,确保所有已提交的任务都能执行完毕。
2.3.3 任务提交:Enqueue接口

  • 加锁保护任务队列,避免多线程并发提交任务导致队列错乱;
  • 提交任务后,只唤醒一个等待的线程(_cond.Notify()),避免 “惊群效应”(唤醒所有线程导致竞争,浪费 CPU 资源);
  • 如果线程池已停止,拒绝接受新任务,保证线程池的安全性。
2.3.4 线程池停止:StopWait

  • Stop:设置_is_running = false,唤醒所有等待的线程,让它们检查退出条件;
  • Wait:调用pthread_join等待所有线程退出,确保线程资源被正确回收;
  • 析构函数中自动调用StopWait,避免内存泄漏。

三、线程池测试:验证功能正确性

我们写一个测试程序,验证线程池的任务提交、执行、停止等功能是否正常。

3.1 测试代码(main.cpp)

代码语言:javascript
复制
// main.cpp 线程池测试程序
#include <iostream>
#include <unistd.h>
#include "ThreadPool.hpp"

// 测试任务1:简单打印信息
void TestTask1(int task_id)
{
    LOG(LogLevel::INFO, "Task " + std::to_string(task_id) + " started, thread tid: " + std::to_string(pthread_self()));
    usleep(100000); // 模拟任务执行耗时(100ms)
    LOG(LogLevel::INFO, "Task " + std::to_string(task_id) + " finished!");
}

// 测试任务2:计算1~n的和
void TestTask2(int n, int task_id)
{
    LOG(LogLevel::INFO, "Task " + std::to_string(task_id) + " (sum 1~" + std::to_string(n) + ") started!");
    int sum = 0;
    for (int i = 1; i <= n; ++i)
    {
        sum += i;
    }
    LOG(LogLevel::INFO, "Task " + std::to_string(task_id) + " result: 1~" + std::to_string(n) + " sum = " + std::to_string(sum));
}

int main()
{
    // 1. 创建线程池(5个工作线程)
    ThreadPool<> pool(5);

    // 2. 初始化并启动线程池
    pool.InitThreadPool();
    pool.Start();

    // 3. 提交10个任务
    for (int i = 0; i < 10; ++i)
    {
        if (i % 2 == 0)
        {
            // 提交任务1:绑定task_id
            pool.Enqueue(std::bind(TestTask1, i));
        }
        else
        {
            // 提交任务2:绑定n和task_id
            pool.Enqueue(std::bind(TestTask2, 10000 + i * 1000, i));
        }
        usleep(50000); // 每隔50ms提交一个任务
    }

    // 4. 等待所有任务执行完毕(这里休眠2秒,实际开发中可通过信号量等机制优化)
    sleep(2);

    // 5. 停止线程池
    LOG(LogLevel::INFO, "Start stopping thread pool...");
    pool.Stop();
    pool.Wait();

    return 0;
}

3.2 编译与运行

编译时需要链接 POSIX 线程库(-lpthread),支持 C++11 及以上:

代码语言:javascript
复制
# 编译命令
g++ main.cpp -o thread_pool_test -std=c++11 -lpthread

# 运行程序
./thread_pool_test

3.3 预期运行结果

  1. 线程池初始化 5 个工作线程,每个线程启动后进入等待状态;
  2. 每隔 50ms 提交一个任务,共 10 个任务,线程池会唤醒空闲线程执行任务;
  3. 任务执行时打印线程 ID 和任务信息,验证线程复用(多个任务可能由同一个线程执行);
  4. 所有任务执行完毕后,调用Stop停止线程池,5 个线程依次退出;
  5. 日志中无任务丢失、线程错乱等问题,说明线程池功能正常。

四、升级线程池:线程安全的单例模式

上面实现的线程池,每次使用都需要手动创建对象,无法保证全局唯一实例。在实际开发中,线程池通常是全局唯一的(多个模块共享一个线程池,避免资源浪费),这就需要用到单例模式

但普通的单例模式在多线程环境下是线程不安全的 —— 如果多个线程同时调用GetInstance,可能会创建多个实例。因此,我们需要实现线程安全的单例模式

4.1 单例模式核心原理

单例模式的核心是“一个类只能创建一个实例”,实现要点:

  1. 私有化构造函数、拷贝构造函数、赋值运算符,禁止外部创建对象;
  2. 提供一个静态成员函数GetInstance,返回唯一实例;
  3. 线程安全:确保多个线程同时调用GetInstance时,只创建一个实例。

4.2 线程安全的单例模式实现:双重检查锁定

最常用的线程安全单例模式是双重检查锁定(Double-Checked Locking),核心逻辑:

代码语言:javascript
复制
static Singleton* GetInstance()
{
    if (instance == nullptr) // 第一次检查:避免不必要的锁竞争
    {
        LockGuard lock(mutex); // 加锁
        if (instance == nullptr) // 第二次检查:确保只创建一个实例
        {
            instance = new Singleton();
        }
    }
    return instance;
}

  • 第一次检查:如果实例已创建,直接返回,避免每次调用都加锁,提升性能;
  • 第二次检查:加锁后再次检查,防止多个线程同时通过第一次检查,创建多个实例;
  • 注意:实例指针需要用volatile修饰,避免编译器优化导致的 “指令重排” 问题。

4.3 单例式线程池实现(ThreadPool_Singleton.hpp)

基于之前的固定线程池,修改为线程安全的单例模式:

代码语言:javascript
复制
// ThreadPool_Singleton.hpp 单例模式线程池
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <pthread.h>
#include <unistd.h>
#include <volatile>
#include "Lock.hpp"
#include "Cond.hpp"
#include "Log.hpp"

using namespace LockModule;
using namespace CondModule;
using namespace LogModule;

using Task = std::function<void()>;

template <typename T = Task>
class ThreadPoolSingleton
{
public:
    // 禁用拷贝和赋值
    ThreadPoolSingleton(const ThreadPoolSingleton&) = delete;
    ThreadPoolSingleton& operator=(const ThreadPoolSingleton&) = delete;

    // 静态成员函数:获取唯一实例
    static ThreadPoolSingleton* GetInstance(int thread_num = 10)
    {
        // 第一次检查:避免不必要的锁竞争
        if (_instance == nullptr)
        {
            LockGuard lock(_instance_mutex); // 加锁
            // 第二次检查:确保只创建一个实例
            if (_instance == nullptr)
            {
                _instance = new ThreadPoolSingleton(thread_num);
                _instance->InitThreadPool();
                _instance->Start();
                LOG(LogLevel::INFO, "Create thread pool singleton success, thread num: " + std::to_string(thread_num));
            }
        }
        LOG(LogLevel::DEBUG, "Get thread pool singleton success!");
        return _instance;
    }

    // 提交任务(与之前一致)
    bool Enqueue(const T& task)
    {
        LockGuard lock(_mutex);
        if (!_is_running)
        {
            LOG(LogLevel::ERROR, "ThreadPool has stopped, cannot enqueue task!");
            return false;
        }
        _task_queue.push(task);
        LOG(LogLevel::DEBUG, "Enqueue task success, queue size: " + std::to_string(_task_queue.size()));
        if (_wait_thread_num > 0)
        {
            _cond.Notify();
        }
        return true;
    }

    // 停止线程池(与之前一致)
    void Stop()
    {
        LockGuard lock(_mutex);
        if (!_is_running)
        {
            LOG(LogLevel::INFO, "ThreadPool has already stopped!");
            return;
        }
        _is_running = false;
        _cond.NotifyAll();
        LOG(LogLevel::INFO, "ThreadPool stopping, wake up all threads...");
    }

    // 等待线程退出(与之前一致)
    void Wait()
    {
        for (pthread_t tid : _threads)
        {
            pthread_join(tid, nullptr);
            LOG(LogLevel::INFO, "Thread " + std::to_string(tid) + " exited!");
        }
        _threads.clear();
        LOG(LogLevel::INFO, "All threads exited, thread pool stopped!");
    }

    // 销毁实例(可选,一般在程序退出时调用)
    static void DestroyInstance()
    {
        LockGuard lock(_instance_mutex);
        if (_instance != nullptr)
        {
            if (_instance->_is_running)
            {
                _instance->Stop();
                _instance->Wait();
            }
            delete _instance;
            _instance = nullptr;
            LOG(LogLevel::INFO, "Destroy thread pool singleton success!");
        }
    }

private:
    // 私有化构造函数:只能通过GetInstance创建实例
    explicit ThreadPoolSingleton(int thread_num = 10)
        : _thread_num(thread_num), _is_running(false), _wait_thread_num(0)
    {
        if (_thread_num <= 0)
        {
            LOG(LogLevel::ERROR, "Thread number must be positive!");
            exit(EXIT_FAILURE);
        }
        LOG(LogLevel::INFO, "ThreadPoolSingleton constructed!");
    }

    // 私有化析构函数
    ~ThreadPoolSingleton()
    {
        LOG(LogLevel::INFO, "ThreadPoolSingleton destructed!");
    }

    // 初始化线程池(与之前一致)
    void InitThreadPool()
    {
        _threads.reserve(_thread_num);
        for (int i = 0; i < _thread_num; ++i)
        {
            pthread_t tid;
            if (pthread_create(&tid, nullptr, ThreadFunc, this) != 0)
            {
                LOG(LogLevel::ERROR, "Create thread " + std::to_string(i) + " failed!");
                exit(EXIT_FAILURE);
            }
            _threads.emplace_back(tid);
            LOG(LogLevel::INFO, "Init thread " + std::to_string(i) + " success, tid: " + std::to_string(tid));
        }
    }

    // 启动线程池(与之前一致)
    void Start()
    {
        _is_running = true;
        LOG(LogLevel::INFO, "ThreadPoolSingleton start running...");
    }

    // 静态线程函数(与之前一致)
    static void* ThreadFunc(void* arg)
    {
        ThreadPoolSingleton* pool = static_cast<ThreadPoolSingleton*>(arg);
        pool->HandlerTask();
        return nullptr;
    }

    // 任务处理逻辑(与之前一致)
    void HandlerTask()
    {
        pthread_t tid = pthread_self();
        LOG(LogLevel::INFO, "Thread " + std::to_string(tid) + " started, waiting for tasks...");

        while (true)
        {
            LockGuard lock(_mutex);
            while (_is_running && _task_queue.empty())
            {
                _wait_thread_num++;
                LOG(LogLevel::DEBUG, "Thread " + std::to_string(tid) + " no tasks, wait...");
                _cond.Wait(_mutex);
                _wait_thread_num--;
            }

            if (!_is_running && _task_queue.empty())
            {
                LOG(LogLevel::INFO, "Thread " + std::to_string(tid) + " exit, pool stopped and queue empty!");
                break;
            }

            if (!_task_queue.empty())
            {
                T task = _task_queue.front();
                _task_queue.pop();
                lock.~LockGuard();

                LOG(LogLevel::DEBUG, "Thread " + std::to_string(tid) + " get task, queue size left: " + std::to_string(_task_queue.size()));
                task();
            }
            else
            {
                continue;
            }
        }
    }

private:
    int _thread_num;                // 工作线程数
    std::vector<pthread_t> _threads;// 工作线程ID容器
    std::queue<T> _task_queue;      // 任务队列
    Mutex _mutex;                   // 保护任务队列的互斥量
    Cond _cond;                     // 条件变量
    bool _is_running;               // 运行状态
    int _wait_thread_num;           // 等待任务的线程数

    // 单例相关静态成员
    static volatile ThreadPoolSingleton* _instance; // 唯一实例指针(volatile避免优化)
    static Mutex _instance_mutex;                   // 保护实例创建的互斥量
};

// 静态成员初始化(类外初始化)
template <typename T>
volatile ThreadPoolSingleton<T>* ThreadPoolSingleton<T>::_instance = nullptr;

template <typename T>
Mutex ThreadPoolSingleton<T>::_instance_mutex;

// 简化调用:定义全局宏
#define THREAD_POOL ThreadPoolSingleton<>::GetInstance()

4.4 单例线程池关键改进

  1. 私有化构造 / 析构函数:禁止外部直接创建或销毁实例,只能通过GetInstanceDestroyInstance操作;
  2. 静态实例指针_instance是静态成员,全局唯一,用volatile修饰,避免编译器优化导致的指令重排(比如new操作被拆分为 “分配内存 + 初始化对象 + 赋值指针”,可能被重排为 “分配内存 + 赋值指针 + 初始化对象”,导致其他线程拿到未初始化的实例);
  3. 双重检查锁定GetInstance中两次检查_instance是否为空,结合互斥量_instance_mutex,确保多线程环境下只创建一个实例;
  4. 全局宏定义#define THREAD_POOL ThreadPoolSingleton<>::GetInstance(),简化调用,直接用THREAD_POOL->Enqueue(...)提交任务。

4.5 单例线程池测试

代码语言:javascript
复制
// main_singleton.cpp 单例线程池测试
#include <iostream>
#include <unistd.h>
#include "ThreadPool_Singleton.hpp"

void TestTask(int task_id)
{
    LOG(LogLevel::INFO, "Singleton Task " + std::to_string(task_id) + " started, tid: " + std::to_string(pthread_self()));
    usleep(100000);
    LOG(LogLevel::INFO, "Singleton Task " + std::to_string(task_id) + " finished!");
}

int main()
{
    // 1. 多个线程同时获取单例,验证线程安全
    pthread_t t1, t2, t3;

    auto get_pool_and_enqueue = [](void* arg) {
        int thread_id = *(static_cast<int*>(arg));
        // 多个线程同时获取实例并提交任务
        ThreadPoolSingleton<>* pool = ThreadPoolSingleton<>::GetInstance(3);
        for (int i = 0; i < 2; ++i)
        {
            pool->Enqueue(std::bind(TestTask, thread_id * 10 + i));
            usleep(30000);
        }
        return nullptr;
    };

    int id1 = 1, id2 = 2, id3 = 3;
    pthread_create(&t1, nullptr, get_pool_and_enqueue, &id1);
    pthread_create(&t2, nullptr, get_pool_and_enqueue, &id2);
    pthread_create(&t3, nullptr, get_pool_and_enqueue, &id3);

    // 等待线程提交任务
    sleep(2);

    // 2. 停止线程池并销毁实例
    LOG(LogLevel::INFO, "Start stopping singleton thread pool...");
    ThreadPoolSingleton<>::GetInstance()->Stop();
    ThreadPoolSingleton<>::GetInstance()->Wait();
    ThreadPoolSingleton<>::DestroyInstance();

    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_join(t3, nullptr);

    return 0;
}

编译运行

代码语言:javascript
复制
g++ main_singleton.cpp -o thread_pool_singleton_test -std=c++11 -lpthread
./thread_pool_singleton_test

预期结果

  • 3 个线程同时调用GetInstance,但只创建一个线程池实例(日志中只有一次 “Create thread pool singleton success”);
  • 共提交 6 个任务,3 个工作线程复用执行;
  • 停止线程池后,实例被正确销毁,无内存泄漏。

五、线程池的进阶优化方向

我们实现的线程池是基础版本,实际生产环境中还可以进行以下优化:

  1. 动态扩容 / 缩容:根据任务队列长度动态增加或减少工作线程,避免线程过多导致调度开销,或线程过少导致任务堆积;
  2. 任务优先级:任务队列改为优先级队列(std::priority_queue),支持高优先级任务先执行;
  3. 任务超时机制:提交任务时指定超时时间,超时未执行的任务自动取消;
  4. 异常处理:任务执行过程中抛出异常时,线程池能捕获异常并记录日志,避免线程崩溃;
  5. 异步任务结果:支持任务执行后返回结果,通过std::futurestd::promise实现;
  6. 线程本地存储(TLS):为每个工作线程分配独立的本地存储,存储线程私有的数据,避免频繁的参数传递。

总结

最后,线程池是 C/C++ 高并发编程的基础组件,掌握其原理和实现,能让你在面试和实际开发中更有竞争力。本文的代码可直接用于项目开发,也可根据需求进行二次优化,建议大家动手敲一遍,加深理解! 如果在实现过程中有任何问题,欢迎在评论区留言讨论,一起学习,一起进步!💪

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-03-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 一、线程池核心原理:为什么需要线程池?
    • 1.1 线程的 “创建 - 销毁” 开销有多高?
    • 1.2 线程池的核心思想:“线程复用”
    • 1.3 线程池的核心组件
    • 1.4 线程池的工作流程
  • 二、从零实现固定线程池:核心代码拆解
    • 2.1 前置依赖:已封装的工具类
      • 2.1.1 互斥量封装(Lock.hpp)
      • 2.1.2 条件变量封装(Cond.hpp)
      • 2.1.3 日志类简化版(Log.hpp)
    • 2.2 线程池核心实现(ThreadPool.hpp)
    • 2.3 核心代码关键解析
      • 2.3.1 线程函数的设计:静态成员函数
      • 2.3.2 任务处理主循环:HandlerTask
      • 2.3.3 任务提交:Enqueue接口
      • 2.3.4 线程池停止:Stop与Wait
  • 三、线程池测试:验证功能正确性
    • 3.1 测试代码(main.cpp)
    • 3.2 编译与运行
    • 3.3 预期运行结果
  • 四、升级线程池:线程安全的单例模式
    • 4.1 单例模式核心原理
    • 4.2 线程安全的单例模式实现:双重检查锁定
    • 4.3 单例式线程池实现(ThreadPool_Singleton.hpp)
    • 4.4 单例线程池关键改进
    • 4.5 单例线程池测试
  • 五、线程池的进阶优化方向
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档