模式简介
保护性暂停模式是提供了一种线程间通信能力的模式,如果有一个线程的执行结果需要传递给另一个线程,就需要使用保护性暂停模式将两条线程关联起来。如下图所示:
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject。我们需要注意的是【一个结果】,如果有结果不断从一个线程到另一个线程那么可以使用消息队列,这就是另一种设计模式了。在Java中,join 方法和 Future 的实现就是采用的该设计模式了。
模式的实现
简单实现
@Slf4j(topic = "c.GuardedObject")
public class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
//没有时间限制的wait
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
//投放 respone 的线程
new Thread(() -> {
try {
log.debug("download ...");
// 模拟线程进行下载资源,需要10s的时间
Thread.sleep(10000);
Object response = new Object();
guardedObject.complete(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "download").start();
log.debug("waiting...");
// 主线程阻塞等待 respone
Object response = guardedObject.get();
log.debug("get response: {}", response);
}
}
上面的代码就是短暂性暂停的简单实现方式,分析一下代码:
- 该类有两个属性,response 和 lock ,其中 response 就是两个线程通信的传递结果,而 lock 则是为了保证线程安全而加的锁了
- get 方法就是一个线程在等待一个非空的 response ,如果 respone 是null,则进入无时间限制的等待;而 complete 方法则是由另一个线程产生 response ,并唤醒正在等待这个结果的所有线程。不难看出,这其实就是 wait-notify 机制的一种应用。
- main线程在无时间限制的等待download线程计算respone结果
运行结果:
可以看到,main线程确实等待了download线程10s后才拿到结果,可见,这实现了和join方法一样的效果,其实join方法就是采用这种设计模式实现的,下面是join方法的源码:
至于join方法,到下一个有时间限制的保护性暂停模式再来解读。
超时版实现
join方法有等待时间,join是的设计模式采用的就是保护性暂停模式,因此它也有超时版本的实现,如下代码所示:
@Slf4j(topic = "c.TimeOutGuardedObject")
public class TimeOutGuardedObject {
private Object response;
private final Object lock = new Object();
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
//timePassed>millis,超时等待了,直接break
if (waitTime <= 0) {
break;
}
try {
//等待时长为 waitTime
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
public static void main(String[] args) {
TimeOutGuardedObject v2 = new TimeOutGuardedObject();
new Thread(() -> {
try {
Thread.sleep(1000);
v2.complete(null);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
log.debug("begin");
//最多等待10s
Object response = v2.get(10000);
if (response != null) {
log.debug("get response: {}", response);
} else {
log.debug("can't get response,response is null");
}
}
}
代码分析:
- get方法里面的wait采用的是有时间限制的wait,但是参数使用的是waitTime=millis - timePassed,而不是使用millis,原因是可能在wait过程中,被虚假唤醒。如果采用millis的话,虚假唤醒之后,又需要等待millis,导致总的等待直接超过了millis
测试结果,当如上述代码一样“v2.complete(null)”,这样,就可以测试超时等待是否真的成功,测试结果如下:
可以看到确实是只等待10s就不再等待了,然后,把代码换成这样:
Thread.sleep(1000);
Object response = new Object();
v2.complete(response);
测试结果如下:
可以看到设计是成功的。而有时间限制的join方法就是采用这种设计模式,其源码如下:
和上面那个代码的区别就是join是等待线程的执行结束,而上面那个代码是等待respone结果。
多任务版本实现
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。代码实现如下:
@Slf4j(topic = "c.MultitaskGuardedObject")
public class MultitaskGuardedObject {
// 标识 Guarded Object
private final int id;
public MultitaskGuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
private Object response;
public Object get(long millis) {
synchronized (this) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
if (waitTime <= 0) {
break;
}
try {
//等待时长为 waitTime
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
}
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "Mail的内容:Dear " + id + ",hello").start();
}
}
}
class Mailboxes {
private static final Map<Integer, MultitaskGuardedObject> BOXES = new Hashtable<>();
private static int id = 1;
private static synchronized int generateId() {
return id++;
}
public static MultitaskGuardedObject getGuardedObject(int id) {
return BOXES.remove(id);
}
public static MultitaskGuardedObject createGuardedObject() {
MultitaskGuardedObject go = new MultitaskGuardedObject(generateId());
BOXES.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return BOXES.keySet();
}
}
@Slf4j(topic = "c.People")
class People extends Thread {
@Override
public void run() {
// 收信
MultitaskGuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {
private final int id;
private final String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
MultitaskGuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
测试结果: