在大多数系统中,将每个任务指定给某个线程是不切实际的,不过可以利用现有的并发性,进行并发执行。线程池就提供了这样的功能:
- 提交到线程池中的任务将并发执行
- 提交的任务将会挂在任务队列上
- 队列中的每一个任务都会被池中的工作线程所获取
- 当任务执行完成后,线程再回到线程池中获取下一个任务
1 最简单的线程池
最简单的线程池中,线程就不需要等待其他线程完成对应任务了。任务没有返回值,不需要阻塞其他线程。
//最简单的线程池,仅用于示例
#ifndef THREAD_POOL
#define THREAD_POOL
#include <thread>
#include <atomic>
#include <vector>
#include "21_threadsafe_queue2.hpp"
class join_threads
{
private:
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_): threads(threads_)
{}
~join_threads()
{
for(unsigned long i=0;i<threads.size();++i)
{
// 析构函数中等待线程结束,RAII方式
if(threads[i].joinable())
threads[i].join();
}
}
};
class thread_pool
{
private:
//注意这里声明的顺序,是为了保证析构时能以相反的顺序销毁(很重要)
std::atomic_bool done;
threadsafe_queue<std::function<void()> > work_queue; // 使用线程安全队列保存任务
std::vector<std::thread> threads; // 保存pool里的工作线程
join_threads joiner; // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
void worker_thread()
{
while(!done) // 原子bool设置为true时,保证每个正在运行的线程停止
{
std::function<void()> task;
if(work_queue.try_pop(task)) // 从任务队列获取到一个任务
{
task(); // 执行任务,函数指针
}
else
{
std::this_thread::yield(); // 没有任务,当前线程放弃执行,线程休息
}
}
}
public:
thread_pool(): done(false),joiner(threads)
{
// pool中线程个数使用硬件支持的最大个数
unsigned const thread_count=std::thread::hardware_concurrency();
try
{
for(unsigned i=0;i<thread_count;++i)
{
// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
}
catch(...)
{
done=true; // 有异常时,设置done为true
throw;
}
}
~thread_pool()
{
done=true; // 11
}
// submit将一个要执行的任务包装成std::function函数指针,加入任务队列
template<typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f)); // 12
}
};
#endif
2 添加功能:任务返回值
上面的任务独立运行,不需要返回值。但是一种特殊的情况是,执行任务的线程需要返回一个结果到主线程上进行处理。可以使用submit()函数提交任务并返回条件变量或future,用来等待任务的完成,返回任务返回值。
在上面例子的基础上,使用packged_task和future代替std::function,可以返回任务返回值:
//线程池,在29线程池中添加功能:可以返回每个任务的返回值
#ifndef thread_pool22
#define thread_pool22
#include <future>
#include <atomic>
#include <vector>
#include "21_threadsafe_queue2.hpp"
#include "29_thread_pool.hpp"
/*std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列
因为std::function<>需要存储可复制构造的函数对象。
这里包装一个自定义函数,用来处理只可移动的类型,就是一个带有函数操作符的类型擦除类
*/
class function_wrapper
{
private:
struct impl_base
{
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template <typename F>
struct impl_type : impl_base
{
F f;
impl_type(F &&f_) : f(std::move(f_)) {}
void call() { f(); }
};
public:
template <typename F>
function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)))
{
}
function_wrapper() = default;
function_wrapper(function_wrapper &&other) : impl(std::move(other.impl))
{
}
function_wrapper &operator=(function_wrapper &&other)
{
impl = std::move(other.impl);
return *this;
}
void operator()() { impl->call(); }
function_wrapper(const function_wrapper &) = delete;
function_wrapper(function_wrapper &) = delete;
function_wrapper &operator=(const function_wrapper &) = delete;
};
class thread_pool2
{
private:
std::atomic_bool done;
threadsafe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::function
std::vector<std::thread> threads; // 保存pool里的工作线程
join_threads joiner; // join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
void worker_thread()
{
while (!done)
{
function_wrapper task;
if (work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
public:
thread_pool2() : done(false), joiner(threads)
{
// pool中线程个数使用硬件支持的最大个数
unsigned const thread_count = std::thread::hardware_concurrency();
try
{
for (unsigned i = 0; i < thread_count; ++i)
{
// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
threads.push_back(std::thread(&thread_pool2::worker_thread, this));
}
}
catch (...)
{
done = true; // 有异常时,设置done为true
throw;
}
}
~thread_pool2()
{
done = true;
}
// submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型
template <typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
// 设置函数functionType的返回类型的别名为result_type
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数f
std::future<result_type> res(task.get_future()); // 获取异步任务的future
work_queue.push(std::move(task)); // 将任务添加到任务队列中
return res; // 返回future给submit函数的调用者
}
};
#endif
当选择“因为能并发执行,最小工作块值的一试”时,就需要谨慎了。向线程池提交任务有一定的开销;让工作线程执行这个任务,并且将返回值保存在std::future<>
中,对于太小的任务,这样的开销不划算。如果任务块太小,使用线程池的速度可能都不及单线程。
3 添加功能:手动触发任务执行
最简单的方法就是在thread_pool中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。高级线程池的实现可能会在等待函数中添加逻辑,或等待其他函数来处理这个任务,优先的任务会让其他的任务进行等待。下面清单中的实现,就展示了一个新run_pending_task()函数:
/*
run_pending_task()的实现去掉了在worker_thread()函数的主循环。
函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。
*/
void run_pending_task()
{
function_wrapper task;
if (work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
4 避免队列中的任务竞争
随着处理器的增加,任务队列上就会有很多的竞争(添加任务和多线程获取任务),这会让性能下降。使用无锁队列会让任务没有明显的等待,但乒乓缓存会消耗大量的时间。
为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。
class thread_pool3
{
private:
std::atomic_bool done;
threadsafe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::function
typedef std::queue<function_wrapper> local_queue_type;
// unique_ptr指向每个线程本地(thread_local)的工作队列
static thread_local std::unique_ptr<local_queue_type> local_work_queue;
std::vector<std::thread> threads; // 保存pool里的工作线程
// join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
join_threads joiner;
void worker_thread()
{
local_work_queue.reset(new local_queue_type); // 3
while (!done)
{
run_pending_task();
}
}
public:
thread_pool3() : done(false), joiner(threads)
{
// pool中线程个数使用硬件支持的最大个数
unsigned const thread_count = std::thread::hardware_concurrency();
try
{
for (unsigned i = 0; i < thread_count; ++i)
{
// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
threads.push_back(std::thread(&thread_pool3::worker_thread, this));
}
}
catch (...)
{
done = true; // 有异常时,设置done为true
throw;
}
}
~thread_pool3()
{
done = true;
}
// submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型
template <typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
// 设置函数functionType的返回类型的别名为result_type
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数f
std::future<result_type> res(task.get_future()); // 获取异步任务的future
if (local_work_queue) // 检查当前线程是否具有一个工作队列,如果有则将任务放入本地队列
{
local_work_queue->push(std::move(task));
}
else
{
work_queue.push(std::move(task)); // 将任务添加到全局任务队列中
}
return res; // 返回future给submit函数的调用者
}
/*
run_pending_task()的实现去掉了在worker_thread()函数的主循环。
函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。
*/
void run_pending_task()
{
function_wrapper task;
if (local_work_queue && !(local_work_queue->empty())) // 如果本地队列有任务,则优先处理本地队列的任务
{
task = std::move(local_work_queue->front());
local_work_queue->pop();
task();
}
else if (work_queue.try_pop(task)) //否则,在全局队列获取任务
{
task();
}
else
{
std::this_thread::yield();
}
}
};
本地任务队列能有效的避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。例如:举一个快速排序的例子,一开始的数据块能在线程池上被处理,因为剩余部分会放在工作线程的本地队列上进行处理,这样的使用方式也违背使用线程池的初衷。
5 从别的线程队列中窃取任务
上面任务分配不均时的困境时有解的,本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。为了让没有任务的线程能从其他线程的任务队列中获取任务,就需要本地任务列表可以被其他线程访问,这样才能让run_pending_tasks()窃取任务。需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。
/*线程池,在30线程池基础上增加功能,本篇最终版本:
1. 为每个线程建立独立的任务队列,配合全局任务队列使用
2. 支持从其他线程的本地队列窃取任务
*/
#ifndef THREAD_POOL4
#define THREAD_POOL4
#include <future>
#include <atomic>
#include <vector>
#include <queue>
#include "21_threadsafe_queue2.hpp"
// 统一管理线程池所有线程,负责等待线程结束和销毁
class join_threads
{
private:
std::vector<std::thread> &threads;
public:
explicit join_threads(std::vector<std::thread> &threads_) : threads(threads_)
{
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); ++i)
{
// 析构函数中等待线程结束,RAII方式
if (threads[i].joinable())
threads[i].join();
}
}
};
/*std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列
因为std::function<>需要存储可复制构造的函数对象。
这里包装一个自定义函数,用来处理只可移动的类型,就是一个带有函数操作符的类型擦除类
*/
class function_wrapper
{
private:
struct impl_base
{
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template <typename F>
struct impl_type : impl_base
{
F f;
impl_type(F &&f_) : f(std::move(f_)) {}
void call() { f(); }
};
public:
template <typename F>
function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)))
{
}
function_wrapper() = default;
function_wrapper(function_wrapper &&other) : impl(std::move(other.impl))
{
}
function_wrapper &operator=(function_wrapper &&other)
{
impl = std::move(other.impl);
return *this;
}
void operator()() { impl->call(); }
function_wrapper(const function_wrapper &) = delete;
function_wrapper(function_wrapper &) = delete;
function_wrapper &operator=(const function_wrapper &) = delete;
};
/*
基于锁的任务窃取队列,代替普通的线程安全队列
该队列支持从前端和后端获取数据,即先进先出和后进先出两种模式,后进先出可以用于其他线程窃取任务
*/
class work_stealing_queue
{
private:
typedef function_wrapper data_type;
std::deque<data_type> the_queue; // 使用deque队列保存实际function_wrapper
mutable std::mutex the_mutex; // 互斥锁用于控制对the_mutex的访问
public:
work_stealing_queue() {}
// 拷贝和赋值构造函数不创建
work_stealing_queue(const work_stealing_queue& other) = delete;
work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
// 由互斥锁控制在队列前端插入数据
void push(data_type data)
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const
{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.empty();
}
// 由互斥锁控制在队列前端获取数据
bool try_pop(data_type& res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
res=std::move(the_queue.front());
the_queue.pop_front();
return true;
}
// 由互斥锁控制在队列后端获取数据
bool try_steal(data_type& res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
res=std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
class thread_pool4
{
private:
typedef function_wrapper task_type;
// 注意这里声明的顺序,是为了保证析构时能以相反的顺序销毁(很重要)
std::atomic_bool done;
threadsafe_queue<task_type> pool_work_queue; // 全局任务队列
std::vector<std::unique_ptr<work_stealing_queue>> queues; // 保存每个线程任务队列的全局队列
// 保存pool里的工作线程
std::vector<std::thread> threads;
// join_threads使用RAII方式,保证pool销毁前所有线程能执行结束
join_threads joiner;
static thread_local work_stealing_queue* local_work_queue; // 每个线程都有一个可以窃取的任务队列,不再使用普通的队列
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_)
{
my_index = my_index_;
// 每个线程根据自己的序号从全局队列中获取自己的任务队列。这意味着根据index可以获取任意线程的任务队列
local_work_queue = queues[my_index].get();
while (!done)
{
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type& task)
{
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task)
{
return pool_work_queue.try_pop(task);
}
bool pop_task_from_other_thread_queue(task_type& task) // 从其他线程窃取任务
{
for(unsigned i=0;i < queues.size();++i)
{
// 根据index从其他线程的任务队列后端窃取任务
unsigned const index = (my_index + i + 1) % queues.size();
if(queues[index]->try_steal(task))
{
return true;
}
}
return false;
}
public:
thread_pool4() : done(false), joiner(threads)
{
// pool中线程个数使用硬件支持的最大个数
unsigned const thread_count = std::thread::hardware_concurrency();
try
{
for (unsigned i = 0; i < thread_count; ++i)
{
// 当每个线程被创建,就创建了一个属于自己的工作队列,放入全局队列中
queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
// 创建工作线程,每个线程都执行worker_thread函数,在此函数中获取任务处理
threads.push_back(std::thread(&thread_pool4::worker_thread, this, i));//传入参数i为my_index
}
}
catch (...)
{
done = true; // 有异常时,设置done为true
throw;
}
}
~thread_pool4()
{
done = true;
}
// submit返回一个保存任务返回值的future,使用result_of获取函数functionType的返回类型
template <typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
// 设置函数functionType的返回类型的别名为result_type
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(std::move(f)); // 封装一个异步任务,任务执行函数f
std::future<result_type> res(task.get_future()); // 获取异步任务的future
if (local_work_queue) // 检查当前线程是否具有一个工作队列,如果有则将任务放入本地队列
{
local_work_queue->push(std::move(task)); // 线程本地任务队列
}
else
{
pool_work_queue.push(std::move(task)); // 将任务添加到全局任务队列中
}
return res; // 返回future给submit函数的调用者
}
/*
run_pending_task()的实现去掉了在worker_thread()函数的主循环。
函数任务队列中有任务的时候执行任务,要是没有的话就会让操作系统对线程进行重新分配。
*/
void run_pending_task()
{
task_type task;
if (pop_task_from_local_queue(task) || // 第一优先级:从线程自己的队列获取任务
pop_task_from_pool_queue(task) || // 第二优先级:从线程池队列获取任务
pop_task_from_other_thread_queue(task)) // 最低优先级:窃取其他线程的任务
{
task();
}
else
{
std::this_thread::yield();
}
}
};
#endif
pop_task_from_other_thread_queue()会遍历池中所有线程的任务队列,然后尝试窃取任务。为了避免每个线程都尝试从列表中的第一个线程上窃取任务,每一个线程都会从下一个线程开始遍历,通过自身的线程序号来确定开始遍历的线程序号。
使用线程池有很多好处,还有很多的方式能为某些特殊用法提升性能。特别是还没有探究动态变换大小的线程池,即使线程被阻塞的时候(例如:I/O或互斥锁),程序都能保证CPU最优的使用率。