参考:https://kaisery.github.io/trpl-zh-cn/ch16-00-concurrency.html
安全且高效的处理并发编程是 Rust 的另一个主要目标。
并发编程Concurrent programming ),代表程序的不同部分相互独立的执行
并行编程parallel programming )代表程序不同部分于同时执行
所有权和类型系统是一系列解决内存安全 并发问题的强有力的工具!通过利用所有权和类型检查,在 Rust 中很多并发错误都是 编译时 错误,而非运行时错误。因此,相比花费大量时间尝试重现运行时并发 bug 出现的特定情况,Rust 会拒绝编译不正确的代码并提供解释问题的错误信息。因此,你可以在开发时修复代码,而不是在部署到生产环境后修复代码。我们给 Rust 的这一部分起了一个绰号 无畏并发fearless concurrency )。无畏并发令你的代码免于出现诡异的 bug 并可以轻松重构且无需担心会引入新的 bug。

注意:出于简洁的考虑,我们将很多问题归类为 并发 ,而不是更准确的区分 并发和(或)并行 。如果这是一本专注于并发和/或并行的书,我们肯定会更加精确的。对于本章,当我们谈到 并发 时,请自行脑内替换为 并发和(或)并行

很多语言所提供的处理并发问题的解决方法都非常有特色。例如:

  • Erlang 有着优雅的消息传递并发功能,但只有模糊不清的在线程间共享状态的方法
  • 对于高级语言来说,只实现可能解决方案的子集是一个合理的策略,因为高级语言所许诺的价值来源于牺牲一些控制来换取抽象
  • 对于底层语言则期望提供在任何给定的情况下有着最高的性能且对硬件有更少的抽象。因此,Rust 提供了多种工具,以符合实际情况和需求的方式来为问题建模。

如下是本章将要涉及到的内容:

  • 如何创建线程来同时运行多段代码。
  • 消息传递Message passing )并发,其中通道(channel)被用来在线程间传递消息。
  • 共享状态Shared state )并发,其中多个线程可以访问同一片数据。
  • SyncSend trait,将 Rust 的并发保证扩展到用户定义的以及标准库提供的类型中。

    thread::spawn:创建线程

    在大部分现代操作系统中,已执行程序的代码在一个 进程process )中运行,操作系统则负责管理多个进程。
    在程序内部,也可以拥有多个同时运行的独立部分。运行这些独立部分的功能被称为 线程threads )。
    将程序中的计算拆分进多个线程可以改善性能,因为程序可以同时进行多个任务,不过这也会增加复杂性。因为线程是同时运行的,所以无法预先保证不同线程中的代码的执行顺序。这会导致诸如此类的问题:

  • 竞争状态(Race conditions),多个线程以不一致的顺序访问数据或资源

  • 死锁(Deadlocks),两个线程相互等待对方停止使用其所拥有的资源,这会阻止它们继续运行
  • 只会发生在特定情况且难以稳定重现和修复的 bug

不同的线程模型:

  1. 1:1 模型:很多操作系统提供了创建新线程的 API,这种由编程语言调用操作系统 API 创建线程的模型有时被称为 1:1 ,一个 OS 线程对应一个语言线程。
  2. M:N 模型:编程语言提供了自己特殊的线程实现,这种线程被称为绿色green)线程,使用绿色线程的语言会在不同数量的 OS 线程的上下文中执行它们。为此,绿色线程模式被称为 M:N 模型:M个绿色线程对应N个 OS 线程,这里 MN 不必相同。

每一个模型都有其优势和取舍。对于 Rust 来说最重要的取舍是运行时支持。运行时Runtime )是一个令人迷惑的概念,其在不同上下文中可能有不同的含义:

  • 在当前上下文中,运行时 代表二进制文件中包含的由语言自身提供的代码。这些代码根据语言的不同可大可小,不过任何非汇编语言都会有一定数量的运行时代码
  • 通常人们说一个语言 “没有运行时”,一般意味着 “小运行时”
  • 更小的运行时拥有更少的功能不过其优势在于更小的二进制输出,这使其易于在更多上下文中与其他语言相结合
  • 虽然很多语言觉得增加运行时来换取更多功能没有什么问题,但是 Rust 需要做到几乎没有运行时,同时为了保持高性能必须能够调用 C 语言,这点也是不能妥协的。

Rust 选取的线程模型:

  • 绿色线程的 M:N 模型需要更大的语言运行时来管理这些线程。因此,Rust 标准库只提供了 1:1 线程模型实现。
  • 由于 Rust 是较为底层的语言,如果你愿意牺牲性能来换取抽象,以获得对线程运行更精细的控制及更低的上下文切换成本,你可以使用实现了 M:N 线程模型的 crate。 ``` use std::thread; use std::time::Duration;

fn main() { // 调用 thread::spawn 函数,创建一个新线程,并传递一个闭包,在闭包中包含希望在新线程运行的代码 let handle = thread::spawn(|| { for i in 1..10 { println!(“hi number {} from the spawned thread!”, i); thread::sleep(Duration::from_millis(1)); } }); // thread::spawn 的返回值类型是 JoinHandle,把返回值储存在变量中来修复新建线程部分没有执行或者完全没有执行的问题

  1. for i in 1..5 {
  2. println!("hi number {} from the main thread!", i);
  3. thread::sleep(Duration::from_millis(1));
  4. }
  5. // JoinHandle 类型是一个拥有所有权的值,当对其调用 join 方法时,它会等待其线程结束
  6. // 如果把这个命令放在 main 函数的 `for` 前面,会让线程运行完在开始 `for` 语句
  7. handle.join().unwrap();

}

  1. 通过调用 handle `join` 会阻塞当前线程直到 handle 所代表的线程结束。**阻塞** _Blocking_ 线程意味着阻止该线程执行工作或退出。<br />输出结果:这两个线程仍然会交替执行,不过主线程会由于 `handle.join()` 调用会等待直到新建线程执行完毕。

hi number 1 from the main thread! hi number 2 from the main thread! hi number 1 from the spawned thread! hi number 3 from the main thread! hi number 2 from the spawned thread! hi number 4 from the main thread! hi number 3 from the spawned thread! hi number 4 from the spawned thread! hi number 5 from the spawned thread! hi number 6 from the spawned thread! hi number 7 from the spawned thread! hi number 8 from the spawned thread! hi number 9 from the spawned thread!

  1. 在参数列表 `||` 前使用 `move` 关键字,可以强制闭包获取其使用的环境值的所有权。这个技巧在创建新线程将值的所有权从一个线程移动到另一个线程时最为实用。

use std::thread;

fn main() { let v = vec![1, 2, 3];

  1. let handle = thread::spawn(move || {
  2. println!("Here's a vector: {:?}", v);
  3. });
  4. // move 关键字通过告诉 Rust 将 v 的所有权移动到新建线程,我们向 Rust 保证主线程不会再使用 v
  5. // 那么当在主线程中使用 v 时就会违反所有权规则
  6. handle.join().unwrap();

}

  1. # `mpsc::Channel`:消息传递
  2. Rust 中一个实现消息传递并发的主要工具是 **通道** _channel_ ),Rust 标准库提供了其实现的编程概念。<br />通道有两部分组成,一个发送者(transmitter)和一个接收者(receiver)。发送者位于上游位置,在这里可以将橡皮鸭放入河中,接收者则位于下游,橡皮鸭最终会漂流至此。代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为通道被 **关闭**(_closed_)了。

use std::thread; use std::sync::mpsc;

fn main() { // 使用 mpsc::channel 函数创建一个新的通道 // mpsc 是 多个生产者,单个消费者(multiple producer, single consumer)的缩写 // 意味着一个通道可以有多个产生值的 发送(sending)端,但只能有一个消费这些值的 接收(receiving)端 // tx 和 rx 通常作为 发送者(transmitter)和 接收者(receiver)的缩写 let (tx, rx) = mpsc::channel();

  1. // 使用 thread::spawn 来创建一个新线程
  2. // 并使用 move 将 tx 移动到闭包中这样新建线程就拥有 tx 了
  3. // 新建线程需要拥有通道的发送端以便能向通道发送消息
  4. thread::spawn(move || {
  5. let val = String::from("hi");
  6. // send 方法用来获取需要放入通道的值,它返回一个 Result<T, E> 类型
  7. // 如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误
  8. tx.send(val).unwrap();
  9. // val 的所有权由 send 方法被转移走了,因此无法再使用 val
  10. });
  11. let received = rx.recv().unwrap();
  12. println!("Got: {}", received);

}

// 打印结果 // Got: hi

  1. 通道的接收端有两个有用的方法:`recv` `try_recv`。这里,我们使用了 `recv`,它是 _receive_ 的缩写。这个方法会阻塞主线程执行直到从通道中接收一个值。一旦发送了一个值,`recv` 会在一个 `Result<T, E>` 中返回它。当通道发送端关闭,`recv` 会返回一个错误表明不会再有新的值到来了。<br />`try_recv` 不会阻塞,相反它立刻返回一个 `Result<T, E>``Ok` 值包含可用的信息,而 `Err` 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 `try_recv` 很有用:可以编写一个循环来频繁调用 `try_recv`,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。<br />出于简单的考虑,这个例子使用了 `recv`;主线程中除了等待消息之外没有任何其他工作,所以阻塞主线程是合适的。
  2. ---
  3. 以下面改进的例子证明代码是并发执行的:新建线程现在会发送多个消息并在每个消息之间暂停一秒钟。

use std::thread; use std::sync::mpsc; use std::time::Duration;

fn main() { let (tx, rx) = mpsc::channel();

  1. thread::spawn(move || {
  2. let vals = vec![
  3. String::from("hi"),
  4. String::from("from"),
  5. String::from("the"),
  6. String::from("thread"),
  7. ];
  8. for val in vals {
  9. tx.send(val).unwrap();
  10. thread::sleep(Duration::from_secs(1));
  11. }
  12. });
  13. for received in rx {
  14. println!("Got: {}", received);
  15. }

}

// 打印结果 // Got: hi // Got: from // Got: the // Got: thread

  1. 在新建线程中有一个字符串 vector 希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个 `Duration` 值调用 `thread::sleep` 函数来暂停一秒。<br />在主线程中,不再显式调用 `recv` 函数:而是将 `rx` 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当通道被关闭时,迭代器也将结束。<br />因为主线程中的 `for` 循环里并没有任何暂停或等待的代码,所以可以说主线程是在等待从新建线程中接收值。
  2. ---
  3. 对通道的发送端调用了 `clone` 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的通道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向通道的接收端发送不同的消息。

use std::sync::mpsc; use std::thread; use std::time::Duration;

fn main() { let (tx, rx) = mpsc::channel();

  1. let tx1 = mpsc::Sender::clone(&tx);
  2. thread::spawn(move || {
  3. let vals = vec![String::from("hi"),
  4. String::from("from"),
  5. String::from("the"),
  6. String::from("thread"),];
  7. for val in vals {
  8. tx1.send(val).unwrap();
  9. thread::sleep(Duration::from_secs(1));
  10. }
  11. });
  12. thread::spawn(move || {
  13. let vals = vec![String::from("more"),
  14. String::from("messages"),
  15. String::from("for"),
  16. String::from("you"),];
  17. for val in vals {
  18. tx.send(val).unwrap();
  19. thread::sleep(Duration::from_secs(1));
  20. }
  21. });
  22. for received in rx {
  23. println!("Got: {}", received);
  24. }

}

  1. 虽然你可能会看到这些值以不同的顺序出现;这依赖于你的系统。这也就是并发既有趣又困难的原因。如果通过 `thread::sleep` 做实验,在不同的线程中提供不同的值,就会发现他们的运行更加不确定,且每次都会产生不同的输出。

Got: hi Got: more Got: from Got: messages Got: for Got: the Got: thread Got: you

  1. # `Mutex`:共享状态
  2. 在某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦将一个值传送到通道中,将无法再使用这个值。<br />共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。Rust 的类型系统和所有权规则极大的协助了正确地管理这些所有权。<br />**互斥器** _mutex_ )是 _mutual exclusion_ 的缩写,也就是说,任意时刻,其只允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 **锁** _lock_ )来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁系统 **保护** _guarding_ )其数据。<br />互斥器以难以使用著称,因为你不得不记住:
  3. 1. 在使用数据之前尝试获取锁。
  4. 1. 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。
  5. 正确的管理互斥器异常复杂,这也是许多人之所以热衷于通道的原因。然而,在 Rust 中,得益于类型系统和所有权,我们不会在锁和解锁上出错。<br />`Mutex<T>` 的核心使用方式:

use std::sync::Mutex; // 出于简单的考虑,在一个单线程上下文中探索 Mutex 的 API fn main() { // 使用关联函数 new 来创建一个 Mutex let m = Mutex::new(5);

  1. {
  2. // 使用 lock 方法获取锁,以访问互斥器中的数据
  3. // 这个调用会阻塞当前线程,直到我们拥有锁为止
  4. // 如果另一个线程拥有锁,并且那个线程 panic 了,则 lock 调用会失败。在这种情况下,没人能够再获取锁
  5. // 一旦获取了锁,就可以将返回值(在这里是 num )视为一个其内部数据的可变引用了
  6. let mut num = m.lock().unwrap();
  7. // lock 调用 返回 一个叫做 MutexGuard 的智能指针
  8. // 这个智能指针实现了 Deref 来指向其内部数据;其也提供了一个 Drop 实现当 MutexGuard 离开作用域时自动释放锁,这正发生于内部作用域的结尾
  9. *num = 6;
  10. } // 我们不会冒忘记释放锁并阻塞互斥器为其它线程所用的风险,因为锁的释放是自动发生的
  11. println!("m = {:?}", m); // m = Mutex { data: 6 }

}

  1. 使用 `Mutex<T>` 在多个线程间共享值:启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为 10。<br />这里需要用到多所有权,`Arc<T>` `Rc<T>` 有着相同的 API,而 `Arc<T>` **正是** 这么一个类似 `Rc<T>` 并可以安全的用于并发环境的类型。字母 A 代表 **原子性** _atomic_ ),所以这是一个**原子引用计数** _atomically reference counted_ )类型。<br />原子性是另一类这里还未涉及到的并发原语:请查看标准库中 `std::sync::atomic` [文档](254f519e28669d10bf3e4c1b3b50393b) 来获取更多细节。其中的要点就是:原子性类型工作起来类似原始类型,不过可以安全的在线程间共享。

use std::sync::{Mutex, Arc}; use std::thread;

fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![];

  1. for _ in 0..10 {
  2. let counter = Arc::clone(&counter);
  3. let handle = thread::spawn(move || {
  4. let mut num = counter.lock().unwrap();
  5. *num += 1;
  6. });
  7. handles.push(handle);
  8. }
  9. for handle in handles {
  10. handle.join().unwrap();
  11. }
  12. println!("Result: {}", *counter.lock().unwrap()); // Result: 10

}

`` 使用这个策略,可将计算分成独立的部分,分散到多个线程中,接着使用Mutex` 使用各自的结算结果更新最终的结果。
RefCell/Rc 与 Mutex/Arc 的相似性:

  • 你可能注意到了,因为 counter 是不可变的,不过可以获取其内部值的可变引用;这意味着 Mutex 提供了内部可变性,就像 Cell 系列类型那样。正如使用 RefCell 可以改变 Rc 中的内容那样,同样的可以使用 Mutex 来改变 Arc 中的内容。
  • 另一个值得注意的细节是 Rust 不能避免使用 Mutex的全部逻辑错误。回忆一下使用 Rc就有造成引用循环的风险,这时两个 Rc值相互引用,造成内存泄漏。同理,Mutex也有造成 死锁(deadlock) 的风险。

  • 这发生于当一个操作需要锁住两个资源而两个线程各持一个锁,这会造成它们永远相互等待。

  • todo:Mutex & deadlock# 如果你对这个主题感兴趣,尝试编写一个带有死锁的 Rust 程序,接着研究任何其他语言中使用互斥器的死锁规避策略并尝试在 Rust 中实现他们。标准库中 Mutex和 MutexGuard 的 API 文档会提供有用的信息。

使用 SyncSend trait 的可扩展并发

Rust 的并发模型中一个有趣的方面是:语言本身对并发知之 甚少 。我们之前讨论的几乎所有内容,都属于标准库,而不是语言本身的内容。由于不需要语言提供并发相关的基础设施,并发方案不受标准库或语言所限:我们可以编写自己的或使用别人编写的并发功能。
然而有两个并发概念是内嵌于语言中的:std::marker 中的 SyncSend trait。
Send mark trait 表明类型的所有权可以在线程间传递。几乎所有的 Rust 类型都是Send 的,不过有一些例外:

  1. Rc<T>:这是不能 Send 的,因为如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。
  2. 裸指针(raw pointer)。

Sync mark trait 表明允许多线程访问,即一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T,如果 &TT 的引用)是 Send 的话 T 就是 Sync 的,这意味着其引用就可以安全的发送到另一个线程。不是 Sync 的类型:

  1. Rc<T> 也不是 Sync 的,出于其不是 Send 相同的原因。
  2. RefCell<T>(第十五章讨论过)和 Cell<T> 系列类型不是 Sync 的。RefCell<T> 在运行时所进行的借用检查也不是线程安全的。Mutex<T>Sync 的,正如 “在线程间共享 Mutex<T> 部分所讲的它可以被用来在多线程中共享访问。

通常并不需要手动实现 SendSync trait,因为由 SendSync 的类型组成的类型,自动就是 SendSync 的。因为他们是标记 trait,甚至都不需要实现任何方法。他们只是用来加强并发相关的不可变性的。
手动实现这些标记 trait 涉及到编写不安全的 Rust 代码,#todo:手动标记 trait#;当前重要的是,在创建新的由不是 SendSync 的部分构成的并发类型时需要多加小心,以确保维持其安全保证。The Rustonomicon 中有更多关于这些保证以及如何维持他们的信息。
为什么说 Rust 是无畏并发的:

  1. Rust 提供了用于消息传递的通道,和像 Mutex<T>Arc<T> 这样可以安全的用于并发上下文的智能指针。
  2. 类型系统和借用检查器会确保这些场景中的代码,不会出现数据竞争和无效的引用。
  3. 一旦代码可以编译了,我们就可以坚信这些代码可以正确的运行于多线程环境,而不会出现其他语言中经常出现的那些难以追踪的 bug。