模式简介
保护性暂停模式是提供了一种线程间通信能力的模式,如果有一个线程的执行结果需要传递给另一个线程,就需要使用保护性暂停模式将两条线程关联起来。如下图所示:
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject。我们需要注意的是【一个结果】,如果有结果不断从一个线程到另一个线程那么可以使用消息队列,这就是另一种设计模式了。在Java中,join 方法和 Future 的实现就是采用的该设计模式了。
模式的实现
简单实现
@Slf4j(topic = "c.GuardedObject")public class GuardedObject {private Object response;private final Object lock = new Object();public Object get() {synchronized (lock) {// 条件不满足则等待while (response == null) {try {//没有时间限制的waitlock.wait();} catch (InterruptedException e) {e.printStackTrace();}}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}public static void main(String[] args) {GuardedObject guardedObject = new GuardedObject();//投放 respone 的线程new Thread(() -> {try {log.debug("download ...");// 模拟线程进行下载资源,需要10s的时间Thread.sleep(10000);Object response = new Object();guardedObject.complete(response);} catch (InterruptedException e) {e.printStackTrace();}}, "download").start();log.debug("waiting...");// 主线程阻塞等待 responeObject response = guardedObject.get();log.debug("get response: {}", response);}}
上面的代码就是短暂性暂停的简单实现方式,分析一下代码:
- 该类有两个属性,response 和 lock ,其中 response 就是两个线程通信的传递结果,而 lock 则是为了保证线程安全而加的锁了
- get 方法就是一个线程在等待一个非空的 response ,如果 respone 是null,则进入无时间限制的等待;而 complete 方法则是由另一个线程产生 response ,并唤醒正在等待这个结果的所有线程。不难看出,这其实就是 wait-notify 机制的一种应用。
- main线程在无时间限制的等待download线程计算respone结果
运行结果:
可以看到,main线程确实等待了download线程10s后才拿到结果,可见,这实现了和join方法一样的效果,其实join方法就是采用这种设计模式实现的,下面是join方法的源码:
至于join方法,到下一个有时间限制的保护性暂停模式再来解读。
超时版实现
join方法有等待时间,join是的设计模式采用的就是保护性暂停模式,因此它也有超时版本的实现,如下代码所示:
@Slf4j(topic = "c.TimeOutGuardedObject")public class TimeOutGuardedObject {private Object response;private final Object lock = new Object();public Object get(long millis) {synchronized (lock) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;//timePassed>millis,超时等待了,直接breakif (waitTime <= 0) {break;}try {//等待时长为 waitTimelock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}public static void main(String[] args) {TimeOutGuardedObject v2 = new TimeOutGuardedObject();new Thread(() -> {try {Thread.sleep(1000);v2.complete(null);} catch (InterruptedException e) {e.printStackTrace();}}).start();log.debug("begin");//最多等待10sObject response = v2.get(10000);if (response != null) {log.debug("get response: {}", response);} else {log.debug("can't get response,response is null");}}}
代码分析:
- get方法里面的wait采用的是有时间限制的wait,但是参数使用的是waitTime=millis - timePassed,而不是使用millis,原因是可能在wait过程中,被虚假唤醒。如果采用millis的话,虚假唤醒之后,又需要等待millis,导致总的等待直接超过了millis
测试结果,当如上述代码一样“v2.complete(null)”,这样,就可以测试超时等待是否真的成功,测试结果如下:
可以看到确实是只等待10s就不再等待了,然后,把代码换成这样:
Thread.sleep(1000);Object response = new Object();v2.complete(response);
测试结果如下:
可以看到设计是成功的。而有时间限制的join方法就是采用这种设计模式,其源码如下:
和上面那个代码的区别就是join是等待线程的执行结束,而上面那个代码是等待respone结果。
多任务版本实现

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。代码实现如下:
@Slf4j(topic = "c.MultitaskGuardedObject")public class MultitaskGuardedObject {// 标识 Guarded Objectprivate final int id;public MultitaskGuardedObject(int id) {this.id = id;}public int getId() {return id;}private Object response;public Object get(long millis) {synchronized (this) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;if (waitTime <= 0) {break;}try {//等待时长为 waitTimethis.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;}return response;}}// 产生结果public void complete(Object response) {synchronized (this) {// 给结果成员变量赋值this.response = response;this.notifyAll();}}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 3; i++) {new People().start();}Thread.sleep(1000);for (Integer id : Mailboxes.getIds()) {new Postman(id, "Mail的内容:Dear " + id + ",hello").start();}}}class Mailboxes {private static final Map<Integer, MultitaskGuardedObject> BOXES = new Hashtable<>();private static int id = 1;private static synchronized int generateId() {return id++;}public static MultitaskGuardedObject getGuardedObject(int id) {return BOXES.remove(id);}public static MultitaskGuardedObject createGuardedObject() {MultitaskGuardedObject go = new MultitaskGuardedObject(generateId());BOXES.put(go.getId(), go);return go;}public static Set<Integer> getIds() {return BOXES.keySet();}}@Slf4j(topic = "c.People")class People extends Thread {@Overridepublic void run() {// 收信MultitaskGuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信 id:{}", guardedObject.getId());Object mail = guardedObject.get(5000);log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);}}@Slf4j(topic = "c.Postman")class Postman extends Thread {private final int id;private final String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {MultitaskGuardedObject guardedObject = Mailboxes.getGuardedObject(id);log.debug("送信 id:{}, 内容:{}", id, mail);guardedObject.complete(mail);}}
测试结果:
