1 同步模式

1 保护性暂停

  • 保护性暂停(Guarded Suspension)定义

    • 一个线程等待另一个线程的执行结果
    • 产生结果的线程和消费结果的线程一一对应
    • JDK中,thread.join()的实现、Future的实现,采用的就是此模式
    • 因为要等待另一方的结果,因此归类到同步模式
  • 实现过程

    • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
    • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者模式)

image.png

  • 实例 - 多任务版GuardedObject

image.png
图中Futures就好比居民楼一层的信箱(每个信箱有房间编号),左侧的t0,t2,t4就好比等待邮件的居民,右侧的t1,t3,t5 就好比邮递员
如果需要在多个类之间使用多个GuardedObject对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类Mailboxes(RPC框架中也用到了这个思想),这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

  • 代码如下 ```java import lombok.extern.slf4j.Slf4j;

import java.util.Hashtable; import java.util.Map; import java.util.Set;

@Slf4j(topic = “c.Test20”) public class Test20 { public static void main(String[] args) throws InterruptedException { //开启三个居民线程收信 for (int i = 0; i < 3; i++) { new People().start(); }

  1. //开启三个邮递员线程送信
  2. Thread.sleep(1000);
  3. for (Integer id : Mailboxes.getIds()) {
  4. new Postman(id, "内容" + id).start();
  5. }
  6. }

}

//居民,等待从信箱中收信 @Slf4j(topic = “c.People”) class People extends Thread { @Override public void run() { //新建信箱 GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug(“开始收信 id:{}”, guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug(“收到信 id:{}, 内容:{}”, guardedObject.getId(), mail); } }

//邮递员,向邮箱中送信 @Slf4j(topic = “c.Postman”) class Postman extends Thread { private int id; private String mail;

  1. public Postman(int id, String mail) {
  2. this.id = id;
  3. this.mail = mail;
  4. }
  5. @Override
  6. public void run() {
  7. GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
  8. log.debug("送信 id:{}, 内容:{}", id, mail);
  9. guardedObject.complete(mail);
  10. }

}

//邮箱,管理消息中间类GuardedObject class Mailboxes { //Hashtable线程安全 private static Map boxes = new Hashtable<>();

  1. private static int id = 1;
  2. //产生唯一 id
  3. private static synchronized int generateId() {
  4. return id++;
  5. }
  6. public static GuardedObject getGuardedObject(int id) {
  7. return boxes.remove(id);
  8. }
  9. public static GuardedObject createGuardedObject() {
  10. GuardedObject go = new GuardedObject(generateId());
  11. boxes.put(go.getId(), go);
  12. return go;
  13. }
  14. public static Set<Integer> getIds() {
  15. return boxes.keySet();
  16. }

}

//消息中间类 class GuardedObject {

  1. //标识Guarded Object
  2. private int id;
  3. public GuardedObject(int id) {
  4. this.id = id;
  5. }
  6. public int getId() {
  7. return id;
  8. }
  9. //结果
  10. private Object response;
  11. //获取结果,需要避免虚假唤醒
  12. //timeout - 最长等待时间
  13. public synchronized Object get(long timeout) {
  14. //开始时间
  15. long begin = System.currentTimeMillis();
  16. //经历的时间
  17. long passedTime = 0;
  18. while (response == null) {
  19. //这一轮循环应该等待的时间
  20. long waitTime = timeout - passedTime;
  21. //经历的时间超过了最大等待时间时,退出循环
  22. if (timeout - passedTime <= 0) {
  23. break;
  24. }
  25. try {
  26. this.wait(waitTime); // 虚假唤醒 15:00:01
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. //求得经历时间
  31. passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
  32. }
  33. return response;
  34. }
  35. //产生结果
  36. public synchronized void complete(Object response) {
  37. //给结果成员变量赋值
  38. this.response = response;
  39. this.notifyAll();
  40. }

}

  1. - 上述代码输出
  2. ```java
  3. 13:50:44 [Thread-0] c.People - 开始收信 id:2
  4. 13:50:44 [Thread-2] c.People - 开始收信 id:1
  5. 13:50:44 [Thread-1] c.People - 开始收信 id:3
  6. 13:50:45 [Thread-4] c.Postman - 送信 id:2, 内容:内容2
  7. 13:50:45 [Thread-0] c.People - 收到信 id:2, 内容:内容2
  8. 13:50:45 [Thread-3] c.Postman - 送信 id:3, 内容:内容3
  9. 13:50:45 [Thread-5] c.Postman - 送信 id:1, 内容:内容1
  10. 13:50:45 [Thread-2] c.People - 收到信 id:1, 内容:内容1
  11. 13:50:45 [Thread-1] c.People - 收到信 id:3, 内容:内容3

2 顺序控制

  • 顺序控制概述
    • 顺序控制即控制线程的执行顺序,如指定线程12的执行顺序为12
    • 严格来说顺序控制不算是一种模式,但是顺序控制的实现也具有一定通用性

1 固定运行顺序

  • 需求:线程2先执行,线程1后执行

    1 wait() + notify()实现

    1. public class Concurrent {
    2. static final Object obj = new Object();
    3. //t2运行标记,代表t2是否执行过
    4. static boolean t2runed = false;
    5. public static void main(String[] args) {
    6. Thread t1 = new Thread(() -> {
    7. synchronized (obj) {
    8. //如果t2没有执行过
    9. while (!t2runed) {
    10. try {
    11. //t1先等一会
    12. obj.wait();
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. }
    18. System.out.println(1);
    19. });
    20. Thread t2 = new Thread(() -> {
    21. System.out.println(2);
    22. synchronized (obj) {
    23. //修改运行标记
    24. t2runed = true;
    25. //通知obj上等待的线程(可能有多个,因此需要用 notifyAll)
    26. obj.notifyAll();
    27. }
    28. });
    29. t1.start();
    30. t2.start();
    31. }
    32. }

    可以看到,使用wait()+notify()实现很麻烦:

  1. 需要保证先wait()再notify(),否则wait()线程永远得不到唤醒。因此使用了运行标记作为条件来判断该不该wait
  2. 如果有些干扰线程错误地notify()了wait()线程,条件不满足时还要重新等待,因此需要使用while循环来解决此问题
  3. 最后,唤醒对象上的wait()线程需要使用notifyAll(),因为同步对象上的等待线程可能不止一个

2 park() + unpark()实现

  1. Thread t1 = new Thread(() -> {
  2. try {
  3. Thread.sleep(1000);
  4. } catch (InterruptedException e) {
  5. e.printStackTrace();
  6. }
  7. //当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
  8. LockSupport.park();
  9. System.out.println("1");
  10. });
  11. Thread t2 = new Thread(() -> {
  12. System.out.println("2");
  13. //给线程t1发放『许可』
  14. LockSupport.unpark(t1);
  15. });
  16. t1.start();
  17. t2.start();
  • 相比于wait()和notify(),park()和unpark()方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行暂停和恢复,不需要同步锁和运行标记

2 交替输出

  • 需求:线程1输出a5次,线程2输出b5次,线程3输出c5次。现在要求输出abcabcabcabcabc怎么实现

    1 wait() + notify()实现

    ```java public class Concurrent { public static void main(String[] args) {
    1. SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
    2. new Thread(() -> {
    3. syncWaitNotify.print(1, 2, "a");
    4. }).start();
    5. new Thread(() -> {
    6. syncWaitNotify.print(2, 3, "b");
    7. }).start();
    8. new Thread(() -> {
    9. syncWaitNotify.print(3, 1, "c");
    10. }).start();
    } }

class SyncWaitNotify { //运行标记,指示可以运行的线程 private int flag; //指定循环次数,即每个线程需要打印的字符数量 private final int loopNumber;

  1. public SyncWaitNotify(int flag, int loopNumber) {
  2. this.flag = flag;
  3. this.loopNumber = loopNumber;
  4. }
  5. public void print(int waitFlag, int nextFlag, String str) {
  6. for (int i = 0; i < loopNumber; i++) {
  7. synchronized (this) {
  8. //如果当前不该当前线程输出,则等待
  9. while (this.flag != waitFlag) {
  10. try {
  11. this.wait();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. System.out.print(str);
  17. flag = nextFlag;
  18. this.notifyAll();
  19. }
  20. }
  21. }

}

  1. <a name="mJaQV"></a>
  2. #### 2 ReentrantLock实现
  3. ```java
  4. public class Concurrent {
  5. public static void main(String[] args) {
  6. AwaitSignal as = new AwaitSignal(5);
  7. Condition aWaitSet = as.newCondition();
  8. Condition bWaitSet = as.newCondition();
  9. Condition cWaitSet = as.newCondition();
  10. new Thread(() -> {
  11. as.print("a", aWaitSet, bWaitSet);
  12. }).start();
  13. new Thread(() -> {
  14. as.print("b", bWaitSet, cWaitSet);
  15. }).start();
  16. new Thread(() -> {
  17. as.print("c", cWaitSet, aWaitSet);
  18. }).start();
  19. as.start(aWaitSet);
  20. }
  21. }
  22. @Slf4j()
  23. class AwaitSignal extends ReentrantLock {
  24. //循环次数
  25. private final int loopNumber;
  26. public AwaitSignal(int loopNumber) {
  27. this.loopNumber = loopNumber;
  28. }
  29. public void start(Condition first) {
  30. this.lock();
  31. try {
  32. log.debug("start");
  33. first.signal();
  34. } finally {
  35. this.unlock();
  36. }
  37. }
  38. public void print(String str, Condition current, Condition next) {
  39. for (int i = 0; i < loopNumber; i++) {
  40. this.lock();
  41. try {
  42. current.await();
  43. log.debug(str);
  44. next.signal();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. } finally {
  48. this.unlock();
  49. }
  50. }
  51. }
  52. }

3 park() + unpark()实现

  1. public class Concurrent {
  2. public static void main(String[] args) {
  3. SyncPark syncPark = new SyncPark(5);
  4. Thread t1 = new Thread(() -> {
  5. syncPark.print("a");
  6. });
  7. Thread t2 = new Thread(() -> {
  8. syncPark.print("b");
  9. });
  10. Thread t3 = new Thread(() -> {
  11. syncPark.print("c\n");
  12. });
  13. syncPark.setThreads(t1, t2, t3);
  14. syncPark.start();
  15. }
  16. }
  17. class SyncPark {
  18. private final int loopNumber;
  19. private Thread[] threads;
  20. public SyncPark(int loopNumber) {
  21. this.loopNumber = loopNumber;
  22. }
  23. public void setThreads(Thread... threads) {
  24. this.threads = threads;
  25. }
  26. public void print(String str) {
  27. for (int i = 0; i < loopNumber; i++) {
  28. LockSupport.park();
  29. System.out.print(str);
  30. LockSupport.unpark(nextThread());
  31. }
  32. }
  33. private Thread nextThread() {
  34. Thread current = Thread.currentThread();
  35. int index = 0;
  36. for (int i = 0; i < threads.length; i++) {
  37. if (threads[i] == current) {
  38. index = i;
  39. break;
  40. }
  41. }
  42. if (index < threads.length - 1) {
  43. return threads[index + 1];
  44. } else {
  45. return threads[0];
  46. }
  47. }
  48. public void start() {
  49. for (Thread thread : threads) {
  50. thread.start();
  51. }
  52. LockSupport.unpark(threads[0]);
  53. }
  54. }

2 异步模式

1 生产者消费者(阻塞队列)

  • 定义

    • 与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
    • 消费队列可以用来平衡生产和消费的线程资源
    • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
    • JDK中各种阻塞队列,采用的就是这种模式
    • 消息队列中的数据不会被立刻消费,而且消费者也不需要等待生产者生产数据,因此归类为异步模式
  • 实现过程

image.png

  • t1、t2、t3作为生产者不断产生数据放入消息队列,t4作为消费者不断从消息队列中取出数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • 实例

有三个生产者,每个生产者生产两条消息,有一个消费者,消费者不断消费消息

  1. @Slf4j(topic = "c.Test21")
  2. public class Test21 {
  3. public static void main(String[] args) {
  4. MessageQueue queue = new MessageQueue(4);
  5. //开启三个生产者线程,每个生产者生产两条消息
  6. for (int i = 0; i < 3; i++) {
  7. int t_id = i;
  8. new Thread(() -> {
  9. for (int m_id = 0; m_id < 2; m_id++)
  10. queue.put(new Message(t_id, m_id, "值" + t_id + "." + m_id));
  11. }, "生产者" + t_id).start();
  12. }
  13. //开启一个消费者,消费者每隔1秒钟消费一条消息
  14. new Thread(() -> {
  15. while (true) {
  16. try {
  17. Thread.sleep(1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. Message message = queue.take();
  22. System.out.println(message);
  23. }
  24. }, "消费者").start();
  25. }
  26. }
  27. //消息队列类, 用于线程之间通信
  28. @Slf4j(topic = "c.MessageQueue")
  29. class MessageQueue {
  30. //消息的队列集合
  31. private Deque<Message> list = new LinkedList<>();
  32. //队列容量
  33. private int capcity;
  34. public MessageQueue(int capcity) {
  35. this.capcity = capcity;
  36. }
  37. //获取消息
  38. public synchronized Message take() {
  39. //检查消息队列是否为空
  40. while (list.isEmpty()) {
  41. try {
  42. log.debug("队列为空, 消费者线程等待");
  43. wait();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. //从消息队列头部获取消息并返回
  49. Message message = list.removeFirst();
  50. log.debug("已消费消息 {}", message);
  51. notifyAll();
  52. return message;
  53. }
  54. //存入消息
  55. public synchronized void put(Message message) {
  56. //检查消息队列是否已满
  57. while (list.size() == capcity) {
  58. try {
  59. log.debug("队列已满, 生产者线程等待");
  60. wait();
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. //将消息加入队列尾部
  66. list.addLast(message);
  67. log.debug("已生产消息 {}", message);
  68. notifyAll();
  69. }
  70. }
  71. final class Message {
  72. private int t_id; //线程ID
  73. private int m_id; //消息ID
  74. private Object value;
  75. public Message(int t_id, int m_id, Object value) {
  76. this.t_id = t_id;
  77. this.m_id = m_id;
  78. this.value = value;
  79. }
  80. public int getT_id() {
  81. return t_id;
  82. }
  83. public int getM_id() {
  84. return m_id;
  85. }
  86. public Object getValue() {
  87. return value;
  88. }
  89. @Override
  90. public String toString() {
  91. return "Message{" +
  92. "t_id=" + t_id +
  93. ", m_id=" + m_id +
  94. ", value=" + value +
  95. '}';
  96. }
  97. }
  • 上述代码输出
    1. 15:26:36 [生产者0] c.MessageQueue - 已生产消息 Message{t_id=0, m_id=0, value=值0.0}
    2. 15:26:36 [生产者2] c.MessageQueue - 已生产消息 Message{t_id=2, m_id=0, value=值2.0}
    3. 15:26:36 [生产者2] c.MessageQueue - 已生产消息 Message{t_id=2, m_id=1, value=值2.1}
    4. 15:26:36 [生产者1] c.MessageQueue - 已生产消息 Message{t_id=1, m_id=0, value=值1.0}
    5. 15:26:36 [生产者1] c.MessageQueue - 队列已满, 生产者线程等待
    6. 15:26:36 [生产者0] c.MessageQueue - 队列已满, 生产者线程等待
    7. 15:26:37 [消费者] c.MessageQueue - 已消费消息 Message{t_id=0, m_id=0, value=值0.0}
    8. Message{t_id=0, m_id=0, value=值0.0}
    9. 15:26:37 [生产者1] c.MessageQueue - 已生产消息 Message{t_id=1, m_id=1, value=值1.1}
    10. 15:26:37 [生产者0] c.MessageQueue - 队列已满, 生产者线程等待
    11. 15:26:38 [消费者] c.MessageQueue - 已消费消息 Message{t_id=2, m_id=0, value=值2.0}
    12. Message{t_id=2, m_id=0, value=值2.0}
    13. 15:26:38 [生产者0] c.MessageQueue - 已生产消息 Message{t_id=0, m_id=1, value=值0.1}
    14. 15:26:39 [消费者] c.MessageQueue - 已消费消息 Message{t_id=2, m_id=1, value=值2.1}
    15. Message{t_id=2, m_id=1, value=值2.1}
    16. 15:26:40 [消费者] c.MessageQueue - 已消费消息 Message{t_id=1, m_id=0, value=值1.0}
    17. Message{t_id=1, m_id=0, value=值1.0}
    18. 15:26:41 [消费者] c.MessageQueue - 已消费消息 Message{t_id=1, m_id=1, value=值1.1}
    19. Message{t_id=1, m_id=1, value=值1.1}
    20. 15:26:42 [消费者] c.MessageQueue - 已消费消息 Message{t_id=0, m_id=1, value=值0.1}
    21. Message{t_id=0, m_id=1, value=值0.1}
    22. 15:26:43 [消费者] c.MessageQueue - 队列为空, 消费者线程等待

2 工作线程(线程池)

  • 定义
    • 让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。
    • 工作线程模式的典型实现就是线程池,也体现了经典设计模式中的享元模式
    • 不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

例如,如果一个餐馆的员工既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不高。将员工分成服务员(线程池A)与厨师(线程池B)更为合理

1 饥饿现象

  • 饥饿原因

固定大小的单一线程池会有饥饿现象,例如

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
  • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
  • 后厨做菜:客人点菜后开始做菜
  • 比如工人A处理了点餐任务,接下来它要等着工人B把菜做好,然后上菜
  • 但现在同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时没人做饭了,出现饥饿
  • 解决方案

    • 可以增加线程池的大小,不过不是根本解决方案
    • 彻底解决饥饿的方法是不同的任务类型采用不同的线程池
  • 实例

    1. @Slf4j(topic = "c.TestPool")
    2. public class TestPool {
    3. static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    4. static Random RANDOM = new Random();
    5. static ExecutorService waiterPool;
    6. static ExecutorService cookPool;
    7. //随机做一道菜
    8. static String cooking() {
    9. return MENU.get(RANDOM.nextInt(MENU.size()));
    10. }
    11. public static void main(String[] args) {
    12. waiterPool = Executors.newFixedThreadPool(1);
    13. cookPool = Executors.newFixedThreadPool(1);
    14. waiterPool.execute(new Order());
    15. waiterPool.execute(new Order());
    16. }
    17. static class Order implements Runnable {
    18. @Override
    19. public void run() {
    20. log.debug("处理点餐...");
    21. Future<String> f = cookPool.submit(() -> {
    22. log.debug("做菜");
    23. return cooking();
    24. });
    25. try {
    26. log.debug("上菜: {}", f.get());
    27. } catch (Exception e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. }
    32. }
    • 如果上述代码中线程池的两个线程在同一个线程池,将导致饥饿
    • 上述代码输出
      1. 16:04:14 [pool-1-thread-1] c.TestPool - 处理点餐...
      2. 16:04:14 [pool-2-thread-1] c.TestPool - 做菜
      3. 16:04:14 [pool-1-thread-1] c.TestPool - 上菜: 辣子鸡丁
      4. 16:04:14 [pool-1-thread-1] c.TestPool - 处理点餐...
      5. 16:04:14 [pool-2-thread-1] c.TestPool - 做菜
      6. 16:04:14 [pool-1-thread-1] c.TestPool - 上菜: 烤鸡翅

2 线程池大小

  • 线程池大小可能导致的问题

    • 线程池过小会导致程序不能充分地利用系统资源、容易导致饥饿
    • 线程池过大会导致占用更多内存
  • 通常根据任务的特性来确定线程池的大小

    • CPU密集型的线程数量不宜太多
    • I/O密集型的线程数量不宜太少


1 CPU密集型运算

  • 线程池大小

线程数 = CPU核数 + 1

  • 原因

这样设定能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的线程能补上去,保证CPU时钟周期不被浪费

2 I/O密集型运算

  • 线程池大小

线程数 = CPU核数 * 期望CPU利用率 * (CPU计算时间 + CPU等待时间) / CPU计算时间

  • 原因

CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。