模式简介

保护性暂停模式是提供了一种线程间通信能力的模式,如果有一个线程的执行结果需要传递给另一个线程,就需要使用保护性暂停模式将两条线程关联起来。如下图所示:
image.png
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject。我们需要注意的是【一个结果】,如果有结果不断从一个线程到另一个线程那么可以使用消息队列,这就是另一种设计模式了。在Java中,join 方法和 Future 的实现就是采用的该设计模式了。

模式的实现

简单实现

  1. @Slf4j(topic = "c.GuardedObject")
  2. public class GuardedObject {
  3. private Object response;
  4. private final Object lock = new Object();
  5. public Object get() {
  6. synchronized (lock) {
  7. // 条件不满足则等待
  8. while (response == null) {
  9. try {
  10. //没有时间限制的wait
  11. lock.wait();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. return response;
  17. }
  18. }
  19. public void complete(Object response) {
  20. synchronized (lock) {
  21. // 条件满足,通知等待线程
  22. this.response = response;
  23. lock.notifyAll();
  24. }
  25. }
  26. public static void main(String[] args) {
  27. GuardedObject guardedObject = new GuardedObject();
  28. //投放 respone 的线程
  29. new Thread(() -> {
  30. try {
  31. log.debug("download ...");
  32. // 模拟线程进行下载资源,需要10s的时间
  33. Thread.sleep(10000);
  34. Object response = new Object();
  35. guardedObject.complete(response);
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }, "download").start();
  40. log.debug("waiting...");
  41. // 主线程阻塞等待 respone
  42. Object response = guardedObject.get();
  43. log.debug("get response: {}", response);
  44. }
  45. }

上面的代码就是短暂性暂停的简单实现方式,分析一下代码:

  • 该类有两个属性,response 和 lock ,其中 response 就是两个线程通信的传递结果,而 lock 则是为了保证线程安全而加的锁了
  • get 方法就是一个线程在等待一个非空的 response ,如果 respone 是null,则进入无时间限制的等待;而 complete 方法则是由另一个线程产生 response ,并唤醒正在等待这个结果的所有线程。不难看出,这其实就是 wait-notify 机制的一种应用。
  • main线程在无时间限制的等待download线程计算respone结果

运行结果:
image.png
可以看到,main线程确实等待了download线程10s后才拿到结果,可见,这实现了和join方法一样的效果,其实join方法就是采用这种设计模式实现的,下面是join方法的源码:
image.png
至于join方法,到下一个有时间限制的保护性暂停模式再来解读。

超时版实现

join方法有等待时间,join是的设计模式采用的就是保护性暂停模式,因此它也有超时版本的实现,如下代码所示:

  1. @Slf4j(topic = "c.TimeOutGuardedObject")
  2. public class TimeOutGuardedObject {
  3. private Object response;
  4. private final Object lock = new Object();
  5. public Object get(long millis) {
  6. synchronized (lock) {
  7. // 1) 记录最初时间
  8. long begin = System.currentTimeMillis();
  9. // 2) 已经经历的时间
  10. long timePassed = 0;
  11. while (response == null) {
  12. // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
  13. long waitTime = millis - timePassed;
  14. //timePassed>millis,超时等待了,直接break
  15. if (waitTime <= 0) {
  16. break;
  17. }
  18. try {
  19. //等待时长为 waitTime
  20. lock.wait(waitTime);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
  25. timePassed = System.currentTimeMillis() - begin;
  26. }
  27. return response;
  28. }
  29. }
  30. public void complete(Object response) {
  31. synchronized (lock) {
  32. // 条件满足,通知等待线程
  33. this.response = response;
  34. lock.notifyAll();
  35. }
  36. }
  37. public static void main(String[] args) {
  38. TimeOutGuardedObject v2 = new TimeOutGuardedObject();
  39. new Thread(() -> {
  40. try {
  41. Thread.sleep(1000);
  42. v2.complete(null);
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. }).start();
  47. log.debug("begin");
  48. //最多等待10s
  49. Object response = v2.get(10000);
  50. if (response != null) {
  51. log.debug("get response: {}", response);
  52. } else {
  53. log.debug("can't get response,response is null");
  54. }
  55. }
  56. }

代码分析:

  • get方法里面的wait采用的是有时间限制的wait,但是参数使用的是waitTime=millis - timePassed,而不是使用millis,原因是可能在wait过程中,被虚假唤醒。如果采用millis的话,虚假唤醒之后,又需要等待millis,导致总的等待直接超过了millis

测试结果,当如上述代码一样“v2.complete(null)”,这样,就可以测试超时等待是否真的成功,测试结果如下:
image.png
可以看到确实是只等待10s就不再等待了,然后,把代码换成这样:

  1. Thread.sleep(1000);
  2. Object response = new Object();
  3. v2.complete(response);

测试结果如下:
image.png
可以看到设计是成功的。而有时间限制的join方法就是采用这种设计模式,其源码如下:
image.png
和上面那个代码的区别就是join是等待线程的执行结束,而上面那个代码是等待respone结果。

多任务版本实现

image.png
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。代码实现如下:

  1. @Slf4j(topic = "c.MultitaskGuardedObject")
  2. public class MultitaskGuardedObject {
  3. // 标识 Guarded Object
  4. private final int id;
  5. public MultitaskGuardedObject(int id) {
  6. this.id = id;
  7. }
  8. public int getId() {
  9. return id;
  10. }
  11. private Object response;
  12. public Object get(long millis) {
  13. synchronized (this) {
  14. // 1) 记录最初时间
  15. long begin = System.currentTimeMillis();
  16. // 2) 已经经历的时间
  17. long timePassed = 0;
  18. while (response == null) {
  19. // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
  20. long waitTime = millis - timePassed;
  21. if (waitTime <= 0) {
  22. break;
  23. }
  24. try {
  25. //等待时长为 waitTime
  26. this.wait(waitTime);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
  31. timePassed = System.currentTimeMillis() - begin;
  32. }
  33. return response;
  34. }
  35. }
  36. // 产生结果
  37. public void complete(Object response) {
  38. synchronized (this) {
  39. // 给结果成员变量赋值
  40. this.response = response;
  41. this.notifyAll();
  42. }
  43. }
  44. public static void main(String[] args) throws InterruptedException {
  45. for (int i = 0; i < 3; i++) {
  46. new People().start();
  47. }
  48. Thread.sleep(1000);
  49. for (Integer id : Mailboxes.getIds()) {
  50. new Postman(id, "Mail的内容:Dear " + id + ",hello").start();
  51. }
  52. }
  53. }
  54. class Mailboxes {
  55. private static final Map<Integer, MultitaskGuardedObject> BOXES = new Hashtable<>();
  56. private static int id = 1;
  57. private static synchronized int generateId() {
  58. return id++;
  59. }
  60. public static MultitaskGuardedObject getGuardedObject(int id) {
  61. return BOXES.remove(id);
  62. }
  63. public static MultitaskGuardedObject createGuardedObject() {
  64. MultitaskGuardedObject go = new MultitaskGuardedObject(generateId());
  65. BOXES.put(go.getId(), go);
  66. return go;
  67. }
  68. public static Set<Integer> getIds() {
  69. return BOXES.keySet();
  70. }
  71. }
  72. @Slf4j(topic = "c.People")
  73. class People extends Thread {
  74. @Override
  75. public void run() {
  76. // 收信
  77. MultitaskGuardedObject guardedObject = Mailboxes.createGuardedObject();
  78. log.debug("开始收信 id:{}", guardedObject.getId());
  79. Object mail = guardedObject.get(5000);
  80. log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
  81. }
  82. }
  83. @Slf4j(topic = "c.Postman")
  84. class Postman extends Thread {
  85. private final int id;
  86. private final String mail;
  87. public Postman(int id, String mail) {
  88. this.id = id;
  89. this.mail = mail;
  90. }
  91. @Override
  92. public void run() {
  93. MultitaskGuardedObject guardedObject = Mailboxes.getGuardedObject(id);
  94. log.debug("送信 id:{}, 内容:{}", id, mail);
  95. guardedObject.complete(mail);
  96. }
  97. }

测试结果:
image.png