并发编程-应用篇

1. 限制对 CPU 的使用

限制 CPU 高占用率。

1.1. sleep 实现

在没有利用 cpu 来计算时,让 while(true) 空转会浪费 cpu,这时可以使用 yieldsleep 来让出 cpu 的使用权给其他程序,从而减少cpu占用率

  1. while (true) {
  2. try {
  3. /*
  4. * 在没有利用 cpu 来计算时,让 while(true) 空转会浪费 cpu,
  5. * 这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序,
  6. * 从而减少cpu占用率
  7. */
  8. Thread.sleep(1);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  • 可以用 wait 或条件变量达到类似的效果
  • 不同的是,后两种都需要加锁,并且需要相应的唤醒操作,一般适用于要进行同步的场景
  • sleep 适用于无需锁同步的场景

    1.2. wait 实现

    ``` synchronized(锁对象) { while(条件不满足) {
    1. try {
    2. 锁对象.wait();
    3. } catch(InterruptedException e) {
    4. e.printStackTrace();
    5. }
    } // do something… }
  1. ### 1.3. 条件变量实现

lock.lock(); try { while(条件不满足) { try { 条件变量.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // do something… } finally { lock.unlock(); }

  1. ## 2. 限制对共享资源的使用
  2. # 并发编程-模式篇
  3. ## 1. 终止模式之两阶段终止模式
  4. 两个不同的线程,如何让一个线程停止另一个线程,并且让停止的线程完成一些停止前的操作。
  5. ### 1.1. 方案1:利用 isInterrupted 打断标识
  6. 调用线程的 `interrupt` 方法可以打断正在执行的线程,无论线程是`sleep``wait`,还是正常运行。值得注意,如果线程在休眠状态,打断标识会被清除。

public static void main(String[] args) throws InterruptedException { System.out.println(“程序开始….”); // 方案1:利用 isInterrupted 打断标识 useIsInterrupted(); System.out.println(“程序结束….”); }

// 方案1:利用 isInterrupted 打断标识 public static void useIsInterrupted() throws InterruptedException { Thread t = new Thread(() -> { while (true) { Thread current = Thread.currentThread(); if (current.isInterrupted()) { System.out.println(“收到结束指示,进行结束前处理!”); break; } try { // 注意:当sleep状态被打断后,打断标识会被清除,所以异常捕获后要手动再次进行打断,因为运行时打断不会清除打断标识 Thread.sleep(1000); System.out.println(“线程的业务处理….”); } catch (InterruptedException e) { current.interrupt(); }

  1. }
  2. }, "监控线程");
  3. t.start();
  4. Thread.sleep(3500);
  5. // 打断线程
  6. t.interrupt();

}

  1. ### 1.2. 方案2:自定义停止标记

private static boolean stop = false;

public static void main(String[] args) throws InterruptedException { System.out.println(“程序开始….”); // 方案2:自定义停止标记 useCustomFlag(); System.out.println(“程序结束….”); }

// 方案2:自定义停止标记 public static void useCustomFlag() throws InterruptedException { Thread t = new Thread(() -> { while (true) { if (stop) { System.out.println(“收到结束指示,进行结束前处理!”); break; } try { Thread.sleep(1000); System.out.println(“线程的业务处理….”); } catch (InterruptedException e) { e.printStackTrace(); } } }, “监控线程”); t.start();

  1. Thread.sleep(3500);
  2. // 打断线程并设置标识为true
  3. t.interrupt();
  4. stop = true;

}

  1. ## 2. 同步模式之保护性暂停
  2. ### 2.1. 定义
  3. 保护性暂停(Guarded Suspension),本质就是用在一个线程等待另一个线程的执行结果,要点如下:
  4. - 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 `GuardedObject`
  5. - 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  6. - JDK 中,`join` `Future` 的实现,都是采用此模式
  7. - 因为要等待另一方的结果,因此归类到同步模式
  8. ![](https://gitee.com/moonzero/images/raw/master/code-note/20211216094733583_756.png)
  9. ### 2.2. 基础使用示例
  10. 定义 `GuardedObject` 类,
  11. - 提供获取数据方法,如未得到返回结果,则线程进入等待;
  12. - 提供数据完成方法,如得到返回结果,则唤醒线程

class GuardedObject { private Object response; private final Object lock = new Object();

  1. public Object get() {
  2. synchronized (lock) {
  3. // 判断无响应结果,则循环等待
  4. while (response == null) {
  5. try {
  6. LOGGER.info("waiting....");
  7. lock.wait();
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. return response;
  13. }
  14. }
  15. public void complete(Object response) {
  16. synchronized (lock) {
  17. // 条件满足,通知等待线程
  18. this.response = response;
  19. lock.notifyAll();
  20. }
  21. }

}

  1. 测试

@Test public void testGuardedObjectBasic() { // 创建 GuardedObject 实例 GuardedObject guardedObject = new GuardedObject();

  1. // 创建获取数据的线程
  2. new Thread(() -> {
  3. try {
  4. LOGGER.info("load start...");
  5. Thread.sleep(4000); // 模拟业务处理
  6. LOGGER.info("load complete...");
  7. // 返回结果并唤醒线程
  8. guardedObject.complete("i am result");
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }).start();
  13. // 另一个线程获取结果,结果返回前会进行等待
  14. Object o = guardedObject.get();
  15. LOGGER.info("get response: [{}]", o);

}

  1. ### 2.3. GuardedObject 超时设置改进版
  2. 这里需要注意线程在等待时,未返回结果前被唤醒,需要计算开始进入等待已经历的时长。

class GuardedObjectV2 { private Object response; private final Object lock = new Object();

  1. /**
  2. * 可设置超时时间
  3. *
  4. * @param timeout
  5. * @return
  6. */
  7. public Object get(long timeout) {
  8. synchronized (lock) {
  9. // 记录线程开始执行的时间
  10. long base = System.currentTimeMillis();
  11. // 记录已等待的时间
  12. long timePassed = 0;
  13. // 判断无响应结果,则循环等待
  14. while (response == null) {
  15. // 通过超时时间与唤醒前已等待的时间,计算剩余可等待时间
  16. long waitTime = timeout - timePassed;
  17. GuardedObjectDemo.LOGGER.info("waiting time: {}", waitTime);
  18. // 判断是否等待超时
  19. if (waitTime <= 0) {
  20. GuardedObjectDemo.LOGGER.error("waiting time up, break..");
  21. break;
  22. }
  23. try {
  24. lock.wait(waitTime);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. // 这里需要注意被提前唤醒的情况。记录一下当前被唤醒经历的时长
  29. timePassed = System.currentTimeMillis() - base;
  30. }
  31. return response;
  32. }
  33. }
  34. public void complete(Object response) {
  35. synchronized (lock) {
  36. // 条件满足,通知等待线程
  37. this.response = response;
  38. GuardedObjectDemo.LOGGER.info("notifyAll...");
  39. lock.notifyAll();
  40. }
  41. }

}

  1. ### 2.4. GuardedObject 多任务改进版
  2. 如果涉及多个异步线程需要等待不同的线程的返回结果,那需要创建多个 `GuardedObject`。<br />如果需要在多个类之间使用 `GuardedObject` 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理<br />![](https://gitee.com/moonzero/images/raw/master/code-note/20211216140915024_12380.png)<br />对 `GuardedObject` 类进行改造,增加 `id` 用来标识不同的任务线程。

class GuardedObjectV3 {

  1. // 标识不同的 GuardedObject
  2. private final int id;
  3. private Object response;
  4. private final Object lock = new Object();
  5. public GuardedObjectV3(int id) {
  6. this.id = id;
  7. }
  8. public int getId() {
  9. return id;
  10. }
  11. ....

}

  1. 定义一个中间解耦类,用于创建和保存不同的 `GuardedObject` 实例

class TaskManagement { // 保存多个 GuardedObject 任务 private final static Map GUARDED_MAP = new ConcurrentHashMap<>();

  1. private static int id = 1;
  2. // 生成 GuardedObject 相应的id
  3. private static synchronized int generateId() {
  4. return id++;
  5. }
  6. public static GuardedObjectV3 getGuardedObject(int id) {
  7. // 从容器获取相应的 GuardedObject 并移除
  8. return GUARDED_MAP.remove(id);
  9. }
  10. public static GuardedObjectV3 createGuardedObject() {
  11. GuardedObjectV3 go = new GuardedObjectV3(generateId());
  12. // 放入容器并返回
  13. GUARDED_MAP.put(go.getId(), go);
  14. return go;
  15. }
  16. // 获取当前所有任务的id
  17. public static Set<Integer> getIds() {
  18. return GUARDED_MAP.keySet();
  19. }

}

  1. 业务测试

// 多个待接收结果的任务 for (int i = 1; i < 4; i++) { new Thread(() -> { // 创建 GuardedObject GuardedObjectV3 guardedObject = TaskManagement.createGuardedObject(); int id = guardedObject.getId(); LOGGER.info(“Receiver{} loading data…”, id); // 等待获取返回结果 Object o = guardedObject.get(5000); LOGGER.info(“Receiver{} get response: [{}]”, id, o); }).start(); }

Thread.sleep(1000);

// 获取所有任务 for (Integer id : TaskManagement.getIds()) { new Thread(() -> { GuardedObjectV3 guardedObject = TaskManagement.getGuardedObject(id); LOGGER.info(“{} sending data…”, id); try { Thread.sleep(new Random().nextInt(3000)); } catch (InterruptedException e) { e.printStackTrace(); } guardedObject.complete(id + “ :: send data”); }).start(); } System.in.read();

  1. ## 3. 异步模式之生产者/消费者
  2. ### 3.1. 定义
  3. - 与保护性暂停模式中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  4. - 消费队列可以用来平衡生产和消费的线程资源
  5. - 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  6. - 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  7. - JDK 中各种阻塞队列,采用的就是这种模式
  8. ![](https://gitee.com/moonzero/images/raw/master/code-note/20211216113129428_22994.png)
  9. ### 3.2. 使用示例
  10. - 定义消息队列

class MessageQueue { private final static Logger LOGGER = LoggerFactory.getLogger(MessageQueue.class);

  1. // 消息的队列集合
  2. private final LinkedList<Message> list = new LinkedList<>();
  3. // 队列容量
  4. private final int capcity;
  5. public MessageQueue(int capcity) {
  6. this.capcity = capcity;
  7. }
  8. // 获取消息
  9. public Message take() {
  10. // 检查队列是否为空
  11. synchronized (list) {
  12. while (list.isEmpty()) {
  13. try {
  14. LOGGER.info("队列为空, 消费者线程等待");
  15. list.wait();
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. // 从队列头部获取消息并返回
  21. Message message = list.removeFirst();
  22. LOGGER.info("已消费消息 {}", message);
  23. list.notifyAll();
  24. return message;
  25. }
  26. }
  27. // 存入消息
  28. public void put(Message message) {
  29. synchronized (list) {
  30. // 检查消息是否已满
  31. while (list.size() == capcity) {
  32. try {
  33. LOGGER.info("队列已满, 生产者线程等待");
  34. list.wait();
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. // 将消息加入队列尾部
  40. list.addLast(message);
  41. LOGGER.info("已生产消息 {}", message);
  42. list.notifyAll();
  43. }
  44. }

}

/ 定义消息类 / final class Message { private final int id; private final Object value;

  1. public Message(int id, Object value) {
  2. this.id = id;
  3. this.value = value;
  4. }
  5. public int getId() {
  6. return id;
  7. }
  8. public Object getValue() {
  9. return value;
  10. }

}

  1. - 测试

// 创建消息队列 MessageQueue queue = new MessageQueue(6); // 创建多个生产者 for (int i = 1; i < 5; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, “值” + id)); }, “生产者” + i).start(); }

// 创建一个消息者 new Thread(() -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = queue.take(); } }, “消费者”).start();

```