在大多数系统中,将每个任务指定给某个线程是不切实际的,不过可以利用现有的并发性,进行并发执行。线程池就提供了这样的功能:
- 提交到线程池中的任务将并发执行
- 提交的任务将会挂在任务队列上
- 队列中的每一个任务都会被池中的工作线程所获取
- 当任务执行完成后,线程再回到线程池中获取下一个任务
1 最简单的线程池
最简单的线程池中,线程就不需要等待其他线程完成对应任务了。任务没有返回值,不需要阻塞其他线程。
//最简单的线程池,仅用于示例#ifndef THREAD_POOL#define THREAD_POOL#include <thread>#include <atomic>#include <vector>#include "21_threadsafe_queue2.hpp"class join_threads{private:std::vector<std::thread>& threads;public:explicit join_threads(std::vector<std::thread>& threads_): threads(threads_){}~join_threads(){for(unsigned long i=0;i<threads.size();++i){// 析构函数中等待线程结束,RAII方式if(threads[i].joinable())threads[i].join();}}};class thread_pool{private://注意这里声明的顺序,是为了保证析构时能以相反的顺序销毁(很重要)std::atomic_bool done;threadsafe_queue<std::function<void()> > work_queue; // 使用线程安全队列保存任务std::vector<std::thread> threads; // 保存pool里的工作线程join_threads joiner; // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束void worker_thread(){while(!done) // 原子bool设置为true时,保证每个正在运行的线程停止{std::function<void()> task;if(work_queue.try_pop(task)) // 从任务队列获取到一个任务{task(); // 执行任务,函数指针}else{std::this_thread::yield(); // 没有任务,当前线程放弃执行,线程休息}}}public:thread_pool(): done(false),joiner(threads){// pool中线程个数使用硬件支持的最大个数unsigned const thread_count=std::thread::hardware_concurrency();try{for(unsigned i=0;i<thread_count;++i){// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理threads.push_back(std::thread(&thread_pool::worker_thread, this));}}catch(...){done=true; // 有异常时,设置done为truethrow;}}~thread_pool(){done=true; // 11}// submit将一个要执行的任务包装成std::function函数指针,加入任务队列template<typename FunctionType>void submit(FunctionType f){work_queue.push(std::function<void()>(f)); // 12}};#endif
2 添加功能:任务返回值
上面的任务独立运行,不需要返回值。但是一种特殊的情况是,执行任务的线程需要返回一个结果到主线程上进行处理。可以使用submit()函数提交任务并返回条件变量或future,用来等待任务的完成,返回任务返回值。
在上面例子的基础上,使用packged_task和future代替std::function,可以返回任务返回值:
//线程池,在29线程池中添加功能:可以返回每个任务的返回值#ifndef thread_pool22#define thread_pool22#include <future>#include <atomic>#include <vector>#include "21_threadsafe_queue2.hpp"#include "29_thread_pool.hpp"/*std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列因为std::function<>需要存储可复制构造的函数对象。这里包装一个自定义函数,用来处理只可移动的类型,就是一个带有函数操作符的类型擦除类*/class function_wrapper{private:struct impl_base{virtual void call() = 0;virtual ~impl_base() {}};std::unique_ptr<impl_base> impl;template <typename F>struct impl_type : impl_base{F f;impl_type(F &&f_) : f(std::move(f_)) {}void call() { f(); }};public:template <typename F>function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f))){}function_wrapper() = default;function_wrapper(function_wrapper &&other) : impl(std::move(other.impl)){}function_wrapper &operator=(function_wrapper &&other){impl = std::move(other.impl);return *this;}void operator()() { impl->call(); }function_wrapper(const function_wrapper &) = delete;function_wrapper(function_wrapper &) = delete;function_wrapper &operator=(const function_wrapper &) = delete;};class thread_pool2{private:std::atomic_bool done;threadsafe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::functionstd::vector<std::thread> threads; // 保存pool里的工作线程join_threads joiner; // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束void worker_thread(){while (!done){function_wrapper task;if (work_queue.try_pop(task)){task();}else{std::this_thread::yield();}}}public:thread_pool2() : done(false), joiner(threads){// pool中线程个数使用硬件支持的最大个数unsigned const thread_count = std::thread::hardware_concurrency();try{for (unsigned i = 0; i < thread_count; ++i){// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理threads.push_back(std::thread(&thread_pool2::worker_thread, this));}}catch (...){done = true; // 有异常时,设置done为truethrow;}}~thread_pool2(){done = true;}// submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型template <typename FunctionType>std::future<typename std::result_of<FunctionType()>::type>submit(FunctionType f){// 设置函数functionType的返回类型的别名为result_typetypedef typename std::result_of<FunctionType()>::type result_type;std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数fstd::future<result_type> res(task.get_future()); // 获取异步任务的futurework_queue.push(std::move(task)); // 将任务添加到任务队列中return res; // 返回future给submit函数的调用者}};#endif
当选择“因为能并发执行,最小工作块值的一试”时,就需要谨慎了。向线程池提交任务有一定的开销;让工作线程执行这个任务,并且将返回值保存在std::future<>中,对于太小的任务,这样的开销不划算。如果任务块太小,使用线程池的速度可能都不及单线程。
3 添加功能:手动触发任务执行
最简单的方法就是在thread_pool中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。高级线程池的实现可能会在等待函数中添加逻辑,或等待其他函数来处理这个任务,优先的任务会让其他的任务进行等待。下面清单中的实现,就展示了一个新run_pending_task()函数:
/*run_pending_task()的实现去掉了在worker_thread()函数的主循环。函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。*/void run_pending_task(){function_wrapper task;if (work_queue.try_pop(task)){task();}else{std::this_thread::yield();}}
4 避免队列中的任务竞争
随着处理器的增加,任务队列上就会有很多的竞争(添加任务和多线程获取任务),这会让性能下降。使用无锁队列会让任务没有明显的等待,但乒乓缓存会消耗大量的时间。
为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。
class thread_pool3{private:std::atomic_bool done;threadsafe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::functiontypedef std::queue<function_wrapper> local_queue_type;// unique_ptr指向每个线程本地(thread_local)的工作队列static thread_local std::unique_ptr<local_queue_type> local_work_queue;std::vector<std::thread> threads; // 保存pool里的工作线程// join_threads使用RAII方式,保证pool销毁前所有线程能执行结束join_threads joiner;void worker_thread(){local_work_queue.reset(new local_queue_type); // 3while (!done){run_pending_task();}}public:thread_pool3() : done(false), joiner(threads){// pool中线程个数使用硬件支持的最大个数unsigned const thread_count = std::thread::hardware_concurrency();try{for (unsigned i = 0; i < thread_count; ++i){// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理threads.push_back(std::thread(&thread_pool3::worker_thread, this));}}catch (...){done = true; // 有异常时,设置done为truethrow;}}~thread_pool3(){done = true;}// submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型template <typename FunctionType>std::future<typename std::result_of<FunctionType()>::type>submit(FunctionType f){// 设置函数functionType的返回类型的别名为result_typetypedef typename std::result_of<FunctionType()>::type result_type;std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数fstd::future<result_type> res(task.get_future()); // 获取异步任务的futureif (local_work_queue) // 检查当前线程是否具有一个工作队列,如果有则将任务放入本地队列{local_work_queue->push(std::move(task));}else{work_queue.push(std::move(task)); // 将任务添加到全局任务队列中}return res; // 返回future给submit函数的调用者}/*run_pending_task()的实现去掉了在worker_thread()函数的主循环。函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。*/void run_pending_task(){function_wrapper task;if (local_work_queue && !(local_work_queue->empty())) // 如果本地队列有任务,则优先处理本地队列的任务{task = std::move(local_work_queue->front());local_work_queue->pop();task();}else if (work_queue.try_pop(task)) //否则,在全局队列获取任务{task();}else{std::this_thread::yield();}}};
本地任务队列能有效的避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。例如:举一个快速排序的例子,一开始的数据块能在线程池上被处理,因为剩余部分会放在工作线程的本地队列上进行处理,这样的使用方式也违背使用线程池的初衷。
5 从别的线程队列中窃取任务
上面任务分配不均时的困境时有解的,本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。为了让没有任务的线程能从其他线程的任务队列中获取任务,就需要本地任务列表可以被其他线程访问,这样才能让run_pending_tasks()窃取任务。需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。
/*线程池,在30线程池基础上增加功能,本篇最终版本:1. 为每个线程建立独立的任务队列,配合全局任务队列使用2. 支持从其他线程的本地队列窃取任务*/#ifndef THREAD_POOL4#define THREAD_POOL4#include <future>#include <atomic>#include <vector>#include <queue>#include "21_threadsafe_queue2.hpp"// 统一管理线程池所有线程,负责等待线程结束和销毁class join_threads{private:std::vector<std::thread> &threads;public:explicit join_threads(std::vector<std::thread> &threads_) : threads(threads_){}~join_threads(){for (unsigned long i = 0; i < threads.size(); ++i){// 析构函数中等待线程结束,RAII方式if (threads[i].joinable())threads[i].join();}}};/*std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列因为std::function<>需要存储可复制构造的函数对象。这里包装一个自定义函数,用来处理只可移动的类型,就是一个带有函数操作符的类型擦除类*/class function_wrapper{private:struct impl_base{virtual void call() = 0;virtual ~impl_base() {}};std::unique_ptr<impl_base> impl;template <typename F>struct impl_type : impl_base{F f;impl_type(F &&f_) : f(std::move(f_)) {}void call() { f(); }};public:template <typename F>function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f))){}function_wrapper() = default;function_wrapper(function_wrapper &&other) : impl(std::move(other.impl)){}function_wrapper &operator=(function_wrapper &&other){impl = std::move(other.impl);return *this;}void operator()() { impl->call(); }function_wrapper(const function_wrapper &) = delete;function_wrapper(function_wrapper &) = delete;function_wrapper &operator=(const function_wrapper &) = delete;};/*基于锁的任务窃取队列,代替普通的线程安全队列该队列支持从前端和后端获取数据,即先进先出和后进先出两种模式,后进先出可以用于其他线程窃取任务*/class work_stealing_queue{private:typedef function_wrapper data_type;std::deque<data_type> the_queue; // 使用deque队列保存实际function_wrappermutable std::mutex the_mutex; // 互斥锁用于控制对the_mutex的访问public:work_stealing_queue() {}// 拷贝和赋值构造函数不创建work_stealing_queue(const work_stealing_queue& other) = delete;work_stealing_queue& operator=(const work_stealing_queue& other) = delete;// 由互斥锁控制在队列前端插入数据void push(data_type data){std::lock_guard<std::mutex> lock(the_mutex);the_queue.push_front(std::move(data));}bool empty() const{std::lock_guard<std::mutex> lock(the_mutex);return the_queue.empty();}// 由互斥锁控制在队列前端获取数据bool try_pop(data_type& res){std::lock_guard<std::mutex> lock(the_mutex);if(the_queue.empty()){return false;}res=std::move(the_queue.front());the_queue.pop_front();return true;}// 由互斥锁控制在队列后端获取数据bool try_steal(data_type& res){std::lock_guard<std::mutex> lock(the_mutex);if(the_queue.empty()){return false;}res=std::move(the_queue.back());the_queue.pop_back();return true;}};class thread_pool4{private:typedef function_wrapper task_type;// 注意这里声明的顺序,是为了保证析构时能以相反的顺序销毁(很重要)std::atomic_bool done;threadsafe_queue<task_type> pool_work_queue; // 全局任务队列std::vector<std::unique_ptr<work_stealing_queue>> queues; // 保存每个线程任务队列的全局队列// 保存pool里的工作线程std::vector<std::thread> threads;// join_threads使用RAII方式,保证pool销毁前所有线程能执行结束join_threads joiner;static thread_local work_stealing_queue* local_work_queue; // 每个线程都有一个可以窃取的任务队列,不再使用普通的队列static thread_local unsigned my_index;void worker_thread(unsigned my_index_){my_index = my_index_;// 每个线程根据自己的序号从全局队列中获取自己的任务队列。这意味着根据index可以获取任意线程的任务队列local_work_queue = queues[my_index].get();while (!done){run_pending_task();}}bool pop_task_from_local_queue(task_type& task){return local_work_queue && local_work_queue->try_pop(task);}bool pop_task_from_pool_queue(task_type& task){return pool_work_queue.try_pop(task);}bool pop_task_from_other_thread_queue(task_type& task) // 从其他线程窃取任务{for(unsigned i=0;i < queues.size();++i){// 根据index从其他线程的任务队列后端窃取任务unsigned const index = (my_index + i + 1) % queues.size();if(queues[index]->try_steal(task)){return true;}}return false;}public:thread_pool4() : done(false), joiner(threads){// pool中线程个数使用硬件支持的最大个数unsigned const thread_count = std::thread::hardware_concurrency();try{for (unsigned i = 0; i < thread_count; ++i){// 当每个线程被创建,就创建了一个属于自己的工作队列,放入全局队列中queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理threads.push_back(std::thread(&thread_pool4::worker_thread, this, i));//传入参数i为my_index}}catch (...){done = true; // 有异常时,设置done为truethrow;}}~thread_pool4(){done = true;}// submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型template <typename FunctionType>std::future<typename std::result_of<FunctionType()>::type>submit(FunctionType f){// 设置函数functionType的返回类型的别名为result_typetypedef typename std::result_of<FunctionType()>::type result_type;std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数fstd::future<result_type> res(task.get_future()); // 获取异步任务的futureif (local_work_queue) // 检查当前线程是否具有一个工作队列,如果有则将任务放入本地队列{local_work_queue->push(std::move(task)); // 线程本地任务队列}else{pool_work_queue.push(std::move(task)); // 将任务添加到全局任务队列中}return res; // 返回future给submit函数的调用者}/*run_pending_task()的实现去掉了在worker_thread()函数的主循环。函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。*/void run_pending_task(){task_type task;if (pop_task_from_local_queue(task) || // 第一优先级:从线程自己的队列获取任务pop_task_from_pool_queue(task) || // 第二优先级:从线程池队列获取任务pop_task_from_other_thread_queue(task)) // 最低优先级:窃取其他线程的任务{task();}else{std::this_thread::yield();}}};#endif
pop_task_from_other_thread_queue()会遍历池中所有线程的任务队列,然后尝试窃取任务。为了避免每个线程都尝试从列表中的第一个线程上窃取任务,每一个线程都会从下一个线程开始遍历,通过自身的线程序号来确定开始遍历的线程序号。
使用线程池有很多好处,还有很多的方式能为某些特殊用法提升性能。特别是还没有探究动态变换大小的线程池,即使线程被阻塞的时候(例如:I/O或互斥锁),程序都能保证CPU最优的使用率。
