基本认知

在多线程编程中,同步性极其的重要,当你需要同时访问一个资源、控制不同线程的执行次序时,都需要使用的到同步性。在 Rust 中有多种方式可以实现同步性。例如

  • 消息传递就是同步性的一种方式,通过消息传递来控制不同线程的执行次序
  • 还可以使用共享内存(锁、条件变量、信号量)来实现同步性,例如通过锁和原子操作等并发原语来实现多个线程同时且安全的去访问一个资源

该如何选择

  • 共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下
    • 共享内存相对消息传递能节省多次内存拷贝的成本
    • 共享内存的实现简洁得多
    • 共享内存的锁竞争更多
    • 使用共享内存(并发原语)的场景往往就比较简单粗暴:需要简洁的实现以及更高的性能时
    • 共享内存类似于一个多所有权的系统:多个线程可以同时访问同一个值
  • 消息传递适用的场景很多,如以下几个主要的使用场景
    • 需要可靠和简单的(简单不等于简洁)实现时
    • 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
    • 需要一个任务处理流水线(管道)时,等等
    • 消息传递类似一个单所有权的系统:一个值同时只能有一个所有者,如果另一个线程需要该值的所有权,需要将所有权通过消息传递进行转移

互斥锁 Mutex

既然是共享内存,那并发原语自然是重中之重,先来一起看看皇冠上的明珠:互斥锁 Mutex**(mutual exclusion)** 。Mutex 让多个线程并发的访问同一个值变成了排队访问:同一时间,只允许一个线程A访问该值,其它线程需要等待A访问完成后才能继续

单线程中使用 Mutex

先来看看单线程中 Mutex 该如何使用

线程同步:共享内存(锁) - 图1

  1. 在注释中,已经大致的描述了代码的功能,不过有一点需要注意的是:和 Box 类似,数据被 Mutex 所拥有,要访问内部的数据,需要使用方法 m.lock() 向 m 申请一个锁,该方法会阻塞当前的线程,直到获取到锁,因此当多个线程同时访问该数据时,只有一个线程能获取到锁,其他线程只能阻塞等待,这样就能保证数据被安全的修改
  2. m.lock() 方法也可能报错,例如当前正在持有锁的线程 panic了。在这种情况下,其他线程不可能再获得锁,因此 lock 方法会返回一个错误。这里你可能奇怪, m.lock 明明返回一个锁,怎么就变成我们的数值 num 了?聪明的读者可能会想到智能指针,没错,因为 Mutex 是一个智能指针,准确的说是 m.lock() 返回一个智能指针 MutexGuard
    1. 它实现了 Deref 特征,会被自动解引用后获得一个引用类型,该引用指向 Mutex 内部的数据
    2. 它还实现了 Drop 特征,在超出作用域后,自动释放锁,以便其他线程能继续获得锁
  3. 正因为智能指针的使用,使得我们无需任何操作就能获取其中的数据。如果释放锁,你需要做的仅仅是做好锁的作用域管理,例如上述代码的内部花括号的使用。建议尝试去掉内部的花括号,然后尝试获取第二个锁 num1 ,看看会发生什么,友情提示,不会报错,但是主线程会永远阻塞,因为不幸发生了死锁

线程同步:共享内存(锁) - 图2

多线程中使用 Mutex

单线程中使用锁,说实话纯粹是为了演示功能,毕竟多线程才是锁的舞台。 现在,我们再来看看,如何在多线程下使用 Mutex 来访问同一个资源

无法运行的 Rc

  1. use std::rc::Rc;
  2. use std::sync::Mutex;
  3. use std::thread;
  4. fn main() {
  5. // 通过 Rc 实现 Mutex 的多所有权
  6. let counter = Rc::new(Mutex::new(0));
  7. let mut handles = vec![];
  8. for _ in 0..10 {
  9. let counter = Rc::clone(&counter);
  10. //创建子线程,并将 Mutex 的所有权拷贝传入到子线程中
  11. let handle = thread::spawn(move || {
  12. let mut num = counter.lock().unwrap();
  13. *num += 1;
  14. });
  15. handles.push(handle);
  16. }
  17. //等待所有子线程完成
  18. for handle in handles {
  19. handle.join().unwrap();
  20. }
  21. //输出最终的计数结果
  22. println!("Result: {}", *counter.lock().unwrap());
  23. }

由于子线程需要通过 move 拿走锁的所有权,因此我们需要使用多所有权来保证每个线程都拿到数据的独立所有权,恰好智能指针 Rc 可以做到,事实上,上面的代码会报错

线程同步:共享内存(锁) - 图3

错误中提到了一个关键点: Rc 无法在线程中传输,因为它没有实现 Send 特征,而该特征可以确保数据在线程中安全的传输

多线程安全的 Arc

好在有 Arc> ,得益于它的内部计数器是多线程安全的,因此可以在多线程环境中使用

  1. use std::sync::{Arc, Mutex};
  2. use std::thread;
  3. fn main() {
  4. let counter = Arc::new(Mutex::new(0));
  5. let mut handles = vec![];
  6. for _ in 0..10 {
  7. let counter = Arc::clone(&counter);
  8. //创建子线程,并将 Mutex 的所有权拷贝传入到子线程中
  9. let handle = thread::spawn(move || {
  10. let mut num = counter.lock().unwrap();
  11. *num += 1;
  12. });
  13. handles.push(handle);
  14. }
  15. //等待所有子线程完成
  16. for handle in handles {
  17. handle.join().unwrap();
  18. }
  19. //输出最终的计数结果
  20. println!("Result: {}", *counter.lock().unwrap());
  21. }
  22. // 以上代码可以顺利运行: Result:10

中毒的互斥量

  1. Mutex::lock() 返回一个 Result,原因与 JoinHandle::join() 一样:如果另一个线程诧异了,则可以优雅地收场(失败)。在写handle.join().unwrap() 时,我们是在告诉 Rust 把诧异从一个线程传播到另一个线程。常用的 mutex.lock().unwrap() 也一样
  2. 如果线程在持有 Mutex 时诧异了,那么 Rust 会将 Mutex 标记为已中毒。后续想要锁住这个受污染的 Mutex 的尝试都会得到一个错误结果。我们的 .unwrap() 调用告诉 Rust 在这种情况下要诧异,把其他线程的诧异传播到当前线程。诧异是安全的。诧异的线程保证了程序其余部分处在安全状态
  3. 基于诧异毒化互斥量并不是因为害怕未定义行为,而是担心你有可能正在使用不变性编程。由于程序诧异了,没有完成应该做的事就脱离了临界区,因此可能已经更新了受保护数据的某些字段,但尚未更新其他字段。为此,原先的不变性可能已经遭到破坏。 Rust 通过毒化这个互斥量来防止其他线程在不经意间也出现这种局面,从而避免把问题搞得更糟糕。在完全互斥的情况下,还是可以锁住中毒的互斥量并访问其中数据的。具体信息可以参考 PoisonError::into_inner() 的在线文档。不过你不会意外做到这一点的

内部可变性

提到过内部可变性

  • 其中 RcRefCell 的结合,可以实现单线程的内部可变性
  • 由于 Mutex 可以支持修改内部数据,当结合 Arc 一起使用时,可以实现多线程的内部可变性
  • Rc/RefCell 用于单线程内部可变性
  • Arc/Mutex 用于多线程内部可变性

Cell

std::cell::Cell 简单地包装了 T ,但允许通过共享引用进行更改。为避免未定义的行为,它只允许您将值复制出来(如果 T 为 Copy ),或将其整体替换为另一个值。此外,它只能在单线程内使用

线程同步:共享内存(锁) - 图4

Cell 上的限制并不总是很容易处理。由于它不能直接让我们借用它持有的值,我们需要移出一个值(在原处留下一些东西),修改它,然后再放回去,以改变它的内容

线程同步:共享内存(锁) - 图5

RefCell

与常规的 Cell 不同, std::cell::RefCell 允许您以较小的运行时成本**借用其内容。 RefCell 不仅包含 T ,而且还包含一个计数器,用于跟踪任何未完成的借用。如果你试图在它已经被可变地借用时借用它,它会发生恐慌,从而避免未定义的行为。就像 Cell 一样, RefCell 只能在单个线程中使用。通过调用 borrowborrow_mut** 来借用 RefCell 的内容

线程同步:共享内存(锁) - 图6

虽然 Cell 和 RefCell 可能非常有用,但当我们需要用多线程做一些事情时,它们就变得毫无用处。那么继续讨论与并发相关的类型

UnsafeCell

UnsafeCell 是内部可变性的原始构建模块。UnsafeCell 包装 T,但没有任何条件或限制来避免未定义行为。相反,它的 get() 方法只是提供一个指向它包装的值的原始指针,它只能在 unsafe 块中有意义的使用,它让用户以不会导致任何未定义行为的方式使用它。最常见的是, UnsafeCell 不直接使用,而是包装在另一种通过有限接口提供安全性的类型中,例如 Cell 或 Mutex 。所具有内部可变性的类型,包括上面讨论的所有类型都建立在 UnsafeCell 之上

读/写锁 RwLock

基本使用

服务器程序经常有一些配置信息,这些信息只需加载一次,之后很少会改变。大多数线程只查询这个配置,但由于配置还是有可能会改变(比如请求服务器从磁盘重新加载其配置),因此必须通过一个锁来保护。在类似这样的情况下,使用互斥量是可以的,但是个不必要的瓶颈。如果不是正在被修改,线程就不应该排队来查询配置。即 Mutex 会对每次读写都进行加锁,但某些时候,我们需要大量的并发读,Mutex 就无法满足需求了,此时就可以使用 RwLock


互斥量(锁)有一个 lock 方法,读 / 写锁则有两个:readwrite

  1. RwLock::write 方法与 Mutex::lock 类似,都是等待获取对受保护数据的专有 mut 访问
  2. RwLock::read 方法提供了非 mut 访问,其优点是不太可能需要等待,因为多个线程可以同时安全读取数据
  3. 在使用互斥量的情况下,在任意给定时刻,受保护数据只能有一个读取器或写入器(或者两者都没有)
  4. 在使用读 / 写锁的情况下,则可以有一个写入器或多个读取器,这看起来非常像 Rust 的引用

线程同步:共享内存(锁) - 图7

RwLock 在使用上和 Mutex 区别不大,需要注意的是,当读写同时发生时程序会直接panic(本例是单线程,实际上多个线程中也是如此),因为会发生死锁

线程同步:共享内存(锁) - 图8

好在我们可以使用 try_writetry_read尝试进行一次写和读,若失败返回错误

Mutex 还是 RwLock

  1. 首先简单性上 Mutex 完胜,因为使用 RwLock 你得操心几个问题
    1. 读和写不能同时发生,如果使用 try_xxx 解决,就必须做大量的错误处理和失败重试机制
    2. 当读多写少时。写操作可能会因为一直无法获得锁导致连续多次失败(写饥饿)
    3. RwLock 其实是操作系统提供的,实现原理比 Mutex 复杂得多,因此单就锁得性能而言,比不上原生实现的 Mutex
  2. 简单总结下两者的使用场景
    1. 追求高并发读取时,使用 RwLock ,因为 Mutex 一次只允许一个线程去读取(不管读还是写,都需要获取锁)
    2. 如果要保证写操作的成功性,使用 Mutex
    3. 不知道哪个不合适,统一使用 Mutex
  3. 需要注意的是, RwLock 虽然看上去貌似提供了高并发读取的能力,但这个不能说明它的性能比 Mutex 高,事实上 Mutex 的性能要好不少, Mutex 的唯一问题也仅仅在于不能并发读取
  4. 一个常见的、错误的使用 RwLock 的场景就是使用 HashMap 进行简单读写,因为 HashMap 的读和写都非常快, RwLock 的复杂实现和相对低的性能反而会导致整体性能的降低,因此一般来说更适合使用 Mutex
  5. 总之,如果你要使用 RwLock 要确保满足以下两个条件:并发读,且需要对读到的资源进行“长时间”的操作。HashMap 也许满足了并发读的需求,但是往往并不能满足后者:“长时间” 的操作
  6. Benchmark**(基准测试)**永远是你在迷茫时最好的朋友

三方库提供的锁实现

标准库在设计时总会存在取舍,因为往往性能不是最好的,如果你追求性能,可以使用第三方库提供的并发原语

  • parking_lot,功能更完善、稳定,社区较为活跃,star较多,更新较为活跃
  • spin,在多数场景中性能比 parking_lot 高一点,最近没怎么更新。如果不是追求特别极致的性能,建议选择前者

线程停放(Thread Parking )

  1. 等待来自另一个线程的通知的一种方法称为线程停放(thread parking)。线程可以自行停放(park),使其进入睡眠状态,从而停止消耗任何CPU周期,然后另一个线程可以取消停放的线程,将其从睡眠中唤醒
  2. 线程停放可以通过 std::thread::park() 函数获得。对于 unparking ,你可以在表示要 unpark 的线程的 Thread 对象上调用 unpark() 方法。这样的对象可以从 spawn 返回的 join handle 中获取,也可以通过 std::thread::current() 由线程自己获取

让我们深入研究一个使用互斥锁两个线程之间**共享队列**的示例。在下面的示例中,一个新生成的线程将使用队列中的项目,而主线程将每秒向队列中插入一个新项目。线程停放用于在队列为空时让消费线程等待

  1. use std::collections::VecDeque;
  2. fn main() {
  3. let queue = Mutex::new(VecDeque::new());
  4. thread::scope(|s| {
  5. // Consuming thread
  6. let t = s.spawn(|| loop {
  7. let item = queue.lock().unwrap().pop_front();
  8. if let Some(item) = item {
  9. dbg!(item);
  10. } else {
  11. thread::park();
  12. }
  13. });
  14. // Producing thread
  15. for i in 0.. {
  16. queue.lock().unwrap().push_back(i);
  17. t.thread().unpark();
  18. thread::sleep(Duration::from_secs(1));
  19. }
  20. });
  21. }
  • 消费线程运行一个无限循环,在该循环中它从队列中弹出项目以使用 dbg 宏显示它们。当队列为空时,它会停止并使用 park() 函数进入休眠状态。如果未停放, park() 调用返回, loop 继续,再次从队列中弹出项目,直到它为空。等等
  • 生产线程通过将其推入队列来每秒产生一个新数字。每次它添加一个项目时,它都会在引用消费线程的 Thread 对象上使用 unpark() 方法来取消它。这样,消费线程就会被唤醒以处理新元素
  • 这里要注意的是,如果我们取消停放,这个程序在理论上仍然是正确的,尽管效率低下。这很重要,因为 park() 不保证它只会因为匹配的 unpark() 而返回。虽然有点罕见,但它可能会有虚假的唤醒。我们的示例处理得很好,因为消费线程将锁定队列,看到它是空的,然后直接解锁并再次停放自己
  • 线程停放的一个重要属性是在线程停放自身之前对 unpark() 的调用不会丢失。取消停放的请求仍然被记录下来,下次线程尝试停放自己时,它会清除该请求并直接继续,而不会真正进入睡眠状态

为了了解为什么这对正确操作至关重要,让我们来看看两个线程执行的步骤的可能顺序

  1. 1. 消费线程,我们称它为 C ,锁定队列
  2. 2. C 试图从队列中弹出一个项目,但它是空的,导致 None
  3. 3. C 解锁队列
  4. 4. 生产线程,我们称之为 P,锁定队列
  5. 5. P 将一个新项目推送到队列中
  6. 6. P 再次解锁队列
  7. 7. P 调用 unpark() 通知 C 有新项目
  8. 8. C 调用 park() 进入睡眠状态,等待更多项目
  1. 虽然在第3步释放队列和第8步停放之间很可能只有很短的时间,但第4步到第7步可能发生在线程停放自身之前的那一刻。如果线程未停放, unpark() 将不执行任何操作,则通知将丢失。消费线程仍将等待,即使队列中有一个项目。由于保存了 unpark 请求以供将来调用 park() ,我们不必担心这一点
  2. 但是 unpark 请求不会叠加。调用 unpark() 两次然后再调用 park() 两次仍然会导致线程进入休眠状态。第一个 park() 清除请求直接返回,但是第二个照常休眠。这意味着在我们上面的例子中,重要的是我们只在我们看到队列为空时才停放线程,而不是在每个处理的项目之后停放它。虽然由于长时间( 1s )的休眠,此示例极不可能发生这种情况,但多个 unpark() 调用可能只唤醒一个 park() 调用
  3. 不幸的是,这确实意味着如果在 park() 返回之后立即调用 unpark() ,但在队列被锁定和清空之前,unpark() 调用是不必要的,但仍会导致下一个 park() 调用立即返回。这导致(空)队列被额外的锁定和解锁。虽然这不会影响程序的正确性,但会影响其效率和性能。这种机制适用于我们示例中的简单情况,但当事情变得复杂时很快就会崩溃。例如,如果我们有多个消费者线程从同一个队列中获取项目,生产者将无法知道哪个消费者实际上正在等待并且应该被唤醒。生产者必须确切的知道消费者何时在等待,以及它在等待什么条件

条件变量 Condvar

一个线程经常需要等待某个条件变为 true

  • 在服务器关机期间,主线程可能需要等待所有其他线程完全退出
  • 工作线程在没什么事可做时,需要等待要处理的数据
  • 实现分布式共识协议的线程可能需要等待足够多对等线程的响应

有时候,针对某个需要等待的条件会有方便的阻塞 API ,比如对服务器关机的例子有 JoinHandle::join 。但有时候没有内置的阻塞 API 。此时程序可以使用条件变量(condition variable)来构建自己的 API 。在 Rust 中, std::sync::Condvar 类型实现了条件变量。Condvar 有方法 .wait().notify_all() ,其中 .wait() 可以阻塞到某些线程调用 .notify_all() 。不过问题还会更复杂一些,因为条件变量始终代表由某个 Mutex 保护的数据或真或假的条件。为此,Mutex 和 Condvar 是有关系的

信号量 Semaphore

在多线程中,另一个重要的概念就是信号量,使用它可以让我们精准的控制当前正在运行的任务最大数量。想象一下,当一个游戏刚开服时,往往会控制游戏内玩家的同时在线人数,一旦超过某个临界值,就开始进行排队。而在实际使用中,也有很多时候,我们需要通过信号量来控制最大并发数,防止服务器资源被撑爆。本来 Rust 在标准库中有提供一个信号量实现,但是由于各种原因这个库现在已经不再推荐使用了,因此我们推荐使用 tokio 中提供的实现

  1. use std::sync::Arc;
  2. use tokio::sync::Semaphore
  3. #[tokio::main]
  4. async fn main() {
  5. let semaphore = Arc::new(Semaphore::new(3));
  6. let mut join_handles = Vec::new();
  7. for _ in 0..5 {
  8. let permit = semaphore.clone().acquire_owned().await.unwrap();
  9. join_handles.push(tokio::spawn(async move {
  10. //
  11. //在这里执行任务。。。
  12. //
  13. drop(permit);
  14. });
  15. }
  16. for handle in join_handles {
  17. handle.await.unwrap();
  18. }
  19. }
  • 上面代码创建了一个容量为 3 的信号量,当正在执行的任务超过 3 时,剩下的任务是需要等待正在执行任务完成并减少信号量到 3 以内时,才能继续执行
  • 这里的关键其实说白了在于:信号量的申请和归还,使用前需要申请信号量,如果容量满了,就需要等待;使用后需要释放信号量,以便其它等待者可以继续