1. 数据共享带来的问题
想象一下,你和你的朋友合租一个公寓,公寓中只有一个厨房和一个卫生间。当你的朋友在卫生间时,你就会不能使用了。同样的问题,也困扰着线程。当线程在访问共享数据的时候,必须定一些规矩,用来限定线程可访问的数据位。还有,一个线程更新了共享数据,需要对其他线程进行通知。当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须小心谨慎,才能确保所有线程都工作正常。
当你以写多线程程序为生,条件竞争就会成为你的梦魇;编写软件时,我们会使用大量复杂的操作,用来避免恶性条件竞争。
为了避免条件竞争,两个线程就需要一定的执行顺序:
- 第一种方式是前面介绍的互斥量来确定访问的顺序
- 第二种方式是使用原子操作(后面介绍)。
2. 互斥量
2.1 std::mutex和std::lock_guard
C++使用互斥锁的两种方式:
std::mutex:创建互斥量实例,通过成员函数lock()对互斥量上锁,unlock()进行解锁。缺点是需要每次加锁解锁的时候调用,包括异常情况,容易漏掉造成死锁。std::lock_guard:构造时提供定义好的互斥量,会自动加锁,并在析构的时候进行解锁,从而保证了一个已锁互斥量能被正确解锁。
/*使用互斥量mutex和lock_guard进行加锁解锁操作。lock_guard可以自动进行加锁解锁,不需要手动调用lock和unlock*/#include <iostream>#include <thread>#include <mutex>int gloabl_v = 0;//全局变量std::mutex g_v_mutex;//定义一个互斥量用于保护全局变量void add_global_v(){//定义一个lock_guard,自动加锁std::lock_guard<std::mutex> my_lock(g_v_mutex);gloabl_v++;std::cout << std::this_thread::get_id() << " : " << gloabl_v << std::endl;//my_lock被释放时,互斥量会自动解锁}int main(){std::cout << "main : " << gloabl_v << std::endl;std::thread t1(add_global_v);t1.join();std::thread t2(add_global_v);t2.join();std::cout << "main : " << gloabl_v << std::endl;return 0;}
在大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。这是面向对象设计的准则:将其放在一个类中,就可让他们联系在一起,也可对类的功能进行封装,并进行数据保护。
Note:当其中一个函数返回的是保护数据的指针或引用时,会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。这就需要对接口有相当谨慎的设计,要确保互斥量能锁住数据的访问,并且不留后门。所以,切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。
2.2 死锁问题和std::lock
一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议(并不总是实用),就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。
重点来了:
- C++标准库
std::lock,可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。lock可以锁定给定的可锁定 (Lockable) 对象lock1、lock2、...、lockn,用免死锁算法避免死锁。std::lock要么将两个锁都锁住,要不一个都不锁 std::scoped_lock提供此函数的 RAII 包装,在C++17中支持,等于lock+lock_guard的功能之和。
下面示例在交换操作中使用std::lock()和std::lock_guard
/*使用std::lock避免死锁*/#include <mutex>#include <thread>#include <iostream>#include <vector>#include <functional>#include <chrono>#include <string>struct Employee {Employee(std::string id) : id(id) {}std::string id;std::vector<std::string> lunch_partners;//午餐伙伴,互斥的std::mutex m;//互斥锁std::string output() const{std::string ret = "Employee " + id + " has lunch partners: ";for (const auto& partner : lunch_partners)ret += partner + " ";return ret;}};void send_mail(Employee&, Employee&){// 模拟耗时的发信操作std::this_thread::sleep_for(std::chrono::seconds(1));}//分配伙伴void assign_lunch_partner(Employee& e1, Employee& e2){static std::mutex io_mutex;{std::lock_guard<std::mutex> lk(io_mutex);//用于读输入参数内容的互斥锁,读完立刻释放std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl;}{// 用 std::lock获得二个employee的锁,而不担心对 assign_lunch_partner 的其他调用会死锁我们std::lock(e1.m, e2.m);std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock);// C++17 中可用的较优解法// std::scoped_lock lk(e1.m, e2.m);{std::lock_guard<std::mutex> lk(io_mutex);//用于读输入参数内容的互斥锁,读完立刻释放std::cout << e1.id << " and " << e2.id << " got locks" << std::endl;}e1.lunch_partners.push_back(e2.id);//在lock的保护之下,可以放心的插入数据e2.lunch_partners.push_back(e1.id);}send_mail(e1, e2);send_mail(e2, e1);}int main(){//定义四个员工Employee alice("alice"), bob("bob"), christina("christina"), dave("dave");// 创建四个线程指派伙伴,因为发邮件给用户告知午餐指派,会消耗长时间std::vector<std::thread> threads;threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob));threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob));threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice));threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob));//每个线程调用joinfor (auto& thread : threads) thread.join();//打印输出std::cout << alice.output() << '\n' << bob.output() << '\n'<< christina.output() << '\n' << dave.output() << '\n';}
2.3 避免死锁的其他原则
- 避免嵌套锁:一个线程已获得一个锁时,再别去获取第二个。因为每个线程只持有一个锁,锁上就不会产生死锁
- 避免在持有锁时调用用户提供的代码:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁
- 使用固定顺序获取锁:当硬性条件要求你获取两个或两个以上的锁,并且不能使用
std::lock单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们(锁) - 使用锁的层次结构:对你的应用进行分层,并且识别在给定层上所有可上锁的互斥量。当代码试图对一个互斥量上锁,在该层锁已被低层持有时,上锁是不允许的。
2.4 std::unique_lock——灵活的锁
std::unqiue_lock使用更为自由的不变量,不会总与互斥量的数据类型相关,使用起来要比std:lock_guard更加灵活。**unique_lock** 可以移动,但是不能复制(唯一)。
- 可将
std::adopt_lock作为第二个参数传入构造函数,对互斥量进行管理 - 也可以将
std::defer_lock作为第二个参数传递进去,表明互斥量应保持解锁状态。
这样,就可以被std::unique_lock对象(不是互斥量)的lock()函数所获取,或传递std::unique_lock对象到std::lock()中。
当std::lock_guard已经能够满足你的需求时,还是建议你继续使用它。当需要更加灵活的锁时,最好选择std::unique_lock,因为它更适合于你的任务。
/*std::unique_lock代替lock_guard,更具有灵活性*/#include <iostream>#include <thread>#include <mutex>struct Box {explicit Box(int num) : num_things(num) {}// explicit用于构造函数,意为不允许隐式类型转换int num_things;std::mutex m;};void transfer(Box& from, Box& to, int num){//定义unique_lockstd::unique_lock<std::mutex> lock1(from.m, std::defer_lock);std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);//使用std::lock获取两个锁,避免死锁std::lock(lock1, lock2);from.num_things -= num;to.num_things += num;std::cout << "from:" << from.num_things << ", to:" << to.num_things << std::endl;//lock1, lock2析构的时候自动解锁}int main(){Box b1(100);Box b2(50);std::thread t1(transfer, std::ref(b1), std::ref(b2), 10);std::thread t2(transfer, std::ref(b1), std::ref(b2), 5);t1.join();t2.join();return 0;}
2.5 锁的粒度
锁的粒度,用来描述通过一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。
- 只有一个互斥量保护整个数据结构时:不仅会有更多对锁的竞争,也会增加持锁的时间。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。
锁不仅是能锁住合适粒度的数据,还要控制锁的持有时间,以及哪些操作在执行的同时能够拥有锁。一般情况下,执行必要的操作时,尽可能将持有锁的时间缩减到最小。
有时,没有一个合适粒度级别,因为并不是所有对数据结构的访问都需要同一级的保护。这时就需要寻找一个合适的机制,去替换std::mutex。
3. 其他保护手段
3.1 std::call_once和std::once_flag
std::call_once准确调用函数 func 一次,即使从多个线程调用。函数 func 的完成与先前或后继的用同一 once_flag 对象,进行 call_once 调用同步。使用std::call_once比显式使用互斥量消耗的资源更少。
如示例,即使有多个线程操作,但只会真正调用函数一次:
/*使用std::onece和std::once_flag控制只调用一次的锁*/#include <iostream>#include <thread>#include <mutex>//定义once_flagstatic std::once_flag resource_flag;void do_one(){std::cout << "call " << __FUNCTION__ << " once" << std::endl;}void func(){std::call_once(resource_flag, do_one);}int main(){std::thread t1(func);std::thread t2(func);t1.join();t2.join();return 0;}
最后打印结果只会打印一次,表明do_one函数只被调用了一次。
3.2 共享锁
使用std::mutex来保护数据结构,显的有些反应过度(因为在没有发生修改时,它将削减并发读取数据的可能性)。这里需要另一种不同的互斥量,这种互斥量常被称为“读者-作者锁”,因为其允许两种不同的使用方式:一个“作者”线程独占访问和共享访问,让多个“读者”线程并发访问。
C++提供了两种共享锁的类:std::shared_mutex和std::shared_timed_mutex的不同点在于
std::shared_timed_mutex支持更多的操作方式std::shared_mutex有更高的性能优势,从而不支持更多的操作。
两者在C++17中被支持,C++14只支持std::shared_timed_mutex
#include <mutex>#include <shared_mutex>#include <thread>class ThreadSafeCounter {public:ThreadSafeCounter() = default;// 多个线程/读者能同时读计数器的值。unsigned int get() const {std::shared_lock<std::shared_mutex> lock(mutex_);return value_;}// 只有一个线程/写者能增加/写线程的值。void increment() {std::unique_lock<std::shared_mutex> lock(mutex_);value_++;}// 只有一个线程/写者能重置/写线程的值。void reset() {std::unique_lock<std::shared_mutex> lock(mutex_);value_ = 0;}private:mutable std::shared_mutex mutex_;//定义共享锁unsigned int value_ = 0;};
3.3 嵌套锁
当一个线程已经获取一个std::mutex时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。之所以可以,是因为C++标准库提供了std::recursive_mutex类。可以对同一线程的单个实例上获取多个锁(lock多次),其他功能与std::mutex相同。互斥量锁住其他线程前,必须释放拥有的所有锁,所以当调用lock()三次后,也必须调用unlock()三次。正确使用std::lock_guard<std::recursive_mutex>和std::unique_lock<std::recursive_mutex>可以帮你处理这些问题。
大多数情况下,当需要嵌套锁时,就要对代码设计进行改动,不推荐这样的使用方式,因为过于草率,并且不合理。
