• 并发(Concurrent):程序的不同部分相互独立执行
  • 并行(Parallel):程序不同部分同时执行

1 线程

1.1 基本使用

  1. use std::thread;
  2. use std::time::Duration;
  3. fn main() {
  4. let handle = thread::spawn(|| {
  5. for i in 1..10 {
  6. println!("hi number {} from the spawned thread!", i);
  7. thread::sleep(Duration::from_millis(1));
  8. }
  9. });
  10. for i in 1..5 {
  11. println!("hi number {} from the main thread!", i);
  12. thread::sleep(Duration::from_millis(1));
  13. }
  14. handle.join().unwrap();
  15. }
  • thread::spawn()在创建线程失败时会panic,可以用std::thread::Builder::spawn()来创建线程,失败时不会panic
  • thread::sleep()thread::sleep_ms()thread::yield_now()
  • thread::current()thread::park/unpark()(暂停/恢复)

    1.2 对闭包使用move

  1. use std::thread;
  2. fn main() {
  3. let v = vec![1, 2, 3];
  4. // 编译通不过:闭包的生存周期可能超过本函数,但是却借用了属于本函数的变量 v
  5. // let handle = thread::spawn(|| {
  6. // 通过关键字 move,指示将借用的变量的所有权转移到闭包中,闭包外面变量 v 已经失效,不得再使用
  7. let handle = thread::spawn(move || {
  8. println!("Here's a vector: {:?}", v);
  9. });
  10. //drop(v);// 即使没有这一句也编译通不过: 这一句只是说明,可能 v 已经失效,但是执行闭包的线程仍然借用着 v,这是不允许的
  11. handle.join().unwrap();
  12. }

2 通道

2.1 基本使用

  1. use std::thread;
  2. use std::sync::mpsc;
  3. pub fn main() {
  4. let (tx, rx) = mpsc::channel();
  5. thread::spawn(move || {
  6. let val = String::from("hi");
  7. tx.send(val).unwrap();
  8. });
  9. let received = rx.recv().unwrap();
  10. println!("Got: {}", received);
  11. }
  • mpsc表示多个生产者、单个消费者
  • 接收方法:recvtry_recvrecv_timeoutrecv_deadlineitertry_iter
  • 发送端退出后,接收方将返回错误:但通道是缓冲的,发送端退出前发送的所有消息,接收方都可以接收
  • 不存在接收方时,发送将失败;存在接收方时,发送一定成功:系统会缓存发送的内容,不限制缓存数量
  • 被发送内容的所有权将转移到发送方法中!!!
  • 异步通道:mpsc::channel(),缓存数量无上限
  • 同步通道:mpsc::sync_channel(),必须指定一个缓存数量上限,通道满时发送方发送操作将阻塞;
  • 同步通道可以指定缓存数量上限为零

    2.2 使用多个发送端

  • 调用发送方的clone方法可以克隆出一个新的发送端,多个发送端可以并发工作

  1. pub fn main2() {
  2. let (tx, rx) = mpsc::channel();
  3. let tx1 = mpsc::Sender::clone(&tx);
  4. // 这样写也是正确的
  5. // let tx1 = tx.clone();
  6. thread::spawn(move || {
  7. let vals = vec![
  8. String::from("hi"),
  9. String::from("from"),
  10. String::from("the"),
  11. String::from("thread"),
  12. ];
  13. for val in vals {
  14. tx1.send(val).unwrap();
  15. thread::sleep(Duration::from_secs(1));
  16. }
  17. });
  18. thread::spawn(move || {
  19. let vals = vec![
  20. String::from("more"),
  21. String::from("messages"),
  22. String::from("for"),
  23. String::from("you"),
  24. ];
  25. for val in vals {
  26. tx.send(val).unwrap();
  27. thread::sleep(Duration::from_secs(1));
  28. }
  29. });
  30. for received in rx {
  31. println!("Got: {}", received);
  32. }
  33. }

3 共享状态

3.1 Mutex<T>

  1. use std::sync::Mutex;
  2. fn main() {
  3. let m = Mutex::new(5);
  4. {
  5. let mut num = m.lock().unwrap();
  6. *num = 6;
  7. }
  8. println!("m = {:?}", m);
  9. }
  • 创建:Mutex::new(),这是一个泛型关联方法,可封装任意类型
  • 获取封装的资源
    • lock():可能会阻塞,直到正确获取到锁,此时返回MutexGuard类型的值,MutexGuard离开作用域时自动开锁
    • 还可能不会阻塞,返回PoisonError,也就是锁被抛弃(拥有锁的线程发生panic,没有正常开锁),被锁保护的资源可能处于不一致状态,不应该使用
    • is_poisoned()判断锁是否被抛弃
    • try_lock():不会阻塞,可能的返回值有:(1) 正常,返回MutexGuard (2) WouldBlock,已经锁定,无法获取锁 (3) Poisoned,锁被抛弃

      3.2 用Arc<T>封装Mutex<T>以便在多个线程中使用

  1. use std::sync::{Mutex, Arc};
  2. use std::thread;
  3. fn main() {
  4. let counter = Arc::new(Mutex::new(0));// 用 Arc<T> 封装 Mutex<T>
  5. let mut handles = vec![];
  6. for _ in 0..10 {
  7. let counter = Arc::clone(&counter);// 克隆
  8. let handle = thread::spawn(move || {
  9. let mut num = counter.lock().unwrap();// Mutex<T> 实现了内部可变性,通过不可变的 counter 可以改变内部封装的值
  10. *num += 1;
  11. });
  12. handles.push(handle);
  13. }
  14. for handle in handles {
  15. handle.join().unwrap();
  16. }
  17. println!("Result: {}", *counter.lock().unwrap());
  18. }