C++ 并发

并行基础

**std::thread** 用于创建一个执行的线程实例,所以它是一切并发编程的基础,使用时需要包含 **<thread>** 头文件, 它提供了很多基本的线程操作,例如 **get_id()** 来获取所创建线程的线程 ID,使用 **join()** 来加入一个线程等等,例如:

  1. #include <iostream>
  2. #include <thread>
  3. int main() {
  4. std::thread t([](){
  5. std::cout << "hello world." << std::endl;
  6. });
  7. t.join();
  8. return 0;
  9. }

互斥量与临界区

在操作系统、亦或是数据库的相关知识中已经了解过了有关并发技术的基本知识,mutex 就是其中的核心之一。C++11 引入了 mutex 相关的类,其所有相关的函数都放在 **<mutex>** 头文件中。
**std::mutex** 是 C++11 中最基本的 mutex 类,通过实例化 **std::mutex** 可以创建互斥量, 而通过其成员函数 lock() 可以进行上锁,unlock() 可以进行解锁。
但是在实际编写代码的过程中,最好不去直接调用成员函数, 因为调用成员函数就需要在每个临界区的出口处调用 unlock(),当然,还包括异常。
这时候 C++11 还为互斥量提供了一个 RAII 语法的模板类 std::lock_guard。RAII 在不失代码简洁性的同时,很好的保证了代码的异常安全性。
在 RAII 用法下,对于临界区的互斥量的创建只需要在作用域的开始部分,例如:

  1. #include <iostream>
  2. #include <thread>
  3. int v = 1;
  4. void critical_section(int change_v) {
  5. static std::mutex mtx;
  6. std::lock_guard<std::mutex> lock(mtx);
  7. // 执行竞争操作
  8. v = change_v;
  9. // 离开此作用域后 mtx 会被释放
  10. }
  11. int main() {
  12. std::thread t1(critical_section, 2), t2(critical_section, 3);
  13. t1.join();
  14. t2.join();
  15. std::cout << v << std::endl;
  16. return 0;
  17. }

由于 C++ 保证了所有栈对象在生命周期结束时会被销毁,所以这样的代码也是异常安全的。无论 critical_section() 正常返回、还是在中途抛出异常,都会引发堆栈回退,也就自动调用了 unlock()
**std::unique_lock** 则相对于 **std::lock_guard** 出现的,**std::unique_lock** 更加灵活, **std::unique_lock** 的对象会以独占所有权(没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权) 的方式管理 mutex 对象上的上锁和解锁的操作。
所以在并发编程中,推荐使用 **std::unique_lock**
**std::lock_guard** 不能显式的调用 lock 和 **unlock**, 而 **std::unique_lock** 可以在声明后的任意位置调用, 可以缩小锁的作用范围,提供更高的并发度
如果用到了条件变量 std::condition_variable::wait 则必须使用 std::unique_lock 作为参数。
例如:

  1. #include <iostream>
  2. #include <thread>
  3. int v = 1;
  4. void critical_section(int change_v) {
  5. static std::mutex mtx;
  6. std::unique_lock<std::mutex> lock(mtx);
  7. // 执行竞争操作
  8. v = change_v;
  9. std::cout << v << std::endl;
  10. // 将锁进行释放
  11. lock.unlock();
  12. // 在此期间,任何人都可以抢夺 v 的持有权
  13. // 开始另一组竞争操作,再次加锁
  14. lock.lock();
  15. v += 1;
  16. std::cout << v << std::endl;
  17. }
  18. int main() {
  19. std::thread t1(critical_section, 2), t2(critical_section, 3);
  20. t1.join();
  21. t2.join();
  22. return 0;
  23. }

期物

期物(Future)表现为 **std::future**,它提供了一个访问异步操作结果的途径,这句话很不好理解。为了理解这个特性,需要先理解一下在 C++11 之前的多线程行为。
试想,如果主线程 A 希望新开辟一个线程 B 去执行某个预期的任务,并返回一个结果。
而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的结果, 所以很自然的希望能够在某个特定的时间获得线程 B 的结果
在 C++11 的 std::future 被引入之前,通常的做法是:创建一个线程 A,在线程 A 里启动任务 B,当准备完毕后发送一个事件,并将结果保存在全局变量中。
而主函数线程 A 里正在做其他的事情,当需要结果的时候,调用一个线程等待函数来获得执行的结果。
而 C++11 提供的 **std::future** 简化了这个流程,可以用来获取异步任务的结果。自然地,很容易能够想象到把它作为一种简单的线程同步手段,即屏障(barrier)。
为了看一个例子,这里额外使用 std::packaged_task,它可以用来封装任何可以调用的目标,从而用于实现异步的调用。举例来说:

  1. #include <iostream>
  2. #include <future>
  3. #include <thread>
  4. int main() {
  5. // 将一个返回值为7的 lambda 表达式封装到 task 中
  6. // std::packaged_task 的模板参数为要封装函数的类型
  7. std::packaged_task<int()> task([](){return 7;});
  8. // 获得 task 的期物
  9. std::future<int> result = task.get_future(); // 在一个线程中执行 task
  10. std::thread(std::move(task)).detach();
  11. std::cout << "waiting...";
  12. result.wait(); // 在此设置屏障,阻塞到期物的完成
  13. // 输出执行结果
  14. std::cout << "done!" << std:: endl << "future result is " << result.get() << std::endl;
  15. return 0;
  16. }

在封装好要调用的目标后,可以使用 **get_future()** 来获得一个 **std::future** 对象,以便之后实施线程同步。

条件变量

条件变量 **std::condition_variable** 是为了解决死锁而生,当互斥操作不够用而引入的。比如,线程可能需要等待某个条件为真才能继续执行, 而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。
所以,condition_variable 实例被创建出现主要就是用于唤醒等待线程从而避免死锁
std::condition_variablenotify_one() 用于唤醒一个线程;notify_all() 则是通知所有线程。下面是一个生产者和消费者模型的例子:

  1. #include <queue>
  2. #include <chrono>
  3. #include <mutex>
  4. #include <thread>
  5. #include <iostream>
  6. #include <condition_variable>
  7. int main() {
  8. std::queue<int> produced_nums;
  9. std::mutex mtx;
  10. std::condition_variable cv;
  11. bool notified = false; // 通知信号
  12. // 生产者
  13. auto producer = [&]() {
  14. for (int i = 0; ; i++) {
  15. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  16. std::unique_lock<std::mutex> lock(mtx);
  17. std::cout << "producing " << i << std::endl;
  18. produced_nums.push(i);
  19. notified = true;
  20. cv.notify_all(); // 此处也可以使用 notify_one
  21. }
  22. };
  23. // 消费者
  24. auto consumer = [&]() {
  25. while (true) {
  26. std::unique_lock<std::mutex> lock(mtx);
  27. while (!notified) { // 避免虚假唤醒
  28. cv.wait(lock);
  29. }
  30. // 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
  31. lock.unlock();
  32. std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 消费者慢于生产者
  33. lock.lock();
  34. while (!produced_nums.empty()) {
  35. std::cout << "consuming " << produced_nums.front() << std::endl;
  36. produced_nums.pop();
  37. }
  38. notified = false;
  39. }
  40. };
  41. // 分别在不同的线程中运行
  42. std::thread p(producer);
  43. std::thread cs[2];
  44. for (int i = 0; i < 2; ++i) {
  45. cs[i] = std::thread(consumer);
  46. }
  47. p.join();
  48. for (int i = 0; i < 2; ++i) {
  49. cs[i].join();
  50. }
  51. return 0;
  52. }

值得一提的是,在生产者中虽然可以使用 **notify_one()**,但实际上并不建议在此处使用, 因为在多消费者的情况下,消费者实现中简单放弃了锁的持有,这使得可能让其他消费者 争夺此锁,从而更好的利用多个消费者之间的并发。
话虽如此,但实际上因为 std::mutex 的排他性, 根本无法期待多个消费者能真正意义上的并行消费队列的中生产的内容,仍需要粒度更细的手段。