手撕线程池-学习C++11
目的
- 学习C++11的部分特性
- 清晰手撕代码时的思考过程
阅读顺序
需要了解的C++11特性:
template
模板...Args
可变类型参数STL emplace、emplace_back
容器操作方法Lambda
表达式std::move
移动语义、std::forward
完美转发std::shared
智能指针、std::thread
C++多线程std::future、std::promise
线程执行的结果传递std::mutex、std::condition_variable
互斥量与条件变量std::function、std::bind、std::packaged_task
函数封装
按照如下顺序阅读,语言特性不懂的地方,一定根据参考文章了解清楚后,再继续阅读
- 测试代码
- 成员变量
- 构造函数
- 入队函数
- 析构函数
手撕思路
- 定义
std::thread
线程列表、std::queue
任务队列,记住任务队列模板类型,条件变量,互斥量,进程池状态 - 构造函数中循环创建线程,线程中条件等待队列状态和线程池状态,取出任务并执行
- 入队函数
enqueue
:
- 根据模板参数
typename F
与typename ...Args
包装可调用函数对象std::bind
作为右值传递给std::packaged_task
- 返回值使用
std::result_of(F(Args...))::type
作为std::future
的模板参数创建临时返回值对象后,加锁,入队 - 条件变量通知线程
- 析构函数中,停止线程池状态,通知线程,循环join等待线程执行结束
参考
拓展
- 所有任务执行完成通知
- 根据CPU核心数动态调整线程数量
- 增加任务优先级,调整任务执行顺序
- 双队列设计
- 。。。。。
源代码
测试代码
// g++ TEST_ThreadPool.cpp -lpthread #include "ThreadPool.h" #include <iostream> std::mutex coutMtx; void Func(const size_t& i) { { std::lock_guard<std::mutex> lck(coutMtx); std::cout << "Hello ThreadPool: " << std::this_thread::get_id() << ", Task: " << i << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } int main() { ThreadPool pool(4); for (size_t i = 0; i < 10000; i++) { pool.enqueue(Func, i); } return 0; }
ThreadPool.h
/* Copyright (c) 2012 Jakob Progsch, Václav Zeman This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. 3. This notice may not be removed or altered from any source distribution. Modifications Copyright (c) 2025 Sun Xing * Description of modifications: * - Added code comments. */ #pragma once #include <queue> #include <thread> #include <future> #include <vector> #include <functional> #include <condition_variable> class ThreadPool { public: ThreadPool(size_t); ~ThreadPool(); template <typename F, typename ...Args> auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>; private: /* 1.成员变量解析 threads 线程列表,根据参数初始化线程池包含的线程数量,决定了有多少线程参与任务的执行 tasts 任务队列,包含了想要线程需要运行的函数,同时需要处理传入参数和能够获取返回值;std::queue中保存的对象为std::function<void()>, 这里使用void()作为std::function的模板参数,是为了兼容不同可调用对象的参数和返回值,后边的enqueue传递可调用对象会介绍 mtx 互斥量,用来保证任务队列的线程同步 cond 条件变量,为了实现通知机制,即有任务时及时通知线程执行 stop,代表线程池的运行状态 */ std::vector<std::thread> threads; std::queue<std::function<void()>> tasks; std::mutex mtx; std::condition_variable cond; volatile bool stop; }; /* 2.ThreadPool(size_t) 构造函数解析 1)size_t 表示线程池的数量,循环创建线程并定义线程内部执行的函数,使用Lambda直接传入一个可调用对象,捕获了this指针用来调用线程池中的mtx、cond、tasks、stop 2)线程内部执行的是一个死循环,不断地从tasks任务队列中取出任务并执行 3)在取任务的过程中,通过cond.wait来等待如下两个条件:a)任务队列中是否还有需要执行的任务 b)线程池是否停止,线程中没有可执行的任务并且线程池停止后,线程将退出 4)std::function<void()>来保存从任务队列中取出来的可执行对象,这里没有任何参数和返回值是因为都被包装在了这个可执行对象中,稍后的enqueue函数会介绍 5)使用了std::move移动语义防止了可执行对象的拷贝操作 */ ThreadPool::ThreadPool(size_t num) : stop(false) { for (size_t i = 0; i < num; i++) { threads.emplace_back([this]() -> void { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lcok(this->mtx); this->cond.wait(lcok, [this]() -> bool { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); } }); } } /* 3.template <typename F, typename ...Args> auto ThreadPool::enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type> 入队函数解析 1)模板参数:F可理解为可调用的函数对象,...Args可理解为函数对象的参数 2)auto 即 std::future<typename std::result_of<F(Args...)>::type> 可理解为,enqueue函数执行完成后,返回一个std::future对象,异步传递任务的执行结果, std::result_of<F(Args...)>::type作为std::future对象的模板参数,表示可调用对象F在给定参数Args...下的返回值类型 3)使用using定义return_type为typename std::result_of<F(Args...)>::type的别名 4)std::bind(std::forward<F>(f), std::forward<Args>(args)...),将函数对象及其参数绑定到一起,生成一个新的可调用对象,std::forward可以将函数对象及其参数完整传递 5)std::packaged_task<return_type()>,将新的可调用对象封装起来,与std::future关联,并且使用shared_ptr管理新的可调用对象 6)task->get_future()保存新的可调用对象的返回值 7)加锁,tasks.emplace([task]() { (*task)(); }),将可调用对象再次定义为Lambda表达式并入队,实际会在tasks.front()获取之后主动执行 8)cond.notify_one()通知到线程调用cond.wait等待的地方 9)当线程池停止运行时返回一个无效的std::future对象 */ template <typename F, typename ...Args> auto ThreadPool::enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> result = task->get_future(); { std::unique_lock<std::mutex> lck(mtx); if (stop) return std::future<return_type>(); tasks.emplace([task]() { (*task)(); }); } cond.notify_one(); return result; } /* 3.ThreadPool::~ThreadPool() 析构函数解析 1)改变线程池状态 2)通知所有线程不能在运行任务了 3)等待所有线程退出 */ ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lck(mtx); stop = true; } cond.notify_all(); for (auto& thread : threads) { thread.join(); } }#手撕代码##面试##社招##C++#