1. 数据共享带来的问题
想象一下,你和你的朋友合租一个公寓,公寓中只有一个厨房和一个卫生间。当你的朋友在卫生间时,你就会不能使用了。同样的问题,也困扰着线程。当线程在访问共享数据的时候,必须定一些规矩,用来限定线程可访问的数据位。还有,一个线程更新了共享数据,需要对其他线程进行通知。当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须小心谨慎,才能确保所有线程都工作正常。
当你以写多线程程序为生,条件竞争就会成为你的梦魇;编写软件时,我们会使用大量复杂的操作,用来避免恶性条件竞争。
为了避免条件竞争,两个线程就需要一定的执行顺序:
- 第一种方式是前面介绍的互斥量来确定访问的顺序
- 第二种方式是使用原子操作(后面介绍)。
2. 互斥量
2.1 std::mutex和std::lock_guard
C++使用互斥锁的两种方式:
std::mutex
:创建互斥量实例,通过成员函数lock()对互斥量上锁,unlock()进行解锁。缺点是需要每次加锁解锁的时候调用,包括异常情况,容易漏掉造成死锁。std::lock_guard
:构造时提供定义好的互斥量,会自动加锁,并在析构的时候进行解锁,从而保证了一个已锁互斥量能被正确解锁。
/*
使用互斥量mutex和lock_guard进行加锁解锁操作。
lock_guard可以自动进行加锁解锁,不需要手动调用lock和unlock
*/
#include <iostream>
#include <thread>
#include <mutex>
int gloabl_v = 0;//全局变量
std::mutex g_v_mutex;//定义一个互斥量用于保护全局变量
void add_global_v()
{
//定义一个lock_guard,自动加锁
std::lock_guard<std::mutex> my_lock(g_v_mutex);
gloabl_v++;
std::cout << std::this_thread::get_id() << " : " << gloabl_v << std::endl;
//my_lock被释放时,互斥量会自动解锁
}
int main()
{
std::cout << "main : " << gloabl_v << std::endl;
std::thread t1(add_global_v);
t1.join();
std::thread t2(add_global_v);
t2.join();
std::cout << "main : " << gloabl_v << std::endl;
return 0;
}
在大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。这是面向对象设计的准则:将其放在一个类中,就可让他们联系在一起,也可对类的功能进行封装,并进行数据保护。
Note:当其中一个函数返回的是保护数据的指针或引用时,会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。这就需要对接口有相当谨慎的设计,要确保互斥量能锁住数据的访问,并且不留后门。所以,切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。
2.2 死锁问题和std::lock
一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议(并不总是实用),就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。
重点来了:
- C++标准库
std::lock
,可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。lock可以锁定给定的可锁定 (Lockable) 对象lock1
、lock2
、...
、lockn
,用免死锁算法避免死锁。std::lock
要么将两个锁都锁住,要不一个都不锁 std::scoped_lock
提供此函数的 RAII 包装,在C++17中支持,等于lock+lock_guard的功能之和。
下面示例在交换操作中使用std::lock()
和std::lock_guard
/*
使用std::lock避免死锁
*/
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <functional>
#include <chrono>
#include <string>
struct Employee {
Employee(std::string id) : id(id) {}
std::string id;
std::vector<std::string> lunch_partners;//午餐伙伴,互斥的
std::mutex m;//互斥锁
std::string output() const
{
std::string ret = "Employee " + id + " has lunch partners: ";
for (const auto& partner : lunch_partners)
ret += partner + " ";
return ret;
}
};
void send_mail(Employee&, Employee&)
{
// 模拟耗时的发信操作
std::this_thread::sleep_for(std::chrono::seconds(1));
}
//分配伙伴
void assign_lunch_partner(Employee& e1, Employee& e2)
{
static std::mutex io_mutex;
{
std::lock_guard<std::mutex> lk(io_mutex);//用于读输入参数内容的互斥锁,读完立刻释放
std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl;
}
{
// 用 std::lock获得二个employee的锁,而不担心对 assign_lunch_partner 的其他调用会死锁我们
std::lock(e1.m, e2.m);
std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);
std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock);
// C++17 中可用的较优解法
// std::scoped_lock lk(e1.m, e2.m);
{
std::lock_guard<std::mutex> lk(io_mutex);//用于读输入参数内容的互斥锁,读完立刻释放
std::cout << e1.id << " and " << e2.id << " got locks" << std::endl;
}
e1.lunch_partners.push_back(e2.id);//在lock的保护之下,可以放心的插入数据
e2.lunch_partners.push_back(e1.id);
}
send_mail(e1, e2);
send_mail(e2, e1);
}
int main()
{
//定义四个员工
Employee alice("alice"), bob("bob"), christina("christina"), dave("dave");
// 创建四个线程指派伙伴,因为发邮件给用户告知午餐指派,会消耗长时间
std::vector<std::thread> threads;
threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice));
threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob));
//每个线程调用join
for (auto& thread : threads) thread.join();
//打印输出
std::cout << alice.output() << '\n' << bob.output() << '\n'
<< christina.output() << '\n' << dave.output() << '\n';
}
2.3 避免死锁的其他原则
- 避免嵌套锁:一个线程已获得一个锁时,再别去获取第二个。因为每个线程只持有一个锁,锁上就不会产生死锁
- 避免在持有锁时调用用户提供的代码:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁
- 使用固定顺序获取锁:当硬性条件要求你获取两个或两个以上的锁,并且不能使用
std::lock
单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们(锁) - 使用锁的层次结构:对你的应用进行分层,并且识别在给定层上所有可上锁的互斥量。当代码试图对一个互斥量上锁,在该层锁已被低层持有时,上锁是不允许的。
2.4 std::unique_lock——灵活的锁
std::unqiue_lock
使用更为自由的不变量,不会总与互斥量的数据类型相关,使用起来要比std:lock_guard
更加灵活。**unique_lock**
可以移动,但是不能复制(唯一)。
- 可将
std::adopt_lock
作为第二个参数传入构造函数,对互斥量进行管理 - 也可以将
std::defer_lock
作为第二个参数传递进去,表明互斥量应保持解锁状态。
这样,就可以被std::unique_lock
对象(不是互斥量)的lock()函数所获取,或传递std::unique_lock
对象到std::lock()
中。
当std::lock_guard
已经能够满足你的需求时,还是建议你继续使用它。当需要更加灵活的锁时,最好选择std::unique_lock
,因为它更适合于你的任务。
/*
std::unique_lock代替lock_guard,更具有灵活性
*/
#include <iostream>
#include <thread>
#include <mutex>
struct Box {
explicit Box(int num) : num_things(num) {}// explicit用于构造函数,意为不允许隐式类型转换
int num_things;
std::mutex m;
};
void transfer(Box& from, Box& to, int num)
{
//定义unique_lock
std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
//使用std::lock获取两个锁,避免死锁
std::lock(lock1, lock2);
from.num_things -= num;
to.num_things += num;
std::cout << "from:" << from.num_things << ", to:" << to.num_things << std::endl;
//lock1, lock2析构的时候自动解锁
}
int main()
{
Box b1(100);
Box b2(50);
std::thread t1(transfer, std::ref(b1), std::ref(b2), 10);
std::thread t2(transfer, std::ref(b1), std::ref(b2), 5);
t1.join();
t2.join();
return 0;
}
2.5 锁的粒度
锁的粒度,用来描述通过一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。
- 只有一个互斥量保护整个数据结构时:不仅会有更多对锁的竞争,也会增加持锁的时间。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。
锁不仅是能锁住合适粒度的数据,还要控制锁的持有时间,以及哪些操作在执行的同时能够拥有锁。一般情况下,执行必要的操作时,尽可能将持有锁的时间缩减到最小。
有时,没有一个合适粒度级别,因为并不是所有对数据结构的访问都需要同一级的保护。这时就需要寻找一个合适的机制,去替换std::mutex
。
3. 其他保护手段
3.1 std::call_once和std::once_flag
std::call_once
准确调用函数 func
一次,即使从多个线程调用。函数 func
的完成与先前或后继的用同一 once_flag
对象,进行 call_once
调用同步。使用std::call_once
比显式使用互斥量消耗的资源更少。
如示例,即使有多个线程操作,但只会真正调用函数一次:
/*
使用std::onece和std::once_flag控制只调用一次的锁
*/
#include <iostream>
#include <thread>
#include <mutex>
//定义once_flag
static std::once_flag resource_flag;
void do_one()
{
std::cout << "call " << __FUNCTION__ << " once" << std::endl;
}
void func()
{
std::call_once(resource_flag, do_one);
}
int main()
{
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
return 0;
}
最后打印结果只会打印一次,表明do_one函数只被调用了一次。
3.2 共享锁
使用std::mutex
来保护数据结构,显的有些反应过度(因为在没有发生修改时,它将削减并发读取数据的可能性)。这里需要另一种不同的互斥量,这种互斥量常被称为“读者-作者锁”,因为其允许两种不同的使用方式:一个“作者”线程独占访问和共享访问,让多个“读者”线程并发访问。
C++提供了两种共享锁的类:std::shared_mutex
和std::shared_timed_mutex
的不同点在于
std::shared_timed_mutex
支持更多的操作方式std::shared_mutex
有更高的性能优势,从而不支持更多的操作。
两者在C++17中被支持,C++14只支持std::shared_timed_mutex
#include <mutex>
#include <shared_mutex>
#include <thread>
class ThreadSafeCounter {
public:
ThreadSafeCounter() = default;
// 多个线程/读者能同时读计数器的值。
unsigned int get() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
return value_;
}
// 只有一个线程/写者能增加/写线程的值。
void increment() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_++;
}
// 只有一个线程/写者能重置/写线程的值。
void reset() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_ = 0;
}
private:
mutable std::shared_mutex mutex_;//定义共享锁
unsigned int value_ = 0;
};
3.3 嵌套锁
当一个线程已经获取一个std::mutex
时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。之所以可以,是因为C++标准库提供了std::recursive_mutex
类。可以对同一线程的单个实例上获取多个锁(lock多次),其他功能与std::mutex
相同。互斥量锁住其他线程前,必须释放拥有的所有锁,所以当调用lock()三次后,也必须调用unlock()三次。正确使用std::lock_guard<std::recursive_mutex>
和std::unique_lock<std::recursive_mutex>
可以帮你处理这些问题。
大多数情况下,当需要嵌套锁时,就要对代码设计进行改动,不推荐这样的使用方式,因为过于草率,并且不合理。