基本认知

直接先上 Go 语言中的经典名言来感受一下为什么用消息传递机制来进行通信,而不是共享内存

Do not communicate by sharing memory

instead , share memory by communicating

多线程间有多种方式可以共享传递数据,最常用的方式就是通过消息传递或者将锁和Arc联合使用,而对于消息传递,在编程界还有一个大名鼎鼎的 Actor 线程模型 为其背书,典型的有 Erlang 语言,还有上述 Go 的名言

单发送者|单接收者管道

基本使用

标准库提供了通道 std::sync::mpsc ,其中 mpsc 是 multiple producer, single consumer 的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。 当然,支持多个发送者也意味着支持单个发送者,我们先来看看单发送者、单接收者的简单例子

  1. use std::sync::mpsc;
  2. use std::thread;
  3. fn main() {
  4. // 创建一个消息通道, 返回一个元组:(发送者,接收者)
  5. let (tx, rx) = mpsc::channel();
  6. // 创建线程,并发送消息
  7. thread::spawn(move || {
  8. // 发送数字1, send方法返回Result<T,E> , 通过unwrap进行快速错误处理
  9. tx.send(1).unwrap();
  10. // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
  11. // tx.send(Some(1)).unwrap()
  12. });
  13. // 在主线程中接收子线程发送的消息并输出
  14. println!("receive {}", rx.recv().unwrap());
  15. }

以上代码并不复杂,但需要注意以下几点

  • tx,rx 对应发送者和接收者,它们的类型编译器自动推导。tx.send(1) 发送了整数,一旦类型被推导确定,该通道就只能传递对应类型的值,例如上述例子被注释的代码报错
  • 接收消息的操作 rx.recv() 会阻塞当前线程,直到取到值或者通道被关闭
  • 需要使用 move 将 tx 的所有权转移到子线程的闭包中
  • send 方法返回 Result ,因为如果接收者 被 drop 导致发送的值不被任何人接收,此时毫无意义,因此返回一个错误最为合适。但是在实际项目中别偷懒用 unwrap() 处理
  • 同样的,对于 recv 方法来说,当发送者关闭时,它也会接收一个错误,用于说明不会再有任何值发送过来了

不阻塞的 try_recv 方法

除了上述 recv 方法,还可以使用 try_recv 尝试接收一次消息,该方法并不会阻塞线程,当通道中没有消息时,它会立刻返回一个错误

线程通信:消息传递(管道) - 图1

由于子线程的创建需要时间,因此 println! 和 try_recv 方法会先执行,而此时子线程的消息还未被发出。try_recv 会尝试立即读取一次消息,因为消息没有发出,此次读取最终会报错,且主线程运行结束(可悲的是,相对于主线程中的代码,子线程的创建速度实在是过慢,直到主线程结束,都无法完成子线程的初始化)

线程通信:消息传递(管道) - 图2

如上, try_recv 返回了一个错误,错误内容是 Empty ,代表通道并没有消息。如果你尝试把 println! 复制一些行,就会发现一个有趣的输出

线程通信:消息传递(管道) - 图3

如上,当子线程创建成功且发送消息后,主线程会接收到 Ok(1) 的消息内容,紧接着子线程结束,发送者也随着被 drop ,此时接收者又会报错,但是这次错误原因有所不同, Disconnected 代表发送者已经被关闭

传输具有所有权的数据

使用通道来传输数据,一样要遵循 rust 的所有权规则

  • 若值的类型实现了 Copy 特征,则直接复制一份该值,然后传输过去,例如之前的 i32 类型
  • 若值没有实现 Copy ,则它的所有权会被转移给接收端,在发送端继续使用该值将报错

来看看第二种情况

线程通信:消息传递(管道) - 图4

以上代码中,String 底层的字符串是存储在堆上,并没有实现 Copy 特征,当它被发送后,会将所有权从发送端的 s 转移给接收端的 received ,之后 s 将无法被使用

线程通信:消息传递(管道) - 图5

各种细节不禁令人感叹:Rust 还是安全!假如没有所有权的保护,String 字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非你故意这么设计,否则这就是不安全的隐患

使用 for 进行循环接收

下面来看看如何连续接收通道中的值

线程通信:消息传递(管道) - 图6

在上面代码中,主线程和子线程是并发运行的,子线程在不停的发送消息->休眠1秒,与此同时,主线程使用 for 循环阻塞从 rx 迭代器中接收消息,当子线程运行完成时,发送者 tx 会随之被 drop ,此时 for 循环将被终止,最终 main 线程成功结束

多发送者|单接收者管道

这个模式下,由于子线程会拿走发送者的所有权,因此我们必须对发送者进行克隆,然后让每个线程拿走它的一份拷贝

  1. use std::sync::mpsc;
  2. use std::thread;
  3. fn main() {
  4. let (tx, rx) = mpsc::channel();
  5. let tx1 = tx.clone();
  6. thread::spawn(move || {
  7. tx.send(String::from("hi from raw tx")).unwrap();
  8. });
  9. thread::spawn(move || {
  10. tx1.send(String::from("hi from cloned tx")).unwrap();
  11. });
  12. for received in rx {
  13. println!("Got: {}", received);
  14. }
  15. }

代码并无太大区别,就多了一个对发送者的克隆 let tx1 = tx.clone(); ,然后一个子线程拿走 tx 的所有权,另一个子线程拿走 tx1 的所有权。但是有几点需要注意

  • 需要所有的发送者都被 drop 掉后,接收者 rx 才会收到错误,进而跳出 for 循环,最终结束主线程
  • 这里虽然用了 clone 但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次
  • 由于两个子线程谁先创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定
  • 特别注意:上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的消息是无序的,对于通道而言,消息的发送顺序和接收顺序是一致的,满足 FIFO 原则(先进先出)

多发送者|互斥量的多消费者管道

前面提到过, Rust 通道是多生产者、单消费者的。或者更具体地说,一个通道只有一个Receiver。任何线程池都不能有多个线程使用一个 mpsc 通道共享工作成果。不过,有个非常简单的方式可以绕过这个限制,只需要使用标准库。可以为 Receiver 添加一个 Mutex ,然后再共享。下面是一个实现它的模块

  1. pub mod shared_channel {
  2. use std::sync::{Arc, Mutex};
  3. use std::sync::mpsc::{channel, Sender, Receiver};
  4. /// 对 Receiver 的线程安全的封装
  5. #[derive(Clone)]
  6. pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
  7. impl<T> Iterator for SharedReceiver<T> {
  8. type Item = T;
  9. /// 从封装的接收者获取下一项
  10. fn next(&mut self) -> Option<T> {
  11. let guard = self.0.lock().unwrap();
  12. guard.recv().ok()
  13. }
  14. }
  15. /// 创建一个新通道,其接收者可以跨线程共享。这会返回一个发送者和一个接收者
  16. /// 与stdlib的channel()类似,有时候可以直接代替它使用
  17. pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
  18. let (sender, receiver) = channel();
  19. (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
  20. }
  21. }

这里使用了一个 Arc>> ,其中泛型被嵌套了很多层。这种情况在 Rust 中比在 C++ 中更常见。只要依次读出它们的名字就可以理解其含义,如下图所示

线程通信:消息传递(管道) - 图7

更好的性能之 mpmc (多发多收)

如果你需要 mpmc (多发送者,多接收者)或者需要更高的性能,可以考虑第三方库

同步和异步管道

Rust 标准库的 mpsc 管道其实分为两种类型:同步和异步

异步管道

之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会被阻塞

线程通信:消息传递(管道) - 图8

运行后输出如下

线程通信:消息传递(管道) - 图9

主线程因为睡眠阻塞了3秒,因此并没有进行消息接收,而子线程却在此期间轻松完成了消息的发送。等主线程睡眠结束后,才姗姗来迟的从通道中接收了子线程老早之前发送的消息。从输出可以看出,发送之前和发送之后是连续输出的,没有受到接收端主线程的任何影响,因此通过 mpsc::channel** 创建的通道是异步通道**

同步管道

与异步通道相反,同步通道发送消息是阻塞的,只有在消息被接收后才解除阻塞,例如

线程通信:消息传递(管道) - 图10

运行后输出如下

线程通信:消息传递(管道) - 图11

可以看出,主线程由于睡眠被阻塞导致无法接收消息,因此子线程的发送也一直被阻塞,直到主线程结束睡眠并成功接收消息后,发送才成功:发送之后的输出是在 receive 1 之后,说明只有接收消息彻底成功后,发送消息才算完成

消息缓存(管道缓存)

细心的读者可能已经发现在创建同步通道时,我们传递了一个参数0mpsc::sync_channel(0); 这是什么意思呢?先将0改成1,然后再运行试试

线程通信:消息传递(管道) - 图12

竟然得到了和异步通道一样的效果:根本没有等待主线程的接收开始,消息发送就立即完成了!难道同步通道变成了异步通道?别急,将子线程中的代码修改下试试

线程通信:消息传递(管道) - 图13

在子线程中,我们又多发了一条消息,此时输出如下

线程通信:消息传递(管道) - 图14

  • 更奇怪的事出现了,第一条消息瞬间发送完成,没有阻塞,而发送第二条消息时却符合同步通道的特点,阻塞了,直到主线程接收后,才发送完成
  • 其实一切关键就在于 1 上,该值可以用来指定同步通道的消息缓存条数,当你设定为N时,发送者就可以无阻塞的往通道中发送 N 条消息,当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么第 N-1 条消息就将触发发送阻塞)
  • 问题又来了,异步通道创建时完全没有这个缓冲值参数 mpsc::channel()** ,它的缓冲值怎么设置呢?都异步了,都可以无限发送了**,事实上异步通道的缓冲上限取决于你的内存大小,不要撑爆就行
  • 因此使用异步消息虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险

关闭管道

之前我们数次提到了通道关闭,并且提到了当通道关闭后,发送消息或接收消息将会报错。那么如何关闭通道呢?很简单,所有发送者被 drop 或所有接收者被 drop 后通道自动关闭。Rust 的 Drop trait没有性能损耗,原因如下

  • 零开销抽象:Rust 语言的设计原则之一是尽可能减少运行时的开销。Drop trait 的实现方式是允许程序员在对象被销毁时执行一些代码,但这些代码的执行并不会对程序的运行时性能产生影响
  • 优化编译器:Rust 的编译器对 Drop trait 进行了优化。当一个对象被销毁时,编译器会生成相应的代码来调用该对象的 Drop trait 中的代码。这个过程是编译时确定的,因此不会在运行时产生额外的开销
  • 避免内存泄漏:Rust 的内存管理机制可以自动管理内存的分配和释放。使用 Drop trait 可以确保在对象被销毁时释放相关的资源,从而避免内存泄漏

传输多种类型的数据

之前提到过,一个消息通道只能传输一种类型的数据,如果你想要传输多种类型的数据

  • 可以为每个类型创建一个通道
  • 你也可以使用枚举类型来实现

线程通信:消息传递(管道) - 图15

如上所示,枚举类型还能让我们带上想要传输的数据,但是有一点需要注意,Rust 会按照枚举中占用内存最大的哪个成员进行内存对齐,这意味着就算你传输的是枚举中占用内存最小的成员,它占用的内存依然和最大的成员相同,因此会造成内存上的浪费

新手容易遇到的坑:一直阻塞

mpsc 虽然相当简洁明了,但是在使用起来还是可能存在坑

  1. use std::sync::mpsc;
  2. fn main() {
  3. use std::thread;
  4. let (send, recv) = mpsc::channel();
  5. let num_threads = 3;
  6. for i in 0..num_threads {
  7. let thread_send = send.clone();
  8. thread::spawn(move || {
  9. thread_send.send(i).unwrap();
  10. println!("thread {:?} finished", i);
  11. });
  12. }
  13. // 在这里 drop send ...
  14. for x in recv {
  15. println!("Got: {}", x);
  16. }
  17. println!("finished iterating");
  18. }
  • 以上代码看起来非常正常,但是运行后主线程会一直阻塞,最后一行打印输出也不会被执行,原因在于,子线程拿走的是复制后的send的所有权,这些拷贝会在子线程结束后被 drop ,因此无需担心,但是send本身却直到 main 函数的结束才会被 drop
  • 之前提到,通道关闭的两个条件:发送者者接收者全部被 drop,要结束 for 循环显然是要求发送者全部 drop,但是由于 send 自身没有被 drop,会导致该循环永远无法结束,最终主线程会一直被阻塞
  • 解决办法很简单,drop 掉 send 即可,在代码中的注释下面添加一行 drop(send)