终止模式之两阶段终止模式
Two Phase Termination
在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。
1. 错误思路
- 使用线程对象的 stop() 方法停止线程(这个方法也被废弃了)
- stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,
其它线程将永远无法获取锁
- stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,
使用 System.exit(int) 方法停止线程
-
2. 两阶段终止模式
2.1 利用isInterrupted
interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait,还是正常运行
class TPTInterrupt {private Thread thread;public void start(){thread = new Thread(() -> {while(true) {Thread current = Thread.currentThread();if(current.isInterrupted()) {log.debug("料理后事");break;}try {Thread.sleep(1000); // 降低占用CPU的能力log.debug("将结果保存");} catch (InterruptedException e) {current.interrupt(); // 如果是在休眠处打断,则信号会被取消,因此需要重新添加上。}// 执行监控操作}},"监控线程");thread.start();}public void stop() {thread.interrupt();}}
调用 ```java TPTInterrupt t = new TPTInterrupt(); t.start();
-
Thread.sleep(3500); log.debug(“stop”); t.stop();
结果```java11:49:42.915 c.TwoPhaseTermination [监控线程] - 将结果保存11:49:43.919 c.TwoPhaseTermination [监控线程] - 将结果保存11:49:44.919 c.TwoPhaseTermination [监控线程] - 将结果保存11:49:45.413 c.TestTwoPhaseTermination [main] - stop11:49:45.413 c.TwoPhaseTermination [监控线程] - 料理后事
2.2 利用停止标记
实话说:感觉这个和上面的没多大区别,只是两种不同的写法!
如果不设置volatile,有可能你第二个线程设置为true了;但是这里读到的还是false
// 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性// 我们的例子中,即主线程把它修改为 true 对 t1 线程可见class TPTVolatile {private Thread thread;private volatile boolean stop = false;public void start(){thread = new Thread(() -> {while(true) {Thread current = Thread.currentThread();if(stop) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("将结果保存");} catch (InterruptedException e) {}// 执行监控操作}},"监控线程");thread.start();}public void stop() {stop = true; // 是只有这句话,有缺陷,比如在上面休眠处这个线程修改了,上面线程不会立刻结束thread.interrupt(); // 因此,还可以配合interrupt使用,即使是在上面休眠处更改,也会立刻结束}}
调用
TPTVolatile t = new TPTVolatile();t.start();Thread.sleep(3500);log.debug("stop");t.stop();
结果
11:54:52.003 c.TPTVolatile [监控线程] - 将结果保存11:54:53.006 c.TPTVolatile [监控线程] - 将结果保存11:54:54.007 c.TPTVolatile [监控线程] - 将结果保存11:54:54.502 c.TestTwoPhaseTermination [main] - stop11:54:54.502 c.TPTVolatile [监控线程] - 料理后事
案例:JVM内存监控
同步模式之保护性暂停
1. 定义
即 Guarded Suspension(作为两个线程的桥梁),用在一个线程等待另一个线程的执行结果
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
2. 实现
class GuardedObject {private Object response;private final Object lock = new Object();public Object get() {synchronized (lock) {// 条件不满足则等待while (response == null) { // while就是前面说的,避免虚假唤醒try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}}
3. 应用
一个线程等待另一个线程的执行结果
public static void main(String[] args) {GuardedObject guardedObject = new GuardedObject();new Thread(() -> {try {// 子线程执行下载List<String> response = download();log.debug("download complete...");guardedObject.complete(response);} catch (IOException e) {e.printStackTrace();}}).start();log.debug("waiting...");// 主线程阻塞等待Object response = guardedObject.get();log.debug("get response: [{}] lines", ((List<String>) response).size());}
执行结果
08:42:18.568 [main] c.TestGuardedObject - waiting...08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines
这个模式你也可以用join方法;但是用这个模式有两点好处: 1.join必须是一个线程结束,而这个可以在这个线程完成事情后就调用另外一个线程,无需结束,还可以再做一些其它事情。 2.join的话公共需要的资源必须是全局变量,而这里的可以设置为局部变量
3. 带超时版 GuardedOobject
上面是等这个资源准备好,第二个线程执行 这个是第二个线程只等一定时间,等不着算了
如果要控制超时时间呢
class GuardedObjectV2 {private Object response;private final Object lock = new Object();// 如果是直接在get方法加时间参数 wait改为有时间的 是不行的,因为还会到wile循环 因此看下面,要有记录经历的时间(然后break)public Object get(long millis) {synchronized (lock) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;log.debug("timePassed: {}, object is null {}",timePassed, response == null);}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;log.debug("notify...");lock.notifyAll();}}}
测试,没有超时
public static void main(String[] args) {GuardedObjectV2 v2 = new GuardedObjectV2();new Thread(() -> {sleep(1);v2.complete(null);sleep(1);v2.complete(Arrays.asList("a", "b", "c"));}).start();Object response = v2.get(2500);if (response != null) {log.debug("get response: [{}] lines", ((List<String>) response).size());} else {log.debug("can't get response");}}
输出
08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 250008:49:40.917 [Thread-0] c.GuardedObjectV2 - notify...08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 149708:49:41.918 [Thread-0] c.GuardedObjectV2 - notify...08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines
测试,超时
// 等待时间不足List<String> lines = v2.get(1500);
输出
08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 150008:47:55.963 [Thread-0] c.GuardedObjectV2 - notify...08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 49808:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 008:47:56.461 [main] c.GuardedObjectV2 - break...08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...
原理之join
4. 多任务版 GuardedObject(这个暂时不理解)
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
新增 id 用来标识 Guarded Object
class GuardedObject {// 标识 Guarded Objectprivate int id;public GuardedObject(int id) {this.id = id;}public int getId() {return id;}// 结果private Object response;// 获取结果// timeout 表示要等待多久 2000public Object get(long timeout) {synchronized (this) {// 开始时间 15:00:00long begin = System.currentTimeMillis();// 经历的时间long passedTime = 0;while (response == null) {// 这一轮循环应该等待的时间long waitTime = timeout - passedTime;// 经历的时间超过了最大等待时间时,退出循环if (timeout - passedTime <= 0) {break;}try {this.wait(waitTime); // 虚假唤醒 15:00:01} catch (InterruptedException e) {e.printStackTrace();}// 求得经历时间passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s}return response;}}// 产生结果public void complete(Object response) {synchronized (this) {// 给结果成员变量赋值this.response = response;this.notifyAll();}}}
中间解耦类
class Mailboxes {private static Map<Integer, GuardedObject> boxes = new Hashtable<>();private static int id = 1;// 产生唯一 idprivate static synchronized int generateId() {return id++;}public static GuardedObject getGuardedObject(int id) {return boxes.remove(id);}public static GuardedObject createGuardedObject() {GuardedObject go = new GuardedObject(generateId());boxes.put(go.getId(), go);return go;}public static Set<Integer> getIds() {return boxes.keySet();}}
业务相关类
class People extends Thread{@Overridepublic void run() {// 收信GuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信 id:{}", guardedObject.getId());Object mail = guardedObject.get(5000);log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);}}
class Postman extends Thread {private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {GuardedObject guardedObject = Mailboxes.getGuardedObject(id);log.debug("送信 id:{}, 内容:{}", id, mail);guardedObject.complete(mail);}}
测试
public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 3; i++) {new People().start();}Sleeper.sleep(1);for (Integer id : Mailboxes.getIds()) {new Postman(id, "内容" + id).start();}}
某次运行结果
10:35:05.689 c.People [Thread-1] - 开始收信 id:310:35:05.689 c.People [Thread-2] - 开始收信 id:110:35:05.689 c.People [Thread-0] - 开始收信 id:210:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容210:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容110:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容210:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容110:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容310:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3
异步模式之生产者/消费者
为什么这个归类于异步;前面的保护性暂停是同步? 因为前面的生产者生成后,消费者立刻拿到 这个不是的,生产后,消费者不一定立刻拿到
1. 定义
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式(有利于后面我们学习阻塞队列的原理)
2. 实现
// Message没有set方法,则构造创建后不能修改,就是线程安全的。再加个final,不能有子类,就更安全final class Message {private int id;private Object message; // 这个消息可以只用Object吗?不可以,像这种通信的,ID很重要,用于区分不同消息public Message(int id, Object message) {this.id = id;this.message = message;}public int getId() {return id;}public Object getMessage() {return message;}}// // 消息队列类,java线程间通信;和rabbitmq不一样,那是进程间的通信class MessageQueue {private LinkedList<Message> queue; // 用双向队列比较合适;在java中实现就是LinkedListprivate int capacity; // 队列容量,不能无限放public MessageQueue(int capacity) {this.capacity = capacity;queue = new LinkedList<>();}public Message take() {synchronized (queue) {while (queue.isEmpty()) {log.debug("没货了, wait");try {queue.wait(); // 线程在哪里等待,肯定是一个可以被共享的对象上,那就是这个list} catch (InterruptedException e) {e.printStackTrace();}}Message message = queue.removeFirst(); // 从队列头部获取消息并返回queue.notifyAll();return message;}}// 存消息,能不能用Object,不能啊,这里我们要包装一下。为什么?因为你通信,有一个id很重要,// 这个是一个很好的区分;要不然这些消息都一样public void put(Message message) {synchronized (queue) {while (queue.size() == capacity) {log.debug("库存已达上限, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(message);queue.notifyAll();}}}
应用
MessageQueue messageQueue = new MessageQueue(2);// 4 个生产者线程, 下载任务for (int i = 0; i < 4; i++) {int id = i;new Thread(() -> {try {log.debug("download...");List<String> response = Downloader.download();log.debug("try put message({})", id);messageQueue.put(new Message(id, response));} catch (IOException e) {e.printStackTrace();}}, "生产者" + i).start();}// 1 个消费者线程, 处理结果new Thread(() -> {while (true) {Message message = messageQueue.take();List<String> response = (List<String>) message.getMessage();log.debug("take message({}): [{}] lines", message.getId(), response.size());}}, "消费者").start();
某次运行结果
10:48:38.070 [生产者3] c.TestProducerConsumer - download...10:48:38.070 [生产者0] c.TestProducerConsumer - download...10:48:38.070 [消费者] c.MessageQueue - 没货了, wait10:48:38.070 [生产者1] c.TestProducerConsumer - download...10:48:38.070 [生产者2] c.TestProducerConsumer - download...10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines10:48:41.240 [消费者] c.MessageQueue - 没货了, wait
同步模式之顺序控制
学到现在,我们已经回wait notify 和ReentrantLock里面的await和single方法了
这个模式,严格说不算是模式,就是控制线程执行的顺序
协调线程间的执行顺序
1. 固定运行顺序
1.1 wait notify 版
// 用来同步的对象static Object obj = new Object();// t2 运行标记, 代表 t2 是否执行过static boolean t2runed = false;public static void main(String[] args) {Thread t1 = new Thread(() -> {synchronized (obj) {// 如果 t2 没有执行过while (!t2runed) {try {// t1 先等一会obj.wait();} catch (InterruptedException e) {e.printStackTrace();}}}System.out.println(1);});Thread t2 = new Thread(() -> {System.out.println(2);synchronized (obj) {// 修改运行标记t2runed = true;// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)obj.notifyAll();}});t1.start();t2.start();}
1.2 Park Unpark 版
可以看到,实现上很麻烦:
- 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该
wait - 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决
此问题 - 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个
可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
Thread t1 = new Thread(() -> {try { Thread.sleep(1000); } catch (InterruptedException e) { }// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行LockSupport.park();System.out.println("1");});Thread t2 = new Thread(() -> {System.out.println("2");// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)LockSupport.unpark(t1);});t1.start();t2.start();
park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』,不需要『同步对象』和『运行标记』
2. 交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
2.1 wait notify 版
class SyncWaitNotify {private int flag;private int loopNumber;public SyncWaitNotify(int flag, int loopNumber) {this.flag = flag;this.loopNumber = loopNumber;}public void print(int waitFlag, int nextFlag, String str) {for (int i = 0; i < loopNumber; i++) {synchronized (this) {while (this.flag != waitFlag) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.print(str);flag = nextFlag;this.notifyAll();}}}}
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);new Thread(() -> {syncWaitNotify.print(1, 2, "a");}).start();new Thread(() -> {syncWaitNotify.print(2, 3, "b");}).start();new Thread(() -> {syncWaitNotify.print(3, 1, "c");}).start();
2.2 Lock 条件变量版
class AwaitSignal extends ReentrantLock {public void start(Condition first) {this.lock();try {log.debug("start");first.signal();} finally {this.unlock();}}public void print(String str, Condition current, Condition next) {for (int i = 0; i < loopNumber; i++) {this.lock();try {current.await();log.debug(str);next.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {this.unlock();}}}// 循环次数private int loopNumber;public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}}
AwaitSignal as = new AwaitSignal(5);Condition aWaitSet = as.newCondition();Condition bWaitSet = as.newCondition();Condition cWaitSet = as.newCondition();new Thread(() -> {as.print("a", aWaitSet, bWaitSet);}).start();new Thread(() -> {as.print("b", bWaitSet, cWaitSet);}).start();new Thread(() -> {as.print("c", cWaitSet, aWaitSet);}).start();as.start(aWaitSet);
注意 该实现没有考虑 a,b,c 线程都就绪再开始
2.3 Park Unpark 版
class SyncPark {private int loopNumber;private Thread[] threads;public SyncPark(int loopNumber) {this.loopNumber = loopNumber;}public void setThreads(Thread... threads) {this.threads = threads;}public void print(String str) {for (int i = 0; i < loopNumber; i++) {LockSupport.park();System.out.print(str);LockSupport.unpark(nextThread());}}private Thread nextThread() {Thread current = Thread.currentThread();int index = 0;for (int i = 0; i < threads.length; i++) {if(threads[i] == current) {index = i;break;}}if(index < threads.length - 1) {return threads[index+1];} else {return threads[0];}}public void start() {for (Thread thread : threads) {thread.start();}LockSupport.unpark(threads[0]);}}
SyncPark syncPark = new SyncPark(5);Thread t1 = new Thread(() -> {syncPark.print("a");});Thread t2 = new Thread(() -> {syncPark.print("b");});Thread t3 = new Thread(() -> {syncPark.print("c\n");});syncPark.setThreads(t1, t2, t3);syncPark.start();
同步模式之Balking
这个模式很简单,对于我们前面的两阶段终止模式中的:守护线程,我们只需要这么做,就可以只启动一次了!!
但是呢?,仅这样是不的,因为如果两个线程都来到了if判断处,那么还是会有两个线程的。因此考虑加锁,如下图,那么用volat可以吗?答案是不可以的,为什么??其实这里就是两个线程都是读写,不是一个(多个)读,另一个写。可以好好体会下synchronized和volatile的区别!
典型应用场景:保证监护线程只启动一次:一般用在web环境下。(前面的两阶段终止中的监控线程就有这个问题,不能保证只调用一次!,即你调用几次方法,就启动几次。)
1. 定义
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
2. 实现
例如:
public class MonitorService {// 用来表示是否已经有线程已经在执行启动了private volatile boolean starting;public void start() {log.info("尝试启动监控线程...");synchronized (this) {if (starting) {return;}starting = true;}// 真正启动监控线程...}}
这个是典型应用一:对于上面的方法你可能说我们只需要调用一次方法不就好了吗?但是对于前段客户来说,它可能会调用多次start。当前端页面多次点击按钮调用 start 时
输出
[http-nio-8080-exec-1] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(false)[http-nio-8080-exec-1] cn.itcast.monitor.service.MonitorService - 监控线程已启动...[http-nio-8080-exec-2] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)[http-nio-8080-exec-3] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)[http-nio-8080-exec-4] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)
这个是典型应用二:它还经常用来实现线程安全的单例(这个就不看了,也看不太明白,老师说这个也是balking模式的应用)
public final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;public static synchronized Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}}
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待。
