两阶段终止模式

当我们在执行线程一时,想要终止线程二,这时就需要使用 interrupt() 来优雅的停止线程二
图片.png

interrupt() 实现

  1. public class Test7 {
  2. public static void main(String[] args) throws InterruptedException {
  3. Monitor monitor = new Monitor();
  4. monitor.start();
  5. Thread.sleep(3500);
  6. monitor.stop();
  7. }
  8. }
  9. class Monitor {
  10. Thread monitor;
  11. // 启动监控器线程
  12. public void start() {
  13. // 设置线控器线程,用于监控线程状态
  14. monitor = new Thread() {
  15. @Override
  16. public void run() {
  17. // 开始不停的监控
  18. while (true) {
  19. // 判断当前线程是否被打断了
  20. if(Thread.currentThread().isInterrupted()) {
  21. System.out.println("处理后续任务");
  22. // 终止线程执行
  23. break;
  24. }
  25. System.out.println("监控器运行中...");
  26. try {
  27. // 线程休眠
  28. Thread.sleep(1000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. // 如果是在休眠的时候被打断,不会将打断标记设置为true,这时要重新设置打断标记
  32. Thread.currentThread().interrupt();
  33. }
  34. }
  35. }
  36. };
  37. monitor.start();
  38. }
  39. // 用于停止监控器线程
  40. public void stop() {
  41. //打断线程
  42. monitor.interrupt();
  43. }
  44. }

volatile 关键字实现

  1. package com;
  2. public class Uninterruptible {
  3. public static void main(String[] args) throws InterruptedException {
  4. Monitor monitor = new Monitor();
  5. monitor.start();
  6. Thread.sleep(3500);
  7. monitor.stop();
  8. }
  9. }
  10. class Monitor {
  11. Thread monitor;
  12. // 设置标记,用于判断是否被终止了
  13. private volatile boolean stop = false;
  14. // 启动监控器线程
  15. public void start() {
  16. //设置线控器线程,用于监控线程状态
  17. monitor = new Thread() {
  18. @Override
  19. public void run() {
  20. // 开始不停的监控
  21. while (true) {
  22. if (stop) {
  23. System.out.println("处理后续任务");
  24. break;
  25. }
  26. System.out.println("监控器运行中...");
  27. try {
  28. // 线程休眠
  29. Thread.sleep(1000);
  30. } catch (InterruptedException e) {
  31. System.out.println("被打断了");
  32. }
  33. }
  34. }
  35. };
  36. monitor.start();
  37. }
  38. // 用于停止监控器线程
  39. public void stop() {
  40. // 打断线程(设置打断标记,在线程sleep时打断抛出异常)
  41. monitor.interrupt();
  42. // 修改标记
  43. stop = true;
  44. }
  45. }

保护性暂停模式

wait()、notify() 的应用
保护性暂停模式的定义即: Guarded Suspension,用一个线程等待另一个线程的执行结果

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列
  • JDK 中,join()、Future 的实现,采用的就是保护性暂停模式
  • 因为要等待另一方的结果,因此归类到同步模式

    实现

    1. class GuardedObject {
    2. private Object response;
    3. private final Object lock = new Object();
    4. public Object get() {
    5. synchronized (lock) {
    6. // 条件不满足则等待,避免虚假唤醒
    7. while (response == null) {
    8. try {
    9. lock.wait();
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. return response;
    15. }
    16. }
    17. public void complete(Object response) {
    18. synchronized (lock) {
    19. // 条件满足,通知等待线程
    20. this.response = response;
    21. lock.notifyAll();
    22. }
    23. }
    24. }

    带超时判断的实现

    1. class GuardedObjectV2 {
    2. private Object response;
    3. private final Object lock = new Object();
    4. public Object get(long millis) {
    5. synchronized (lock) {
    6. // 开始时间
    7. long begin = System.currentTimeMillis();
    8. // 经历的时间
    9. long timePassed = 0;
    10. while (response == null) {
    11. // 这一轮循环应该等待的时间
    12. long waitTime = millis - timePassed;
    13. // 经历的时间超过了最大等待时间时,退出循环
    14. if (waitTime <= 0) {
    15. break;
    16. }
    17. try {
    18. lock.wait(waitTime);
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. timePassed = System.currentTimeMillis() - begin;
    23. }
    24. return response;
    25. }
    26. }
    27. public void complete(Object response) {
    28. synchronized (lock) {
    29. this.response = response;
    30. lock.notifyAll();
    31. }
    32. }
    33. }

    多任务版实现

    如果需要在多个类之间使用 GuardedObject 对象,作为参数传递很不方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

    1. class GuardedObject {
    2. // 新增 id 用来标识 GuardedObject
    3. private int id;
    4. public GuardedObject(int id) {
    5. this.id = id;
    6. }
    7. public int getId() {
    8. return id;
    9. }
    10. // 结果
    11. private Object response;
    12. // 获取结果。timeout 表示要等待多久
    13. public Object get(long millis) {
    14. synchronized (this) {
    15. long begin = System.currentTimeMillis();
    16. long timePassed = 0;
    17. while (response == null) {
    18. long waitTime = millis - timePassed;
    19. if (waitTime <= 0) {
    20. break;
    21. }
    22. try {
    23. this.wait(waitTime);
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. }
    27. timePassed = System.currentTimeMillis() - begin;
    28. }
    29. return response;
    30. }
    31. }
    32. // 产生结果
    33. public void complete(Object response) {
    34. synchronized (this) {
    35. this.response = response;
    36. this.notifyAll();
    37. }
    38. }
    39. }

    中间解耦类

    1. class Mailboxes {
    2. private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    3. private static int id = 1;
    4. // 产生唯一 id
    5. private static synchronized int generateId() {
    6. return id++;
    7. }
    8. public static GuardedObject getGuardedObject(int id) {
    9. return boxes.remove(id);
    10. }
    11. public static GuardedObject createGuardedObject() {
    12. GuardedObject go = new GuardedObject(generateId());
    13. boxes.put(go.getId(), go);
    14. return go;
    15. }
    16. public static Set<Integer> getIds() {
    17. return boxes.keySet();
    18. }
    19. }

    业务相关类 ```java class People extends Thread { @Override public void run() {

    1. // 收信
    2. GuardedObject guardedObject = Mailboxes.createGuardedObject();
    3. log.debug("开始收信 id:{}", guardedObject.getId());
    4. Object mail = guardedObject.get(5000);
    5. log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);

    } }

class Postman extends Thread { private int id; private String mail;

  1. public Postman(int id, String mail) {
  2. this.id = id;
  3. this.mail = mail;
  4. }
  5. @Override
  6. public void run() {
  7. GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
  8. log.debug("送信 id:{}, 内容:{}", id, mail);
  9. guardedObject.complete(mail);
  10. }

}

public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Thread.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id, “内容” + id).start(); } }

  1. <a name="WGXAF"></a>
  2. # 顺序控制模式
  3. <a name="UI2pU"></a>
  4. ## 固定运行顺序
  5. 比如,必须先 2 后 1 打印
  6. - 固定运行顺序 - wait() / notify() 实现
  7. ```java
  8. static final Object LOCK = new Object();
  9. // 判断先执行的内容是否执行完毕
  10. static Boolean judge = false;
  11. public static void main(String[] args) {
  12. new Thread(()->{
  13. synchronized (LOCK) {
  14. while (!judge) {
  15. try {
  16. LOCK.wait();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. System.out.println("2");
  22. }
  23. }).start();
  24. new Thread(()->{
  25. synchronized (LOCK) {
  26. System.out.println("1");
  27. judge = true;
  28. // 执行完毕,唤醒所有等待线程
  29. LOCK.notifyAll();
  30. }
  31. }).start();
  32. }
  • 固定运行顺序 - park() / unpark() 实现 ```java Thread t1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { } // 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行 LockSupport.park(); System.out.println(“1”); });

Thread t2 = new Thread(() -> { System.out.println(“2”); // 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』) LockSupport.unpark(t1); });

t1.start(); t2.start();

  1. <a name="VA5Jg"></a>
  2. ## 交替输出
  3. 线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc
  4. - 交替输出 - wait() / notify() 实现
  5. ```java
  6. public static void main(String[] args) {
  7. SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
  8. new Thread(() -> {
  9. syncWaitNotify.print(1, 2, "a");
  10. }, "t1").start();
  11. new Thread(() -> {
  12. syncWaitNotify.print(2, 3, "b");
  13. }, "t1").start();
  14. new Thread(() -> {
  15. syncWaitNotify.print(3, 1, "c");
  16. }, "t1").start();
  17. }
  18. class SyncWaitNotify {
  19. // 线程的执行标记:1->a 2->b 3->c
  20. private int flag;
  21. // 输出 abc 的循环次数
  22. private int loopNumber;
  23. public SyncWaitNotify(int flag, int loopNumber) {
  24. this.flag = flag;
  25. this.loopNumber = loopNumber;
  26. }
  27. public void print(int waitFlag, int nextFlag, String str) {
  28. for (int i = 0; i < loopNumber; i++) {
  29. synchronized (this) {
  30. while (this.flag != waitFlag) {
  31. try {
  32. this.wait();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. System.out.print(str);
  38. this.flag = nextFlag;
  39. this.notifyAll();
  40. }
  41. }
  42. }
  43. }
  • 交替输出 - await() / signal() 实现 ```java public class Uninterruptible { static AwaitSignal awaitSignal = new AwaitSignal(); static Condition conditionA = awaitSignal.newCondition(); static Condition conditionB = awaitSignal.newCondition(); static Condition conditionC = awaitSignal.newCondition();

    public static void main(String[] args) throws InterruptedException {

    1. new Thread(() -> {
    2. awaitSignal.print("a", conditionA, conditionB);
    3. },"t1").start();
    4. new Thread(() -> {
    5. awaitSignal.print("b", conditionB, conditionC);
    6. },"t2").start();
    7. new Thread(() -> {
    8. awaitSignal.print("c", conditionC, conditionA);
    9. },"t3").start();
    10. Thread.sleep(1000);
    11. awaitSignal.lock();
    12. try {
    13. conditionA.signal();
    14. } finally {
    15. awaitSignal.unlock();
    16. }

    } }

class AwaitSignal extends ReentrantLock {

  1. private int loopNumber = 5;
  2. public void print(String str, Condition condition, Condition nextCondition) {
  3. for (int i = 0; i < loopNumber; i++) {
  4. this.lock();
  5. try {
  6. //全部进入等待状态
  7. condition.await();
  8. System.out.print(str);
  9. nextCondition.signal();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. } finally {
  13. this.unlock();
  14. }
  15. }
  16. }

}

  1. - 交替输出 - park() / unpark()
  2. ```java
  3. public class Uninterruptible {
  4. public static void main(String[] args) {
  5. SyncPark syncPark = new SyncPark();
  6. Thread t1 = new Thread(() -> {
  7. syncPark.print("a");
  8. });
  9. Thread t2 = new Thread(() -> {
  10. syncPark.print("b");
  11. });
  12. Thread t3 = new Thread(() -> {
  13. syncPark.print("c");
  14. });
  15. syncPark.setThreads(t1, t2, t3);
  16. syncPark.start();
  17. }
  18. }
  19. class SyncPark {
  20. private int loopNumber = 5;
  21. private Thread[] threads;
  22. public void setThreads(Thread... threads) {
  23. this.threads = threads;
  24. }
  25. public void start() {
  26. for (Thread thread : threads) {
  27. thread.start();
  28. }
  29. LockSupport.unpark(threads[0]);
  30. }
  31. public void print(String str) {
  32. for (int i = 0; i < loopNumber; i++) {
  33. LockSupport.park();
  34. System.out.print(str);
  35. LockSupport.unpark(nextThread());
  36. }
  37. }
  38. private Thread nextThread() {
  39. Thread current = Thread.currentThread();
  40. int index = 0;
  41. for (int i = 0; i < threads.length; i++) {
  42. if (threads[i] == current) {
  43. index = i;
  44. break;
  45. }
  46. }
  47. if (index < threads.length - 1) {
  48. return threads[index + 1];
  49. } else {
  50. return threads[0];
  51. }
  52. }
  53. }

犹豫模式

犹豫模式 (Balking) 用在:一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回
用一个标记来判断该任务是否已经被执行过了

  1. @Service
  2. @Slf4j
  3. public class MonitorService {
  4. private volatile boolean stop;
  5. // 标记是否执行过 start()
  6. private volatile boolean starting;
  7. private Thread monitorThread;
  8. public void start() {
  9. synchronized (this) {
  10. if (starting) {
  11. return;
  12. }
  13. starting = true;
  14. }
  15. // 由于上面代码实现的 balking 模式,以下代码只可能被一个线程执行,因此无需互斥
  16. monitorThread = new Thread(() -> {
  17. while (!stop) {
  18. report();
  19. sleep(2);
  20. }
  21. log.info("监控线程已停止...");
  22. starting = false;
  23. });
  24. stop = false;
  25. log.info("监控线程已启动...");
  26. monitorThread.start();
  27. }
  28. public synchronized void stop() {
  29. stop = true;
  30. monitorThread.interrupt();
  31. }
  32. }