条件变量

条件变量通过消息对线程进行同步(需要包含<condition_variable>头文件),一个线程作为发送方,另一个线程作为接收方,接收方等待来自发送方的通知。条件变量的典型用例:发送方-接收方或生产者-消费者模式。

条件变量cv的成员函数

成员函数 函数描述
cv.notify_one() 通知一个等待中的线程
cv.notify_all() 通知所有等待中的线程
cv.wait(lock, ...) 持有std::unique_lock,并等待通知
cv.wait_for(lock, relTime, ...) 持有std::unique_lock,并在给定的时间段内等待通知
cv.wait_until(lock, absTime, ...) 持有std::unique_lock的同时,并在给定的时间点前等待通知
cv.native_handle() 返回条件变量的底层句柄

cv.notify_onecv.notify_all相比较,cv.notify_all会通知所有正在等待的线程,cv.notify_one只通知一个正在等待的线程,其他条件变量依旧保持在等待状态。介绍条件变量的详细信息之前,来看个示例。

  1. // conditionVariable.cpp
  2. #include <iostream>
  3. #include <condition_variable>
  4. #include <mutex>
  5. #include <thread>
  6. std::mutex mutex_;
  7. std::condition_variable condVar;
  8. bool dataReady{ false };
  9. void doTheWork() {
  10. std::cout << "Processing shared data." << std::endl;
  11. }
  12. void waitingForWork() {
  13. std::cout << "Worker: Waiting for work." << std::endl;
  14. std::unique_lock<std::mutex> lck(mutex_);
  15. condVar.wait(lck, [] {return dataReady; });
  16. doTheWork();
  17. std::cout << "Work done." << std::endl;
  18. }
  19. void setDataReady() {
  20. {
  21. std::lock_guard<std::mutex> lck(mutex_);
  22. dataReady = true;
  23. }
  24. std::cout << "Sender: Data is ready." << std::endl;
  25. condVar.notify_one();
  26. }
  27. int main() {
  28. std::cout << std::endl;
  29. std::thread t1(waitingForWork);
  30. std::thread t2(setDataReady);
  31. t1.join();
  32. t2.join();
  33. std::cout << std::endl;
  34. }

该程序有两个子线程:t1t2。第38行和第39行中,线程得到工作包waitingForWorksetDataReadsetDataReady使用条件变量condVar通知其他线程准备工作已经完成:condVar.notify_one()。当持有锁时,线程t1等待它的通知:condVar.wait(lck, []{ return dataReady; })。发送方和接收方需要一个锁,对于发送方,std::lock_guard就足够了,因为lockunlock只调用一次;对于接收方来说,std::unique_lock是必需的,因为它需要锁定和解锁互斥锁。

程序的输出如下:

条件变量 - 图1

std::condition_variable_any

std::condition_variable只能等待类型为std::unique_lock<mutex>的对象,但是std::condition_variable_any可以等待符合BasicLockable原则的锁类型。std::condition_variable_anystd::condition_variable支持的接口相同。

谓词

在没有谓词的情况下也可以调用wait,那么读者朋友应该很想知道,为什么调用wait需要谓词。

等待使用谓词与否都是可以的,先来看个例子。

  1. // conditionVariableBlock.cpp
  2. #include <iostream>
  3. #include <condition_variable>
  4. #include <mutex>
  5. #include <thread>
  6. std::mutex mutex_;
  7. std::condition_variable condVar;
  8. void waitingForWork() {
  9. std::cout << "Worker: Waiting for work." << std::endl;
  10. std::unique_lock<std::mutex> lck(mutex_);
  11. condVar.wait(lck);
  12. // do the work
  13. std::cout << "Work done." << std::endl;
  14. }
  15. void setDataReady() {
  16. std::cout << "Sender: Data is ready." << std::endl;
  17. condVar.notify_one();
  18. }
  19. int main() {
  20. std::cout << std::endl;
  21. std::thread t1(setDataReady);
  22. std::thread t2(waitingForWork);
  23. t1.join();
  24. t2.join();
  25. std::cout << std::endl;
  26. }

程序的第一次运行正常,但第二次阻塞是因为通知(第25行)发生在线程t2(第34行)进入等待状态(第16行)之前。

条件变量 - 图2

现在就很清楚了,谓词是无状态条件变量,所以等待过程中总是检查谓词。条件变量有两个已知有害现象:未唤醒和伪唤醒。

未唤醒和伪唤醒

未唤醒

该现象是发送方在接收方到达其等待状态之前发送通知,结果是通知丢失了。C++标准将条件变量描述为同步机制:“条件变量类是同步原语,可用于阻塞一个线程,或同时阻塞多个线程……”所以通知丢失了,接收者就会持续等待……

伪唤醒

还有一种情况,就会没有发通知,但接收方会被唤醒。使用POSIX ThreadsWindows API时,都会出现这样的现象。伪唤醒的真相,很可能是本来就没有处于休眠状态。这意味着,在被唤醒的线程有机会运行之前,另一个线程早就等候多时了。

等待线程的工作流程

等待线程的工作流程相当复杂。

下面是来自前面示例conditionVariable.cpp的19和20行。

  1. std::unique_lock<std::mutex> lck(mutex_);
  2. condVar.wait(lck, []{ return dataReady; });

上面两行与下面四行等价:

  1. std::unique_lock<std::mutex> lck(mutex_);
  2. while ( ![]{ return dataReady; }() {
  3. condVar.wait(lck);
  4. }

首先,必须区分std::unique_lock<std::mutex> lck(mutex_)的第一次调用与条件变量的通知:condVar.wait(lck)

  • std::unique_lock<std::mutex> lck(mutex_) : 初始化阶段,线程就将互斥量锁定,并对谓词函数[]{ return dataReady;}进行检查。
    • 谓词返回值:
      • true : 线程继续等待。
      • false : condVar.wait()解锁互斥量,并将线程置为等待(阻塞)状态。
  • condVar.wait(lck) : 如果condition_variable condVar处于等待状态,并获得通知或伪唤醒处于运行状态,则执行以下步骤:
    • 线程解除阻塞,重新获得互斥锁。
    • 检查谓词函数。
    • 当谓词函数返回值为:
      • true : 线程继续工作。
      • false : condVar.wait()解锁互斥量,并将线程置为等待(阻塞)状态。

即使共享变量是原子的,也必须在互斥锁保护下进行修改,以便将正确地内容告知等待的线程。

使用互斥锁来保护共享变量

即使将dataReady设置为原子变量,也必须在互斥锁的保护下进行修改;如果没有,对于等待线程来说dataReady的内容就可能是错的,此竞争条件可能导致死锁。让我们再次查看下等待的工作流,并假设deadReady是一个原子变量,在不受互斥量mutex_保护时进行修改的情况。

  1. std::unique_lock<std::mutex> lck(mutex_);
  2. while ( ![]{ return dataReady.load(); }() {
  3. // time window
  4. condVar.wait(lck);
  5. }

假设在条件变量condVar,在不处于等待状态时发送通知。这样,线程执行到第2行和第4行之间时(参见注释时间窗口)会丢失通知。之后,线程返回到等待状态,可能会永远休眠。

如果dataReady被互斥锁保护,就不会发生这种情况。由于与互斥锁能够同步线程,只有在接收线程处于等待状态的情况下才会发送通知。

大多数用例中,可以使用任务,用简单的方式同步线程。“任务-通知”章节中,将条件变量和任务进行了对比。