门闩和栅栏
门闩和栅栏是比较简单的线程同步机制,其能使一些线程阻塞,直到计数器变为零时解除阻塞。首先,不要把栅栏和内存栅栏混为一谈。C++ 20/23中,我们假设有三种门闩和栅栏:std::latch
、std::barrier
和std::flex_barrier
。
首先,要回答两个问题:
- 这三种同步线程的机制有什么不同?
std::latch
只能使用一次,但是std::barrier
和std::flex_barrier
可以使用多次。此外,std::flex_barrier
允许计数器变为0时执行一个函数。 - 哪些支持的门闩和栅栏的用例,在C++11和C++14中无法通过future、线程或条件变量与锁结合来实现呢?门闩和栅栏并不涉及新的用例,但它们使用起来要容易得多。通常是在内部使用无锁机制,所以它们还具有更高的性能。
std::latch
std::latch
门闩是一个倒计时器,该值可以在构造函数中设置。门闩可以通过使用latch.count_down_and_wait
来减小计数,并阻塞线程,直到计数器变为0。另外,latch.count_down(n)
可以将计数器减少n,而不进行阻塞。如果没有给出参数,n默认为1。门闩也有latch.is_ready
可以用来检查计数器是否为零,以及latch.wait
会阻塞线程,直到计数器变为零。std::latch
的计数器不能增加或重置,因此不能复用。
下面是来自N4204提案的一个简短代码段。
void DoWork(threadpool *pool){
latch completion_latch(NTASKS);
for (int i = 0; i < NTASKS; ++i){
pool->add_task([&]{
// perform work
...
completion_latch.count_down();
});
}
// Block until work is done
completion_latch.wait();
}
std::latch completion_latch
在其构造函数中将计数器设置为NTASKS (第2行),线程池执行NTASKS(第4 - 8行)个任务。每个任务结束时(第7行),计数器递减。第11行是运行DoWork函数的线程,以及工作流的栅栏。这样,线程就会阻塞,直到所有任务都完成。
std::barrier
与std::latch
非常相似。
std::barrier
std::latch
和std::barrier
之间的区别是,std::barrier
计数器可以重置,所以可以多次地使用。计数器变为零之后,立即进入完成阶段。与std::flex_barrier
有关,std::barrier
有一个空的完成阶段。std::barrier
有两个有趣的成员函数:std::arrive_and_wait
和std::arrive_and_drop
。当std::arrive_and_wait
在同步点阻塞时,std::arrive_and_drop
会从相关线程集中,删除自己的线程。未指定此函数是否阻塞,直到完成阶段结束。这里没有对函数块进行指定,是否到完成阶段才算结束。
N4204提案
该建议使用
vector<thread*>
,并将动态分配的线程推给vector:workers.push_back(new thread([&]{ ... }))
。这会产生内存泄漏。应该将线程放到std::unique_ptr
中,或者直接在vector中进行创建:workers.emplace_back([&]{ ... })
,这个适用于std::barrier
和std::flex_barrier
。本例中使用std::flex_barrier
的名称有点迷,例如:std::flex_barrier
被称为notifying_barrier
。所以我把名字改成flex_barrier
,会更容易理解一些。此外,代表线程数量的n_threads
没有初始化,我把它初始化为NTASKS。
深入研究std::flex_barrier
和完成阶段之前,这里给出一个简短的示例,演示std::barrier
的用法。
std::barrier
void DoWork(){
Tasks& tasks;
int n_threads{NTASKS};
vector<thread*> workes;
barrier task_barrier(n_threads);
for (int i = 0; i < n_threads; ++i){
workers.push_back(new thread([&]{
bool active = ture;
while(active){
Task task = tasks.get();
// perform task
...
task_barrier.arrive_and_wait();
}
});
}
// Read each stage of the task until all stages are complete.
while(!finished()){
GetNextStage(tasks);
}
}
第6行中的barrier
用于协调多个执行线程,线程的数量是n_threads
(第3行),每个线程通过tasks.get()
获取(第12行中)任务,执行该任务并阻塞(第15行),直到所有线程完成其任务为止。之后,在第12行接受一个新任务,active
在第11行返回true。
与std::barrier
不同,std::flex_barrier
多一个构造函数。
std::flex_barrier
此构造函数接受在完成阶段调用可调用单元。可调用单元必须返回一个数字,使用这个数字设置计数器的值,返回-1意味着计数器在下一次迭代中保持相同的计数器值,而小于-1的数字是不允许的。
完成阶段会执行以下步骤:
- 阻塞全部线程
- 任意个线程解除阻塞,并执行可调用单元。
- 如果完成阶段已经完成,那么所有线程都将解除阻塞。
下面的段代码展示了std::flex_barrier
的用法
void DoWork(){
Tasks& tasks;
int initial_threads;
int n_threads{NTASKS};
atomic<int> current_threads(initial_threads);
vector<thread*> workers;
// Create a flex_barrier, and set a lambda that will be
// invoked every time the barrier counts down. If one or more
// active threads have completed, reduce the number of threads.
std::function rf = [&]{return current_threads;};
flex_barrier task_barrier(n_threads, rf);
for (int i = 0; i < n_threads; ++i){
workers.push_back(new thread([&]{
bool active = true;
while(active) {
Task task = tasks.get();
// perform task
...
if (finished(task)){
current_threads--;
active = false;
}
task_barrier.arrive_and_wait();
}
}));
}
// Read each stage of the task until all stages are cpmplete.
while(!finished()){
GetNextStage(tasks);
}
}
这个例子采用了与std::barrier
类似的策略,不同的是这次std::flex_barrier
计数器是在运行时进行调整,所以std::flex_barrier task_barrier
在第11行获得一个Lambda函数。这个Lambda函数通过引用获取变量current_thread:[&] { return current_threads; }
。变量在第21行进行递减,如果线程完成了任务,则将active
设置为false。因此,计数器在完成阶段是递减的。
与std::barrier
或std::latch
相比,std::flex_barrier
可以增加计数器。
可以在cppreference.com上阅读关于std::latch、std::barrier、std::flex_barrier的更多细节。