门闩和栅栏

门闩和栅栏是比较简单的线程同步机制,其能使一些线程阻塞,直到计数器变为零时解除阻塞。首先,不要把栅栏和内存栅栏混为一谈。C++ 20/23中,我们假设有三种门闩和栅栏:std::latchstd::barrierstd::flex_barrier

首先,要回答两个问题:

  1. 这三种同步线程的机制有什么不同?std::latch只能使用一次,但是std::barrierstd::flex_barrier可以使用多次。此外,std::flex_barrier允许计数器变为0时执行一个函数。
  2. 哪些支持的门闩和栅栏的用例,在C++11和C++14中无法通过future、线程或条件变量与锁结合来实现呢?门闩和栅栏并不涉及新的用例,但它们使用起来要容易得多。通常是在内部使用无锁机制,所以它们还具有更高的性能。

std::latch

std::latch门闩是一个倒计时器,该值可以在构造函数中设置。门闩可以通过使用latch.count_down_and_wait来减小计数,并阻塞线程,直到计数器变为0。另外,latch.count_down(n)可以将计数器减少n,而不进行阻塞。如果没有给出参数,n默认为1。门闩也有latch.is_ready可以用来检查计数器是否为零,以及latch.wait会阻塞线程,直到计数器变为零。std::latch的计数器不能增加或重置,因此不能复用。

下面是来自N4204提案的一个简短代码段。

  1. void DoWork(threadpool *pool){
  2. latch completion_latch(NTASKS);
  3. for (int i = 0; i < NTASKS; ++i){
  4. pool->add_task([&]{
  5. // perform work
  6. ...
  7. completion_latch.count_down();
  8. });
  9. }
  10. // Block until work is done
  11. completion_latch.wait();
  12. }

std::latch completion_latch在其构造函数中将计数器设置为NTASKS (第2行),线程池执行NTASKS(第4 - 8行)个任务。每个任务结束时(第7行),计数器递减。第11行是运行DoWork函数的线程,以及工作流的栅栏。这样,线程就会阻塞,直到所有任务都完成。

std::barrierstd::latch非常相似。

std::barrier

std::latchstd::barrier之间的区别是,std::barrier计数器可以重置,所以可以多次地使用。计数器变为零之后,立即进入完成阶段。与std::flex_barrier有关,std::barrier有一个空的完成阶段。std::barrier有两个有趣的成员函数:std::arrive_and_waitstd::arrive_and_drop。当std::arrive_and_wait在同步点阻塞时,std::arrive_and_drop会从相关线程集中,删除自己的线程。未指定此函数是否阻塞,直到完成阶段结束。这里没有对函数块进行指定,是否到完成阶段才算结束。

N4204提案

该建议使用vector<thread*>,并将动态分配的线程推给vector:workers.push_back(new thread([&]{ ... }))。这会产生内存泄漏。应该将线程放到std::unique_ptr中,或者直接在vector中进行创建: workers.emplace_back([&]{ ... }),这个适用于std::barrierstd::flex_barrier。本例中使用std::flex_barrier的名称有点迷,例如:std::flex_barrier被称为notifying_barrier。所以我把名字改成flex_barrier,会更容易理解一些。此外,代表线程数量的n_threads没有初始化,我把它初始化为NTASKS。

深入研究std::flex_barrier和完成阶段之前,这里给出一个简短的示例,演示std::barrier的用法。

std::barrier

  1. void DoWork(){
  2. Tasks& tasks;
  3. int n_threads{NTASKS};
  4. vector<thread*> workes;
  5. barrier task_barrier(n_threads);
  6. for (int i = 0; i < n_threads; ++i){
  7. workers.push_back(new thread([&]{
  8. bool active = ture;
  9. while(active){
  10. Task task = tasks.get();
  11. // perform task
  12. ...
  13. task_barrier.arrive_and_wait();
  14. }
  15. });
  16. }
  17. // Read each stage of the task until all stages are complete.
  18. while(!finished()){
  19. GetNextStage(tasks);
  20. }
  21. }

第6行中的barrier用于协调多个执行线程,线程的数量是n_threads(第3行),每个线程通过tasks.get()获取(第12行中)任务,执行该任务并阻塞(第15行),直到所有线程完成其任务为止。之后,在第12行接受一个新任务,active在第11行返回true。

std::barrier不同,std::flex_barrier多一个构造函数。

std::flex_barrier

此构造函数接受在完成阶段调用可调用单元。可调用单元必须返回一个数字,使用这个数字设置计数器的值,返回-1意味着计数器在下一次迭代中保持相同的计数器值,而小于-1的数字是不允许的。

完成阶段会执行以下步骤:

  1. 阻塞全部线程
  2. 任意个线程解除阻塞,并执行可调用单元。
  3. 如果完成阶段已经完成,那么所有线程都将解除阻塞。

下面的段代码展示了std::flex_barrier的用法

  1. void DoWork(){
  2. Tasks& tasks;
  3. int initial_threads;
  4. int n_threads{NTASKS};
  5. atomic<int> current_threads(initial_threads);
  6. vector<thread*> workers;
  7. // Create a flex_barrier, and set a lambda that will be
  8. // invoked every time the barrier counts down. If one or more
  9. // active threads have completed, reduce the number of threads.
  10. std::function rf = [&]{return current_threads;};
  11. flex_barrier task_barrier(n_threads, rf);
  12. for (int i = 0; i < n_threads; ++i){
  13. workers.push_back(new thread([&]{
  14. bool active = true;
  15. while(active) {
  16. Task task = tasks.get();
  17. // perform task
  18. ...
  19. if (finished(task)){
  20. current_threads--;
  21. active = false;
  22. }
  23. task_barrier.arrive_and_wait();
  24. }
  25. }));
  26. }
  27. // Read each stage of the task until all stages are cpmplete.
  28. while(!finished()){
  29. GetNextStage(tasks);
  30. }
  31. }

这个例子采用了与std::barrier类似的策略,不同的是这次std::flex_barrier计数器是在运行时进行调整,所以std::flex_barrier task_barrier在第11行获得一个Lambda函数。这个Lambda函数通过引用获取变量current_thread:[&] { return current_threads; }。变量在第21行进行递减,如果线程完成了任务,则将active设置为false。因此,计数器在完成阶段是递减的。

std::barrierstd::latch相比,std::flex_barrier可以增加计数器。

可以在cppreference.com上阅读关于std::latchstd::barrierstd::flex_barrier的更多细节。