1 条件变量condition_variable

当一个线程等待另一个线程完成任务时,它会有很多选择:

  1. 可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标志进行重设。
  2. 在等待线程在检查间隙,使用std::this_thread::sleep_for()进行周期性的间歇: ```cpp bool flag; std::mutex m;

void wait_for_flag() { std::unique_lock lk(m); while(!flag) { lk.unlock(); // 1 解锁互斥量 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 线程休眠100ms lk.lock(); // 3 再锁互斥量 } }

  1. 3. **最优选择**:使用C++标准库提供的工具**条件变量**去等待事件的发生。
  2. C++标准库对条件变量有两套实现:`std::condition_variable``std::condition_variable_any`。这两个实现都包含在`<``condition_variable``>`头文件的声明中。两者都需要与一个互斥量一起才能工作(互斥量是为了同步);前者仅限于与`std::mutex`一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了*_any*的后缀。因为`std::condition_variable_any`更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以`std::condition_variable`一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑`std::condition_variable_any`
  3. 示例如下:
  4. ```cpp
  5. #include <iostream>
  6. #include <string>
  7. #include <thread>
  8. #include <mutex>
  9. #include <condition_variable>
  10. std::mutex m;
  11. std::condition_variable cv;//定义条件变量用于同步
  12. std::string data;
  13. bool ready = false;
  14. bool processed = false;
  15. void worker_thread()
  16. {
  17. // 等待直至 main() 发送数据
  18. std::unique_lock<std::mutex> lk(m);
  19. //条件变量设置监控对象和等待条件lambda表达式,条件为ready变为true,否则阻塞本线程一直等待
  20. cv.wait(lk, [] {return ready; });
  21. // 等待后,我们占有锁。
  22. std::cout << "Worker thread is processing data\n";
  23. data += " after processing";
  24. // 发送数据回 main()
  25. processed = true;
  26. std::cout << "Worker thread signals data processing completed\n";
  27. // 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞
  28. lk.unlock();
  29. cv.notify_one();//通知一个等待的线程,显然这里是通知主线程
  30. }
  31. int main()
  32. {
  33. std::thread worker(worker_thread);
  34. data = "Example data";
  35. // 发送数据到 worker 线程
  36. {
  37. std::lock_guard<std::mutex> lk(m);
  38. ready = true;
  39. std::cout << "main() signals data ready for processing\n";
  40. }
  41. cv.notify_one();//ready已经设置好,通知子线程
  42. // 等候 worker
  43. {
  44. std::unique_lock<std::mutex> lk(m);
  45. cv.wait(lk, [] {return processed; });//主线程设置条件变量,等到子线程将processed改为true
  46. }
  47. std::cout << "Back in main(), data = " << data << '\n';
  48. worker.join();
  49. }
  • wait()会去检查这些条件(通过调用所提供的lambda函数),当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false),wait()函数将解锁互斥量,并且将这个线程置于阻塞或等待状态
  • 当准备数据的线程调用notify_one()或notify_all()通知条件变量时,处理数据的线程从睡眠状态中苏醒,重新获取互斥锁,并且再次检查条件是否满足。在条件满足的情况下,从wait()返回并继续持有锁;当条件不满足时,线程将对互斥量解锁,并且重新开始等待。
  • 推荐使用std::unique_lock,等待中的线程必须在等待期间解锁互斥量,并在这之后对互斥量再次上锁,而std::lock_guard没有这么灵活。

2 期望值future**

C++标准库模型提供了访问异步操作结果的机制。当线程需要等待特定的一次性事件时,某种程度上来说就需要知道这个事件在未来的期望结果。之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发;检查期间也会执行其他任务。

C++标准库中,有两种期望值,使用两种类型模板实现,声明在<future>头文件中:

  • 唯一期望值(std::future<>):只能与一个指定事件相关联
  • 共享期望值(std::shared_future<>):关联多个事件。所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。

std::future有局限性。很多线程在等待的时候,只有一个线程能获取等待结果。当多个线程需要等待相同的事件的结果,就需要使用std::shared_future来替代std::future了。

2.1 async与thread返回值

默认std::thread 执行的任务不能有返回值,不过可以使用期望值来传递返回值。
当不着急要任务结果时,可以使用std::async启动一个异步任务。与std::thread对象等待的方式不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()成员函数;并且会阻塞线程直到期望值状态为就绪为止;之后,返回计算结果。

  1. #include <future>
  2. #include <iostream>
  3. int find_the_answer(int a, int b)
  4. {
  5. return a + b;
  6. }
  7. void do_other_stuff()
  8. {
  9. std::cout << "main thread do something" << std::endl;
  10. }
  11. int main()
  12. {
  13. // 来自 async() 的 future,异步任务
  14. std::future<int> the_answer = std::async(find_the_answer, 1, 2);
  15. do_other_stuff();
  16. std::cout << "The answer is " << the_answer.get() << std::endl;//返回值通过future get()方法获得
  17. }

在函数调用之前向std::async传递一个额外参数,这个参数的类型可以是:

  • std::launch::async 表明函数必须在其所在的独立线程上执行
  • std::launch::defered,表明函数调用被延迟到wait()或get()函数调用时才执行
    1. auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
    2. auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在wait()或get()调用时执行
    3. auto f8=std::async(
    4. std::launch::deferred | std::launch::async,
    5. baz,std::ref(x)); // 实现选择执行方式

2.2 packaged_task与期望值

std::packaged_task<>对一个函数或可调用对象,使得能异步调用它,并绑定一个期望值。当调用std::packaged_task<> 对象时,它就会调用相关函数或可调用对象,将期望状态置为就绪,返回值也会被存储。

  • 当构造出一个std::packaged_task<>实例时,就必须传入一个函数或可调用对象;这个函数或可调用的对象,需要能接收指定的参数和返回可转换为指定返回类型的值。
  • 函数签名的返回类型可以用来标识从get_future()返回的std::future<>的类型,而函数签名的参数列表,可用来指定packaged_task的函数调用操作符。

packaged_task的三种具体的初始化和运行方式如下:

  1. #include <future>
  2. #include <iostream>
  3. #include <thread>
  4. int find_the_answer(int a, int b)
  5. {
  6. return a + b;
  7. }
  8. int main()
  9. {
  10. //2.1传入函数,在调用task时传入参数
  11. std::packaged_task<int(int, int)> task1(find_the_answer); //传入函数,模板中指定返回值和输入参数的类型
  12. std::future<int> f1 = task1.get_future(); // 获取 future
  13. task1(2, 3);//直接运行task,同时传入内部函数需要参数
  14. std::cout << "The task1 answer is " << f1.get() << std::endl;//打印返回值
  15. //2.2 使用std::thread初始化时为task输入参数
  16. std::packaged_task<int(int, int)> task2(find_the_answer); //传入函数,模板中指定返回值和输入参数的类型
  17. std::future<int> f2 = task2.get_future();
  18. std::thread task2_th(std::move(task2), 3, 4);//传入参数
  19. task2_th.join();
  20. std::cout << "The task1 answer is " << f2.get() << std::endl;
  21. //2.3 使用std::bind包装函数和输入参数
  22. std::packaged_task<int()> task3(std::bind(find_the_answer, 4, 5));//模板中只需要指定返回值类型
  23. std::future<int> f3 = task3.get_future();
  24. task3();//直接运行task,不再需要传递参数
  25. std::cout << "The task1 answer is " << f3.get() << std::endl;
  26. }

2.3 promise与期望值

std::promise<T>提供设定值的方式(类型为T),这个类型会和后面看到的std::future<T> 对象相关联。一对std::promise/std::future会为这种方式提供一个可行的机制;期望值可以阻塞等待线程,同时,提供数据的线程可以使用组合中的承诺值来对相关值进行设置,并将期望值的状态置为“就绪”。

  • 可以通过一个给定的std::promise的get_future()成员函数来获取与之相关的std::future对象,跟std::packaged_task的用法类似。
  • 当承诺值已经设置完毕(使用set_value()成员函数),对应期望值的状态变为“就绪”,并且可用于检索已存储的值。
  • 当在设置值之前销毁std::promise,将会存储一个异常。
  1. #include <future>
  2. #include <iostream>
  3. #include <thread>
  4. void calculate_sum(int a, int b, std::promise<int> p)
  5. {
  6. int sum = a + b;
  7. p.set_value(sum);//设置承诺值,通知future
  8. }
  9. void main()
  10. {
  11. //使用std::promise和future作为线程间信号
  12. std::promise<int> sum_promise;
  13. std::future<int> promise_future = sum_promise.get_future();
  14. std::thread promise_th(calculate_sum, 6, 7, std::move(sum_promise));//构建线程,传入函数,参数和promise对象
  15. promise_th.join();
  16. // // future::get() 将等待直至该 future 拥有合法结果并取得它, 无需在 get() 前调用 wait()
  17. std::cout << "The promise_th answer is " << promise_future.get() << std::endl;
  18. }

2.4 异常保存到期望值中

三种future使用方式中抛出异常的处理:

  • 函数作为std::async的一部分时,当调用抛出一个异常时,这个异常就会存储到期望值中,之后期望值的状态被置为“就绪”,之后调用get()会抛出这个已存储异常(注意:标准级别没有指定重新抛出的这个异常是原始的异常对象,还是一个拷贝;不同的编译器和库将会在这方面做出不同的选择)。
  • 将函数打包入std::packaged_task任务包中后,到任务被调用时,同样的事情也会发生;打包函数抛出一个异常,这个异常将被存储在期望值中,准备在get()调用时再次抛出。
  • 通过函数的显式调用,std::promise也能提供同样的功能。当存入的是一个异常而非一个数值时,就需要调用set_exception()成员函数,而非set_value()。

另外,在没有调用承诺值上的任何设置函数前,或正在调用包装好的任务时,销毁与std::promisestd::packaged_task相关的期望值对象。任何情况下,当期望值的状态还不是“就绪”时,调用std::promisestd::packaged_task的析构函数,将会存储一个与std::future_errc::broken_promise错误状态相关的std::future_error异常。

下面是一个简单的future携带异常信息的示例:

  1. #include <iostream>
  2. #include <future>
  3. double square_root(double x)
  4. {
  5. if (x < 0)
  6. {
  7. throw std::out_of_range("x < 0");
  8. }
  9. return sqrt(x);
  10. }
  11. int main()
  12. {
  13. std::future<double> f = std::async(square_root, -1);
  14. try
  15. {
  16. //调用get()函数,如果没有问题,则返回函数值;如果存在异常,则future会抛出保存到异常,被下面的catch捕获到
  17. double y = f.get();
  18. }
  19. catch (const std::exception& e)
  20. {
  21. std::cout << e.what() << std::endl;//打印结果:"x < 0"
  22. }
  23. return 0;
  24. }

相较于串行算法,并行算法常会格外要求注意异常问题。当一个操作在串行算法中抛出一个异常,算法只需要考虑对其本身进行处理,以避免资源泄露和损坏不变量;这里允许异常传递给调用者,由调用者对异常进行处理。在并行算法中很多操作要运行在独立的线程上。这种情况下,异常就不再允许被传播,因为这将会使调用堆栈出现问题。

2.5 shared_future

std::future可以处理所有在线程间数据转移的同步,但是调用某一特殊std::future对象的成员函数,就会让这个线程的数据和其他线程的数据不同步。多线程在没有额外同步的情况下,访问一个独立的std::future对象时,就会有数据竞争和未定义的行为。这是因为std::future模型独享同步结果的所有权,并且通过调用get()函数,一次性的获取数据,这就让并发访问变的毫无意义——只有一个线程可以获取结果值,因为在第一次调用get()后,就没有值可以再获取了

std::shared_future可以来帮你解决。因为std::future是只移动的,所以其所有权可以在不同的实例中互相传递,但是只有一个实例可以获得特定的同步结果,而std::shared_future实例是可拷贝的,所以多个对象可以引用同一关联期望值的结果。
优先使用的办法:为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的std::shared_future对象获取结果,那么多个线程访问共享同步结果就是安全的。
4 条件变量与future - 图1

shared_future的构造需要基于已有的future:

  1. //方法1,std::move将future的所有权转移给shared_future
  2. std::promise<int> p;
  3. std::future<int> f(p.get_future());
  4. assert(f.valid()); // 期望值 f 是合法的
  5. std::shared_future<int> sf(std::move(f));
  6. assert(!f.valid()); // 期望值 f 现在是不合法的
  7. assert(sf.valid()); // sf 现在是合法的
  8. //方法2,隐式转移所有权
  9. std::promise<std::string> p;
  10. std::shared_future<std::string> sf(p.get_future()); //隐式转移所有权
  11. //方法3,std::future的share()成员函数
  12. std::promise<std::string> p;
  13. auto sf=p.get_future().share();//自动推导类型

3 线程等待时间

之前介绍过阻塞调用,会阻塞一段不确定的时间,将线程挂起到等待的事件发生。很多情况下,这样的方式很不错,但是在一些情况下,就需要限制一下线程等待的时间。
两种指定超时方式:是

  • “时延”:需要指定一段时间(例如,30毫秒)。例如std::condition_variable的成员函数wait_for()(wait_for的返回结果见3.2节)
  • “绝对时间点”:指定一个时间点(例如,世界标准时间[UTC]17:30:15.045987023,2011年11月30日)。例如std::condition_variable的两个成员函数wait_until()

可设置等待时间的函数汇总如下:

类型/命名空间 函数 返回值
std::this_thread
- sleep_for(duration)
- sleep_until(time_point)
线程暂停多少时间,没有返回值
std::condition_variable 或 std::condition_variable_any
- wait_for(lock, duration)
- wait_until(lock, time_point)
std::cv_status::time_out 或 std::cv_status::no_timeout

- wait_for(lock, duration, predicate)
- wait_until(lock, duration, predicate)
bool —— 当唤醒时,返回谓词的结果
std::timed_mutex 或 std::recursive_timed_mutex
- try_lock_for(duration)
- try_lock_until(time_point)
bool —— 获取锁时返回true,否则返回fasle
std::unique_lock unique_lock(lockable, duration) N/A —— 对新构建的对象调用owns_lock();
unique_lock(lockable, time_point) 当获取锁时返回true,否则返回false

- try_lock_for(duration)
- try_lock_until(time_point)
bool —— 当获取锁时返回true,否则返回false
std::future或std::shared_future
- wait_for(duration)
- wait_until(time_point)

- 当等待超时,返回std::future_status::timeout
- 当期望值持有一个为启动的延迟函数,返回std::future_status::deferred
- 当期望值准备就绪时,返回std::future_status::ready

3.1 C++标准库时钟

对于C++标准库std::chrono来说,时钟就是时间信息源。并且,时钟是一个类,提供了四种不同的信息:

  • 当前时间std::chrono::system_clock::now()是将返回系统时钟的当前时间
  • 时间类型:特定的时间点类型可以通过time_point的数据typedef成员来指定。例如:some_clock::now()的类型就是some_clock::time_point。
  • 时钟节拍:被指定为1/x(x在不同硬件上有不同的值)秒,这是由时间周期所决定。例如:一个时钟一秒有25个节拍,因此一个周期为std::ratio<1, 25>
  • 通过时钟节拍的分布,判断时钟是否稳定:当时钟节拍均匀分布(无论是否与周期匹配),并且不可调整,这种时钟就称为稳定时钟。is_steady静态数据成员为true时,表明这个时钟就是稳定的;否则,就是不稳定的。

C++11提供了如下几个常用的时钟:

  • std::chrono::system_clock是来自系统范畴的实时时钟,不稳定的,因为时钟是可调的。
  • std::chrono::steady_clock绝不会调整的时钟,稳定时钟
  • std::chrono::high_resolution_clock :标准库中提供的具有最小节拍周期(因此具有最高的精度)的时钟

C++20又新增了几种特定用途的时钟:utc_clock,tai_clock, gps_clock, file_clock等。

3.2 时延

std::chrono::duration<>函数模板能够对时延进行处理(线程库使用到的所有C++时间处理工具,都在std::chrono命名空间内)。

  • 第一个模板参数是一个类型表示(比如,int,long或double)
  • 第二个模板参数是定制部分,表示每一个单元所用秒数。标准库在std::chrono命名空间内,为延时变量提供一系列预定义类型:nanoseconds[纳秒] , microseconds[微秒] , milliseconds[毫秒] , seconds[秒] , minutes[分]和hours[时]。
  1. std::future<int> f=std::async(some_task);
  2. if(f.wait_for(std::chrono::milliseconds(35))==std::future_status::ready)
  3. do_something_with(f.get());

示例代码中调用future的wait_for函数等待固定的35毫秒,可能有如下三种结果:

  1. 当函数等待超时时,会返回std::future_status::timeout
  2. 当期望值状态改变,函数会返回std::future_status::ready
  3. 当与期望值相关的任务延迟了,函数会返回std::future_status::deferred

3.3 绝对时间点

时间点可以用std::chrono::time_point<>类型模板来表示

  • 第一个参数用来指定所要使用的时钟
  • 第二个函数参数用来表示时间的计量单位

示例代码:

  1. #include <condition_variable>
  2. #include <mutex>
  3. #include <chrono>
  4. std::condition_variable cv;
  5. bool done;
  6. std::mutex m;
  7. bool wait_loop()
  8. {
  9. //当前时间加上500毫秒得到一个未来的时间点
  10. auto const timeout= std::chrono::steady_clock::now()+std::chrono::milliseconds(500);
  11. std::unique_lock<std::mutex> lk(m);
  12. while(!done)
  13. {
  14. if(cv.wait_until(lk,timeout)==std::cv_status::timeout)
  15. break;
  16. }
  17. return done;
  18. }

这里调用的条件变量的wait_until函数,返回两种可能的结果:

  • 当函数等待超时时,会返回std::cv_status::timeout
  • 当条件变量条件达成,函数会返回std::cv_status::no_timeout