3.1 共享数据带来的问题

线程间潜在问题就是修改共享数据,致使不变量遭到破坏——竞态条件(race condition)。

3.1.1 竞态条件

并发修改一个独立对象,就可能会导致竞态条件。

3.2 使用互斥量保护共享数据

当访问共享数据前,使用互斥量将相关数据锁住。当访问结束后,再将数据解锁。

3.2.1 C++使用互斥量

C++通过实例化std::mutex创建互斥量,通过调用成员函数**lock()**进行上锁**unlock()**进行解锁。不过一般不推荐直接调用成员函数,因为这意味着必须记住在每个函数出口都要调用unlock()
所以C++标准库为互斥量提供了一个RAII语法的模板类**lock_guard**,它在构造的时候提供已锁互斥量并在析构的时候进行解锁,从而保证一个已锁的互斥量总会被正确解锁。

虽然将互斥量定义为全局变量没问题,但是大多数情况下,互斥量和保护的数据放在同一个类中,而不是定义成全局变量。将二者放在同一个类中,就可让它们联系在一起,也可以对类的功能进行封装并进行数据保护。这种方式很容易看出什么时候对互斥量上锁,当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,就保证了数据访问时不变量不被破坏。
HINT:当一个成员函数返回保护数据的指针或引用时,就会破坏对数据的保护。这样可以通过指针或引用访问被保护的数据,而不会被互斥锁限制。所以需要谨慎设计。

3.2.2 精心组织代码来保护共享数据

  1. class some_data{
  2. int a;
  3. std::string b;
  4. public:
  5. void do_something();
  6. };
  7. class data_wrapper{
  8. private:
  9. some_data data;
  10. std::mutex m;
  11. public:
  12. template<typename Function>
  13. void process_data(Function func){
  14. std::lock_guard<std::mutex> l(m);
  15. func(data); // 1 传递“保护”数据给用户函数
  16. }
  17. };
  18. some_data* unprotected;
  19. void malicious_function(some_data& protected_data){
  20. unprotected=&protected_data;
  21. }
  22. data_wrapper x;
  23. void foo(){
  24. x.process_data(malicious_function); // 2 传递一个恶意函数
  25. unprotected->do_something(); // 3 在无保护的情况下访问保护数据
  26. }

调用用户提供的函数func()意味着foo()能够绕过保护机制将函数malicious_function()传递进去,在没有锁定互斥量的情况下调用do_something()
这段代码的问题在于根本没有保护,只是将所有可访问的数据结构代码标记为互斥,函数foo()中调用unprotected->do_something()的代码未能标记为互斥。

所以切记不要将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部课件内存,亦或是以参数形式传递到用户提供的函数

3.2.3 发现接口内在的竞态条件

即便在一个很简单的接口中,依旧有可能遇到竞态条件。例如构建一个类似于std::stack结构的栈,除了构造函数和swap()外,需要对std::stack提供五个操作:push()pop()top()empty()size()。即使修改了top(),使其返回一个拷贝而非引用,对内部数据使用一个互斥量进行保护,不过这个接口仍存在竞态条件。这个问题不仅存在于基于互斥量实现的接口中,在无锁实现的接口中,依旧会产生。这是接口的问题,与其实现方式无关

  1. template<typename T,typename Container=std::deque<T> >
  2. class stack{
  3. public:
  4. explicit stack(const Container&);
  5. explicit stack(Container&& = Container());
  6. template <class Alloc> explicit stack(const Alloc&);
  7. template <class Alloc> stack(const Container&, const Alloc&);
  8. template <class Alloc> stack(Container&&, const Alloc&);
  9. template <class Alloc> stack(stack&&, const Alloc&);
  10. bool empty() const;
  11. size_t size() const;
  12. T& top();
  13. T const& top() const;
  14. void push(T const&);
  15. void push(T&&);
  16. void pop();
  17. void swap(stack&&);
  18. };

虽然empty()size()可能在调用并返回时是正确的,但结果并不可靠。因为返回后,其他线程就可以自由访问栈,并可能进行push()pop()操作。

假设有一个stack<vector<int>>vector是一个动态容器,当拷贝一个vector,标准库会从堆上分配很多内存来完成这次拷贝。当系统出于高负荷或资源限制情况下,内存分配就会失败,vector的拷贝构造函数可能会抛出一个std::bad_alloc异常。当vector中存在大量元素时,这种情况的可能性更大。当pop()函数返回弹出值时,即在栈中将这个值移除。但当拷贝数据时调用函数抛出一个异常会如何?
如果这样,要弹出的数据会丢失:确实从栈上移除了,但是拷贝失败了。std::stack的设计人员于是讲这个操作分为两个部分:先top()pop()。这样在不能安全将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。
但是这样的分离却制造了竞态条件。以下修改方式可以参考,但是是有代价的:

  1. 传入一个引用

将变量的引用作为参数,传入pop()函数中获取想要的弹出值

  1. std::vector<int>result;
  2. some_stack.pop(result);

有一个明显缺点:需要临时制造出一个堆中类型的实例,用于接收目标值。这需要一个可以赋值的类型:很多用户自定义类型即使支持移动构造甚至拷贝构造,可能都不支持赋值操作。

  1. 无异常抛出的拷贝构造函数或移动构造函数

对于有返回值的pop()函数来说,只有“异常安全”方面的担忧。很多类型有不会抛出异常的移动构造函数

  1. 返回指向弹出值的指针

返回一个指向弹出元素的指针,而不是直接返回值。指针的优势在于自由拷贝,不会产生异常。可以考虑使用std::shared_ptr,不仅能避免内存泄漏,而且标准库能完全控制内存分配方案。相较于非线程安全版本,这个方案的开销相当大。

定义线程安全的堆栈

  1. #include <exception>
  2. #include <memory>
  3. #include <mutex>
  4. #include <stack>
  5. struct empty_stack: std::exception{
  6. const char* what() const throw() {
  7. return "empty stack!";
  8. };
  9. };
  10. template<typename T>
  11. class threadsafe_stack{
  12. private:
  13. std::stack<T> data;
  14. mutable std::mutex m;
  15. public:
  16. threadsafe_stack()
  17. : data(std::stack<T>()){}
  18. threadsafe_stack(const threadsafe_stack& other){
  19. std::lock_guard<std::mutex> lock(other.m);
  20. data = other.data; // 1 在构造函数体中的执行拷贝
  21. }
  22. threadsafe_stack& operator=(const threadsafe_stack&) = delete;
  23. void push(T new_value){
  24. std::lock_guard<std::mutex> lock(m);
  25. data.push(new_value);
  26. }
  27. std::shared_ptr<T> pop(){
  28. std::lock_guard<std::mutex> lock(m);
  29. if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空
  30. std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
  31. data.pop();
  32. return res;
  33. }
  34. void pop(T& value){
  35. std::lock_guard<std::mutex> lock(m);
  36. if(data.empty()) throw empty_stack();
  37. value=data.top();
  38. data.pop();
  39. }
  40. bool empty() const{
  41. std::lock_guard<std::mutex> lock(m);
  42. return data.empty();
  43. }
  44. };

在拷贝构造函数体中使用互斥量来确保复制结果的正确性,这样的方式比成员初始化列表好。

3.2.4 死锁:问题描述及解决方案

避免死锁的一般建议:让两个互斥量总以相同的顺序上锁。某些情况下可以合阳,因为不同的互斥量用于不同的地方。
不过当有多个互斥量保护同一个类的独立实例时一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免数据被并发修改,并确保每个实例上的互斥量都能锁住自己要保护的区域。不过选择一个固定的顺序可能会适得其反:在参数交换之后,两个线程试图在相同的两个实例间进行数据交换时,又死锁了。

C++标准库有std::lock()可以一次性锁住多个互斥量,并且没有死锁风险。

  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. class A {
  5. private:
  6. int x;
  7. std::mutex m;
  8. public:
  9. A(const int& x_) : x(x_) {}
  10. friend void swap(A& lhs, A& rhs) {
  11. if (&lhs == &rhs)
  12. return;
  13. std::lock(lhs.m, rhs.m);
  14. std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
  15. std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
  16. std::swap(lhs.x, rhs.x);
  17. }
  18. };
  19. void swap(A& lhs, A& rhs);
  20. int main(void) {
  21. A x1(1);
  22. A x2(2);
  23. std::thread t1(swap, std::ref(x1), std::ref(x2));
  24. std::thread t2(swap, std::ref(x2), std::ref(x1));
  25. t1.join();
  26. t2.join();
  27. }

调用std::lock()锁住两个互斥量,并且两个std::lock_guard实例已经创建好,std::adopt_lock参数除了表示std::lock_guard对象已经上锁外,还表示接管已上锁的状态,而非在构造函数中再加锁。
这样就能保证在大多数情况下,函数退出时能被正确解锁。而std::lock()具有原子特性,参数列表中的锁要么全都加锁,要么一个也不加锁(即后面的锁如果获取失败、就释放前面已经获取的锁)。

针对上述场景,C++17提供了新的 RAII 类模板std::scoped_lock<>std::scoped_lock<>std::lock_guard<>完全等价,不过前者是可变参数模板,接收各种互斥型别作为参数列表,还以多个对象作为构造函数的参数列表:

  1. void swap(X& lhs, X& rhs){
  2. if(&lhs == &rhs)
  3. return;
  4. std::scoped_lock guard(lhs.m, rhs.m);
  5. swap(lhs.sth, rhs.sth);
  6. }

使用了C++17的特性:类模板参数推导,C++17具有隐式类模板参数推导,依据传入构造函数的参数对象自动匹配,选择正确的互斥型别。

3.2.5 避免锁的进阶指导

  • 避免嵌套锁:一个线程已获得一个锁时,不要再去获取第二个。当需要多个锁时,使用std::lock来做这件事。
  • 避免在持有锁时调用用户提供的代码:因为无法判断用户代码中是否有获取锁的代码(可能会造成死锁),所以尽量不要在上锁后调用。
  • 使用固定顺序获取锁:…
  • 使用锁的层次结构:略,

3.2.6 std::unique_lock——灵活的锁

std::unique_lock实例不会总与互斥量的数据类型相关,使用起来要比std::lock_guard更灵活。可以将std::adopt_lock作为第二个参数传入构造函数,对互斥量进行管理;也可以将std::defer_lock作为第二个参数传递进去,表明互斥量应该在构造期间保持解锁状态。此外,std::unique_lock会比std::lock_guard稍慢一点,允许**std::unique_lock**不带互斥量,代价是:需要存储并且更新互斥信息。

因为std::unique_lock支持lock()try_lock()unlock()成员函数,所以能将std::unique_lock对象传递到std::lock()中。

3.2.7 不同域中互斥量所有权的传递

std::unique_lock实例没有与自身相关的互斥量,一个互斥量的所有权可以通过移动操作,在不同的实例中传递有时候转移自动发生有些时候需要显式调用**std::move**来执行移动操作。取决于原值是否是左值。
一种使用可能是:允许一个函数去锁住一个互斥量,并将所有权转移到调用者上,所以调用者可以在这个锁保护的范围内执行额外的动作

  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. #include <unistd.h>
  5. std::mutex m;
  6. int i = 1;
  7. std::unique_lock<std::mutex> get_lock() {
  8. std::unique_lock<std::mutex> lk(m);
  9. ++i;
  10. return lk;
  11. }
  12. void process_data() {
  13. std::unique_lock<std::mutex> lk(get_lock());
  14. --i;
  15. }
  16. int main(void) {
  17. std::thread t1(get_lock);
  18. sleep(1);
  19. std::cout << i << "\n";
  20. std::thread t2(process_data);
  21. sleep(1);
  22. std::cout << i << "\n";
  23. t1.join();
  24. t2.join();
  25. }

3.2.8 锁的粒度

锁的粒度用于描述锁保护的数据量大小。

  1. void get_and_process_data(){
  2. std::unique_lock<std::mutex> my_lock(the_mutex);
  3. some_class data_to_process=get_next_data_chunk();
  4. my_lock.unlock(); // 1 不要让锁住的互斥量越过process()函数的调用
  5. result_type result=process(data_to_process);
  6. my_lock.lock(); // 2 为了写入数据,对互斥量再次上锁
  7. write_result(data_to_process,result);
  8. }

上述例子表示:在只有一个互斥量保护整个数据结构时,不仅可能会有更多对锁的竞争,锁被持有的时间也很难被减少。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。
所以要尽可能保证锁的粒度较细,而且要控制锁的持有时间

3.3 保护共享数据的替代设施

互斥量是最通用的机制,但不是保护共享数据的唯一方式。有替代方式。

一种情况:共享数据在并发访问和初始化时都需要保护,但是之后需要进行隐式同步。这可能是因为数据在一开始是作为只读方式创建,所以没有同步问题;或者因为必要的保护作为对数据操作的一部分,所以隐式的执行。任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程,这会带来性能下降。

所以C++标准提供了一种纯粹保护共享数据初始化过程的机制:

3.3.1 保护共享数据的初始化过程

使用一个互斥量的延迟初始化(线程安全)过程:

  1. std::shared_ptr<some_resource> resource_ptr;
  2. std::mutex resource_mutex;
  3. void foo(){
  4. std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化
  5. if(!resource_ptr){
  6. resource_ptr.reset(new some_resource); // 只有初始化过程需要保护
  7. }
  8. lk.unlock();
  9. resource_ptr->do_something();
  10. }

太浪费了,直接用一个大锁锁住了整个区域。
双重检查锁(Double-Checked Locking),应用于单例模式,但是其实是有硬伤的:

  1. std::shared_ptr<some_resource> resource_ptr;
  2. std::mutex resource_mutex;
  3. void undefined_behaviour_with_double_checked_locking()
  4. {
  5. if(!resource_ptr) // 1
  6. {
  7. std::lock_guard<std::mutex> lk(resource_mutex);
  8. if(!resource_ptr) // 2
  9. {
  10. resource_ptr.reset(new some_resource); // 3
  11. }
  12. }
  13. resource_ptr->do_something(); // 4
  14. }

看上去没什么问题,也确实。但是忽略了代码的乱序执行。编译器不会按照我们编写代码的顺序去解析、运行代码。很有可能③处的指针被初始化了(指向一块区域了),但是那块区域还没有new,所以就有可能产生未定义行为。
上述出现问题的场景如下:

  • 线程 A 进入第一个if,上锁并初始化,但是此时恰好代码乱序执行了
  • 此时线程 B 开始,判断得到resource_ptr不为nullptr,于是直接调用do_something(),但是由于线程 A 中的代码乱序执行了,resource_ptr只是被初始化了,但是没有被指向正确的值,所以这里调用do_something()会带来未定义行为。

    第二版译者注:

image.png

C++标准也认为竞态条件的处理很重要,所以提供了std::once_flagstd::call_once来处理这种情况。它们两个位于<mutex>中。

比起锁住互斥量并显式检查指针,每个线程只需要使用**std::call_once**,在**std::call_once**结束时,就能安全地直到指针已经被其他的线程初始化。使用std::call_once比显式使用互斥量消耗的资源更少,特别是初始化完成后。std::call_once可以和任何函数或可调用对象一起使用:

  1. #include <iostream>
  2. #include <memory>
  3. #include <mutex>
  4. #include <thread>
  5. #include <unistd.h>
  6. #include <vector>
  7. using namespace std;
  8. int* resource_ptr = nullptr;
  9. std::once_flag resource_flag{};
  10. vector<std::thread> vec{};
  11. void init_resource(int i) {
  12. resource_ptr = new int(i);
  13. }
  14. void foo(int i) {
  15. std::call_once(resource_flag, init_resource, i);
  16. cout << *resource_ptr << endl;
  17. }
  18. int main(void) {
  19. for (int i = 1; i < 8; ++i) {
  20. vec.emplace_back(std::thread(foo, i));
  21. }
  22. for (auto& i : vec) {
  23. i.join();
  24. }
  25. return 0;
  26. }

下例使用std::call_once作为类成员的延迟初始化(线程安全):

  1. #include <functional>
  2. #include <iostream>
  3. #include <mutex>
  4. #include <thread>
  5. using namespace std;
  6. class X {
  7. private:
  8. int a;
  9. std::once_flag connection_init_flag;
  10. void open_connection() {
  11. cout << "open_connection\n";
  12. }
  13. public:
  14. X(){}
  15. X(int const& int_) : a(int_) {}
  16. void send_data(X const& data) // 1
  17. {
  18. std::call_once(connection_init_flag, &X::open_connection, this); // 2
  19. }
  20. void receive_data() // 3
  21. {
  22. std::call_once(connection_init_flag, &X::open_connection, this); // 2
  23. }
  24. void m_invoke() {
  25. std::thread t(&X::receive_data, this);
  26. }
  27. };
  28. int main(void) {
  29. X x;
  30. std::thread t1(&X::receive_data, &x);
  31. t1.join();
  32. }

注意std::call_once()如果调用成员函数,需要将this指针传入。
HINT**std::mutex****std::once_flag**的实例不能拷贝和移动,所以如果把它们作为类成员,就得显式定义特殊的成员函数。(拷贝构造之类的)


还有一种情形的初始化过程中潜存竞态条件:其中一个局部变量被声明为**static**类型static变量在声明后就完成初始化,对于多线程调用的函数就存在竞态条件——多个线程抢着去定义这个变量。在C++11标准化中该问题被解决了:初始化以及定义全都只在一个线程中发生并且没有其他线程可在初始化完成前对其进行处理

3.3.3 递归加锁

当一个线程已经获得一个std::mutex后,再对其上锁,这个操作就是错误的,并且继续这样尝试的话就会产生未定义行为。但是某些情况下,一个线程尝试获取同一个互斥量多次而没有对其进行一次释放是可以的,C++标准提供了std::recursive_mutex类,功能与std::mutex类似,但是可以在同一线程中多次获得同一个锁。但是在互斥量锁住其他线程前必须释放所拥有的锁,即lock()的数量与unlock()数量要一致。当然,配合std::lock_guardMstd::recursive_mutex>std::unique_lock<std::recursice_mutex>可以避免这些问题。

嵌套锁一般用于可并发访问的类上:每个公共成员函数都会对互斥量上锁,然后完成对应的功能,之后再解锁互斥量。不过有时成员函数会调用另一个成员函数,这种情况下,第二个成员函数也会试图锁住互斥量,就会导致为定义行为的发生。此时就可以使用嵌套锁std::recursive_mutex