在大多数系统中,将每个任务指定给某个线程是不切实际的,不过可以利用现有的并发性,进行并发执行。线程池就提供了这样的功能:

  • 提交到线程池中的任务将并发执行
  • 提交的任务将会挂在任务队列上
  • 队列中的每一个任务都会被池中的工作线程所获取
  • 当任务执行完成后,线程再回到线程池中获取下一个任务


1 最简单的线程池

最简单的线程池中,线程就不需要等待其他线程完成对应任务了。任务没有返回值,不需要阻塞其他线程。

  1. //最简单的线程池,仅用于示例
  2. #ifndef THREAD_POOL
  3. #define THREAD_POOL
  4. #include <thread>
  5. #include <atomic>
  6. #include <vector>
  7. #include "21_threadsafe_queue2.hpp"
  8. class join_threads
  9. {
  10. private:
  11. std::vector<std::thread>& threads;
  12. public:
  13. explicit join_threads(std::vector<std::thread>& threads_): threads(threads_)
  14. {}
  15. ~join_threads()
  16. {
  17. for(unsigned long i=0;i<threads.size();++i)
  18. {
  19. // 析构函数中等待线程结束,RAII方式
  20. if(threads[i].joinable())
  21. threads[i].join();
  22. }
  23. }
  24. };
  25. class thread_pool
  26. {
  27. private:
  28. //注意这里声明的顺序,是为了保证析构时能以相反的顺序销毁(很重要)
  29. std::atomic_bool done;
  30. threadsafe_queue<std::function<void()> > work_queue; // 使用线程安全队列保存任务
  31. std::vector<std::thread> threads; // 保存pool里的工作线程
  32. join_threads joiner; // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
  33. void worker_thread()
  34. {
  35. while(!done) // 原子bool设置为true时,保证每个正在运行的线程停止
  36. {
  37. std::function<void()> task;
  38. if(work_queue.try_pop(task)) // 从任务队列获取到一个任务
  39. {
  40. task(); // 执行任务,函数指针
  41. }
  42. else
  43. {
  44. std::this_thread::yield(); // 没有任务,当前线程放弃执行,线程休息
  45. }
  46. }
  47. }
  48. public:
  49. thread_pool(): done(false),joiner(threads)
  50. {
  51. // pool中线程个数使用硬件支持的最大个数
  52. unsigned const thread_count=std::thread::hardware_concurrency();
  53. try
  54. {
  55. for(unsigned i=0;i<thread_count;++i)
  56. {
  57. // 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
  58. threads.push_back(std::thread(&thread_pool::worker_thread, this));
  59. }
  60. }
  61. catch(...)
  62. {
  63. done=true; // 有异常时,设置done为true
  64. throw;
  65. }
  66. }
  67. ~thread_pool()
  68. {
  69. done=true; // 11
  70. }
  71. // submit将一个要执行的任务包装成std::function函数指针,加入任务队列
  72. template<typename FunctionType>
  73. void submit(FunctionType f)
  74. {
  75. work_queue.push(std::function<void()>(f)); // 12
  76. }
  77. };
  78. #endif

2 添加功能:任务返回值

上面的任务独立运行,不需要返回值。但是一种特殊的情况是,执行任务的线程需要返回一个结果到主线程上进行处理。可以使用submit()函数提交任务并返回条件变量或future,用来等待任务的完成,返回任务返回值。
在上面例子的基础上,使用packged_task和future代替std::function,可以返回任务返回值:

  1. //线程池,在29线程池中添加功能:可以返回每个任务的返回值
  2. #ifndef thread_pool22
  3. #define thread_pool22
  4. #include <future>
  5. #include <atomic>
  6. #include <vector>
  7. #include "21_threadsafe_queue2.hpp"
  8. #include "29_thread_pool.hpp"
  9. /*std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列
  10. 因为std::function<>需要存储可复制构造的函数对象。
  11. 这里包装一个自定义函数,用来处理只可移动的类型,就是一个带有函数操作符的类型擦除类
  12. */
  13. class function_wrapper
  14. {
  15. private:
  16. struct impl_base
  17. {
  18. virtual void call() = 0;
  19. virtual ~impl_base() {}
  20. };
  21. std::unique_ptr<impl_base> impl;
  22. template <typename F>
  23. struct impl_type : impl_base
  24. {
  25. F f;
  26. impl_type(F &&f_) : f(std::move(f_)) {}
  27. void call() { f(); }
  28. };
  29. public:
  30. template <typename F>
  31. function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)))
  32. {
  33. }
  34. function_wrapper() = default;
  35. function_wrapper(function_wrapper &&other) : impl(std::move(other.impl))
  36. {
  37. }
  38. function_wrapper &operator=(function_wrapper &&other)
  39. {
  40. impl = std::move(other.impl);
  41. return *this;
  42. }
  43. void operator()() { impl->call(); }
  44. function_wrapper(const function_wrapper &) = delete;
  45. function_wrapper(function_wrapper &) = delete;
  46. function_wrapper &operator=(const function_wrapper &) = delete;
  47. };
  48. class thread_pool2
  49. {
  50. private:
  51. std::atomic_bool done;
  52. threadsafe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::function
  53. std::vector<std::thread> threads; // 保存pool里的工作线程
  54. join_threads joiner; // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
  55. void worker_thread()
  56. {
  57. while (!done)
  58. {
  59. function_wrapper task;
  60. if (work_queue.try_pop(task))
  61. {
  62. task();
  63. }
  64. else
  65. {
  66. std::this_thread::yield();
  67. }
  68. }
  69. }
  70. public:
  71. thread_pool2() : done(false), joiner(threads)
  72. {
  73. // pool中线程个数使用硬件支持的最大个数
  74. unsigned const thread_count = std::thread::hardware_concurrency();
  75. try
  76. {
  77. for (unsigned i = 0; i < thread_count; ++i)
  78. {
  79. // 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
  80. threads.push_back(std::thread(&thread_pool2::worker_thread, this));
  81. }
  82. }
  83. catch (...)
  84. {
  85. done = true; // 有异常时,设置done为true
  86. throw;
  87. }
  88. }
  89. ~thread_pool2()
  90. {
  91. done = true;
  92. }
  93. // submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型
  94. template <typename FunctionType>
  95. std::future<typename std::result_of<FunctionType()>::type>
  96. submit(FunctionType f)
  97. {
  98. // 设置函数functionType的返回类型的别名为result_type
  99. typedef typename std::result_of<FunctionType()>::type result_type;
  100. std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数f
  101. std::future<result_type> res(task.get_future()); // 获取异步任务的future
  102. work_queue.push(std::move(task)); // 将任务添加到任务队列中
  103. return res; // 返回future给submit函数的调用者
  104. }
  105. };
  106. #endif

当选择“因为能并发执行,最小工作块值的一试”时,就需要谨慎了。向线程池提交任务有一定的开销;让工作线程执行这个任务,并且将返回值保存在std::future<>中,对于太小的任务,这样的开销不划算。如果任务块太小,使用线程池的速度可能都不及单线程。

3 添加功能:手动触发任务执行

最简单的方法就是在thread_pool中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。高级线程池的实现可能会在等待函数中添加逻辑,或等待其他函数来处理这个任务,优先的任务会让其他的任务进行等待。下面清单中的实现,就展示了一个新run_pending_task()函数:

  1. /*
  2. run_pending_task()的实现去掉了在worker_thread()函数的主循环。
  3. 函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。
  4. */
  5. void run_pending_task()
  6. {
  7. function_wrapper task;
  8. if (work_queue.try_pop(task))
  9. {
  10. task();
  11. }
  12. else
  13. {
  14. std::this_thread::yield();
  15. }
  16. }

4 避免队列中的任务竞争

随着处理器的增加,任务队列上就会有很多的竞争(添加任务和多线程获取任务),这会让性能下降。使用无锁队列会让任务没有明显的等待,但乒乓缓存会消耗大量的时间。
为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务

  1. class thread_pool3
  2. {
  3. private:
  4. std::atomic_bool done;
  5. threadsafe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::function
  6. typedef std::queue<function_wrapper> local_queue_type;
  7. // unique_ptr指向每个线程本地(thread_local)的工作队列
  8. static thread_local std::unique_ptr<local_queue_type> local_work_queue;
  9. std::vector<std::thread> threads; // 保存pool里的工作线程
  10. // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
  11. join_threads joiner;
  12. void worker_thread()
  13. {
  14. local_work_queue.reset(new local_queue_type); // 3
  15. while (!done)
  16. {
  17. run_pending_task();
  18. }
  19. }
  20. public:
  21. thread_pool3() : done(false), joiner(threads)
  22. {
  23. // pool中线程个数使用硬件支持的最大个数
  24. unsigned const thread_count = std::thread::hardware_concurrency();
  25. try
  26. {
  27. for (unsigned i = 0; i < thread_count; ++i)
  28. {
  29. // 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
  30. threads.push_back(std::thread(&thread_pool3::worker_thread, this));
  31. }
  32. }
  33. catch (...)
  34. {
  35. done = true; // 有异常时,设置done为true
  36. throw;
  37. }
  38. }
  39. ~thread_pool3()
  40. {
  41. done = true;
  42. }
  43. // submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型
  44. template <typename FunctionType>
  45. std::future<typename std::result_of<FunctionType()>::type>
  46. submit(FunctionType f)
  47. {
  48. // 设置函数functionType的返回类型的别名为result_type
  49. typedef typename std::result_of<FunctionType()>::type result_type;
  50. std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数f
  51. std::future<result_type> res(task.get_future()); // 获取异步任务的future
  52. if (local_work_queue) // 检查当前线程是否具有一个工作队列,如果有则将任务放入本地队列
  53. {
  54. local_work_queue->push(std::move(task));
  55. }
  56. else
  57. {
  58. work_queue.push(std::move(task)); // 将任务添加到全局任务队列中
  59. }
  60. return res; // 返回future给submit函数的调用者
  61. }
  62. /*
  63. run_pending_task()的实现去掉了在worker_thread()函数的主循环。
  64. 函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。
  65. */
  66. void run_pending_task()
  67. {
  68. function_wrapper task;
  69. if (local_work_queue && !(local_work_queue->empty())) // 如果本地队列有任务,则优先处理本地队列的任务
  70. {
  71. task = std::move(local_work_queue->front());
  72. local_work_queue->pop();
  73. task();
  74. }
  75. else if (work_queue.try_pop(task)) //否则,在全局队列获取任务
  76. {
  77. task();
  78. }
  79. else
  80. {
  81. std::this_thread::yield();
  82. }
  83. }
  84. };

本地任务队列能有效的避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。例如:举一个快速排序的例子,一开始的数据块能在线程池上被处理,因为剩余部分会放在工作线程的本地队列上进行处理,这样的使用方式也违背使用线程池的初衷。

5 从别的线程队列中窃取任务

上面任务分配不均时的困境时有解的,本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。为了让没有任务的线程能从其他线程的任务队列中获取任务,就需要本地任务列表可以被其他线程访问,这样才能让run_pending_tasks()窃取任务。需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。

  1. /*线程池,在30线程池基础上增加功能,本篇最终版本:
  2. 1. 为每个线程建立独立的任务队列,配合全局任务队列使用
  3. 2. 支持从其他线程的本地队列窃取任务
  4. */
  5. #ifndef THREAD_POOL4
  6. #define THREAD_POOL4
  7. #include <future>
  8. #include <atomic>
  9. #include <vector>
  10. #include <queue>
  11. #include "21_threadsafe_queue2.hpp"
  12. // 统一管理线程池所有线程,负责等待线程结束和销毁
  13. class join_threads
  14. {
  15. private:
  16. std::vector<std::thread> &threads;
  17. public:
  18. explicit join_threads(std::vector<std::thread> &threads_) : threads(threads_)
  19. {
  20. }
  21. ~join_threads()
  22. {
  23. for (unsigned long i = 0; i < threads.size(); ++i)
  24. {
  25. // 析构函数中等待线程结束,RAII方式
  26. if (threads[i].joinable())
  27. threads[i].join();
  28. }
  29. }
  30. };
  31. /*std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列
  32. 因为std::function<>需要存储可复制构造的函数对象。
  33. 这里包装一个自定义函数,用来处理只可移动的类型,就是一个带有函数操作符的类型擦除类
  34. */
  35. class function_wrapper
  36. {
  37. private:
  38. struct impl_base
  39. {
  40. virtual void call() = 0;
  41. virtual ~impl_base() {}
  42. };
  43. std::unique_ptr<impl_base> impl;
  44. template <typename F>
  45. struct impl_type : impl_base
  46. {
  47. F f;
  48. impl_type(F &&f_) : f(std::move(f_)) {}
  49. void call() { f(); }
  50. };
  51. public:
  52. template <typename F>
  53. function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)))
  54. {
  55. }
  56. function_wrapper() = default;
  57. function_wrapper(function_wrapper &&other) : impl(std::move(other.impl))
  58. {
  59. }
  60. function_wrapper &operator=(function_wrapper &&other)
  61. {
  62. impl = std::move(other.impl);
  63. return *this;
  64. }
  65. void operator()() { impl->call(); }
  66. function_wrapper(const function_wrapper &) = delete;
  67. function_wrapper(function_wrapper &) = delete;
  68. function_wrapper &operator=(const function_wrapper &) = delete;
  69. };
  70. /*
  71. 基于锁的任务窃取队列,代替普通的线程安全队列
  72. 该队列支持从前端和后端获取数据,即先进先出和后进先出两种模式,后进先出可以用于其他线程窃取任务
  73. */
  74. class work_stealing_queue
  75. {
  76. private:
  77. typedef function_wrapper data_type;
  78. std::deque<data_type> the_queue; // 使用deque队列保存实际function_wrapper
  79. mutable std::mutex the_mutex; // 互斥锁用于控制对the_mutex的访问
  80. public:
  81. work_stealing_queue() {}
  82. // 拷贝和赋值构造函数不创建
  83. work_stealing_queue(const work_stealing_queue& other) = delete;
  84. work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
  85. // 由互斥锁控制在队列前端插入数据
  86. void push(data_type data)
  87. {
  88. std::lock_guard<std::mutex> lock(the_mutex);
  89. the_queue.push_front(std::move(data));
  90. }
  91. bool empty() const
  92. {
  93. std::lock_guard<std::mutex> lock(the_mutex);
  94. return the_queue.empty();
  95. }
  96. // 由互斥锁控制在队列前端获取数据
  97. bool try_pop(data_type& res)
  98. {
  99. std::lock_guard<std::mutex> lock(the_mutex);
  100. if(the_queue.empty())
  101. {
  102. return false;
  103. }
  104. res=std::move(the_queue.front());
  105. the_queue.pop_front();
  106. return true;
  107. }
  108. // 由互斥锁控制在队列后端获取数据
  109. bool try_steal(data_type& res)
  110. {
  111. std::lock_guard<std::mutex> lock(the_mutex);
  112. if(the_queue.empty())
  113. {
  114. return false;
  115. }
  116. res=std::move(the_queue.back());
  117. the_queue.pop_back();
  118. return true;
  119. }
  120. };
  121. class thread_pool4
  122. {
  123. private:
  124. typedef function_wrapper task_type;
  125. // 注意这里声明的顺序,是为了保证析构时能以相反的顺序销毁(很重要)
  126. std::atomic_bool done;
  127. threadsafe_queue<task_type> pool_work_queue; // 全局任务队列
  128. std::vector<std::unique_ptr<work_stealing_queue>> queues; // 保存每个线程任务队列的全局队列
  129. // 保存pool里的工作线程
  130. std::vector<std::thread> threads;
  131. // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
  132. join_threads joiner;
  133. static thread_local work_stealing_queue* local_work_queue; // 每个线程都有一个可以窃取的任务队列,不再使用普通的队列
  134. static thread_local unsigned my_index;
  135. void worker_thread(unsigned my_index_)
  136. {
  137. my_index = my_index_;
  138. // 每个线程根据自己的序号从全局队列中获取自己的任务队列。这意味着根据index可以获取任意线程的任务队列
  139. local_work_queue = queues[my_index].get();
  140. while (!done)
  141. {
  142. run_pending_task();
  143. }
  144. }
  145. bool pop_task_from_local_queue(task_type& task)
  146. {
  147. return local_work_queue && local_work_queue->try_pop(task);
  148. }
  149. bool pop_task_from_pool_queue(task_type& task)
  150. {
  151. return pool_work_queue.try_pop(task);
  152. }
  153. bool pop_task_from_other_thread_queue(task_type& task) // 从其他线程窃取任务
  154. {
  155. for(unsigned i=0;i < queues.size();++i)
  156. {
  157. // 根据index从其他线程的任务队列后端窃取任务
  158. unsigned const index = (my_index + i + 1) % queues.size();
  159. if(queues[index]->try_steal(task))
  160. {
  161. return true;
  162. }
  163. }
  164. return false;
  165. }
  166. public:
  167. thread_pool4() : done(false), joiner(threads)
  168. {
  169. // pool中线程个数使用硬件支持的最大个数
  170. unsigned const thread_count = std::thread::hardware_concurrency();
  171. try
  172. {
  173. for (unsigned i = 0; i < thread_count; ++i)
  174. {
  175. // 当每个线程被创建,就创建了一个属于自己的工作队列,放入全局队列中
  176. queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
  177. // 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
  178. threads.push_back(std::thread(&thread_pool4::worker_thread, this, i));//传入参数i为my_index
  179. }
  180. }
  181. catch (...)
  182. {
  183. done = true; // 有异常时,设置done为true
  184. throw;
  185. }
  186. }
  187. ~thread_pool4()
  188. {
  189. done = true;
  190. }
  191. // submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型
  192. template <typename FunctionType>
  193. std::future<typename std::result_of<FunctionType()>::type>
  194. submit(FunctionType f)
  195. {
  196. // 设置函数functionType的返回类型的别名为result_type
  197. typedef typename std::result_of<FunctionType()>::type result_type;
  198. std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数f
  199. std::future<result_type> res(task.get_future()); // 获取异步任务的future
  200. if (local_work_queue) // 检查当前线程是否具有一个工作队列,如果有则将任务放入本地队列
  201. {
  202. local_work_queue->push(std::move(task)); // 线程本地任务队列
  203. }
  204. else
  205. {
  206. pool_work_queue.push(std::move(task)); // 将任务添加到全局任务队列中
  207. }
  208. return res; // 返回future给submit函数的调用者
  209. }
  210. /*
  211. run_pending_task()的实现去掉了在worker_thread()函数的主循环。
  212. 函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。
  213. */
  214. void run_pending_task()
  215. {
  216. task_type task;
  217. if (pop_task_from_local_queue(task) || // 第一优先级:从线程自己的队列获取任务
  218. pop_task_from_pool_queue(task) || // 第二优先级:从线程池队列获取任务
  219. pop_task_from_other_thread_queue(task)) // 最低优先级:窃取其他线程的任务
  220. {
  221. task();
  222. }
  223. else
  224. {
  225. std::this_thread::yield();
  226. }
  227. }
  228. };
  229. #endif

pop_task_from_other_thread_queue()会遍历池中所有线程的任务队列,然后尝试窃取任务。为了避免每个线程都尝试从列表中的第一个线程上窃取任务,每一个线程都会从下一个线程开始遍历,通过自身的线程序号来确定开始遍历的线程序号。

使用线程池有很多好处,还有很多的方式能为某些特殊用法提升性能。特别是还没有探究动态变换大小的线程池,即使线程被阻塞的时候(例如:I/O或互斥锁),程序都能保证CPU最优的使用率。