并发编程-应用篇
1. 限制对 CPU 的使用
限制 CPU 高占用率。
1.1. sleep 实现
在没有利用 cpu 来计算时,让 while(true) 空转会浪费 cpu,这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序,从而减少cpu占用率
while (true) {try {/** 在没有利用 cpu 来计算时,让 while(true) 空转会浪费 cpu,* 这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序,* 从而减少cpu占用率*/Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}
- 可以用
wait或条件变量达到类似的效果 - 不同的是,后两种都需要加锁,并且需要相应的唤醒操作,一般适用于要进行同步的场景
sleep适用于无需锁同步的场景1.2. wait 实现
``` synchronized(锁对象) { while(条件不满足) {
} // do something… }try {锁对象.wait();} catch(InterruptedException e) {e.printStackTrace();}
### 1.3. 条件变量实现
lock.lock(); try { while(条件不满足) { try { 条件变量.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // do something… } finally { lock.unlock(); }
## 2. 限制对共享资源的使用# 并发编程-模式篇## 1. 终止模式之两阶段终止模式两个不同的线程,如何让一个线程停止另一个线程,并且让停止的线程完成一些停止前的操作。### 1.1. 方案1:利用 isInterrupted 打断标识调用线程的 `interrupt` 方法可以打断正在执行的线程,无论线程是`sleep`、`wait`,还是正常运行。值得注意,如果线程在休眠状态,打断标识会被清除。
public static void main(String[] args) throws InterruptedException { System.out.println(“程序开始….”); // 方案1:利用 isInterrupted 打断标识 useIsInterrupted(); System.out.println(“程序结束….”); }
// 方案1:利用 isInterrupted 打断标识 public static void useIsInterrupted() throws InterruptedException { Thread t = new Thread(() -> { while (true) { Thread current = Thread.currentThread(); if (current.isInterrupted()) { System.out.println(“收到结束指示,进行结束前处理!”); break; } try { // 注意:当sleep状态被打断后,打断标识会被清除,所以异常捕获后要手动再次进行打断,因为运行时打断不会清除打断标识 Thread.sleep(1000); System.out.println(“线程的业务处理….”); } catch (InterruptedException e) { current.interrupt(); }
}}, "监控线程");t.start();Thread.sleep(3500);// 打断线程t.interrupt();
}
### 1.2. 方案2:自定义停止标记
private static boolean stop = false;
public static void main(String[] args) throws InterruptedException { System.out.println(“程序开始….”); // 方案2:自定义停止标记 useCustomFlag(); System.out.println(“程序结束….”); }
// 方案2:自定义停止标记 public static void useCustomFlag() throws InterruptedException { Thread t = new Thread(() -> { while (true) { if (stop) { System.out.println(“收到结束指示,进行结束前处理!”); break; } try { Thread.sleep(1000); System.out.println(“线程的业务处理….”); } catch (InterruptedException e) { e.printStackTrace(); } } }, “监控线程”); t.start();
Thread.sleep(3500);// 打断线程并设置标识为truet.interrupt();stop = true;
}
## 2. 同步模式之保护性暂停### 2.1. 定义保护性暂停(Guarded Suspension),本质就是用在一个线程等待另一个线程的执行结果,要点如下:- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 `GuardedObject`- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)- JDK 中,`join` 与 `Future` 的实现,都是采用此模式- 因为要等待另一方的结果,因此归类到同步模式### 2.2. 基础使用示例定义 `GuardedObject` 类,- 提供获取数据方法,如未得到返回结果,则线程进入等待;- 提供数据完成方法,如得到返回结果,则唤醒线程
class GuardedObject { private Object response; private final Object lock = new Object();
public Object get() {synchronized (lock) {// 判断无响应结果,则循环等待while (response == null) {try {LOGGER.info("waiting....");lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}
}
测试
@Test public void testGuardedObjectBasic() { // 创建 GuardedObject 实例 GuardedObject guardedObject = new GuardedObject();
// 创建获取数据的线程new Thread(() -> {try {LOGGER.info("load start...");Thread.sleep(4000); // 模拟业务处理LOGGER.info("load complete...");// 返回结果并唤醒线程guardedObject.complete("i am result");} catch (InterruptedException e) {e.printStackTrace();}}).start();// 另一个线程获取结果,结果返回前会进行等待Object o = guardedObject.get();LOGGER.info("get response: [{}]", o);
}
### 2.3. GuardedObject 超时设置改进版这里需要注意线程在等待时,未返回结果前被唤醒,需要计算开始进入等待已经历的时长。
class GuardedObjectV2 { private Object response; private final Object lock = new Object();
/*** 可设置超时时间** @param timeout* @return*/public Object get(long timeout) {synchronized (lock) {// 记录线程开始执行的时间long base = System.currentTimeMillis();// 记录已等待的时间long timePassed = 0;// 判断无响应结果,则循环等待while (response == null) {// 通过超时时间与唤醒前已等待的时间,计算剩余可等待时间long waitTime = timeout - timePassed;GuardedObjectDemo.LOGGER.info("waiting time: {}", waitTime);// 判断是否等待超时if (waitTime <= 0) {GuardedObjectDemo.LOGGER.error("waiting time up, break..");break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 这里需要注意被提前唤醒的情况。记录一下当前被唤醒经历的时长timePassed = System.currentTimeMillis() - base;}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;GuardedObjectDemo.LOGGER.info("notifyAll...");lock.notifyAll();}}
}
### 2.4. GuardedObject 多任务改进版如果涉及多个异步线程需要等待不同的线程的返回结果,那需要创建多个 `GuardedObject`。<br />如果需要在多个类之间使用 `GuardedObject` 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理<br /><br />对 `GuardedObject` 类进行改造,增加 `id` 用来标识不同的任务线程。
class GuardedObjectV3 {
// 标识不同的 GuardedObjectprivate final int id;private Object response;private final Object lock = new Object();public GuardedObjectV3(int id) {this.id = id;}public int getId() {return id;}....
}
定义一个中间解耦类,用于创建和保存不同的 `GuardedObject` 实例
class TaskManagement {
// 保存多个 GuardedObject 任务
private final static Map
private static int id = 1;// 生成 GuardedObject 相应的idprivate static synchronized int generateId() {return id++;}public static GuardedObjectV3 getGuardedObject(int id) {// 从容器获取相应的 GuardedObject 并移除return GUARDED_MAP.remove(id);}public static GuardedObjectV3 createGuardedObject() {GuardedObjectV3 go = new GuardedObjectV3(generateId());// 放入容器并返回GUARDED_MAP.put(go.getId(), go);return go;}// 获取当前所有任务的idpublic static Set<Integer> getIds() {return GUARDED_MAP.keySet();}
}
业务测试
// 多个待接收结果的任务 for (int i = 1; i < 4; i++) { new Thread(() -> { // 创建 GuardedObject GuardedObjectV3 guardedObject = TaskManagement.createGuardedObject(); int id = guardedObject.getId(); LOGGER.info(“Receiver{} loading data…”, id); // 等待获取返回结果 Object o = guardedObject.get(5000); LOGGER.info(“Receiver{} get response: [{}]”, id, o); }).start(); }
Thread.sleep(1000);
// 获取所有任务 for (Integer id : TaskManagement.getIds()) { new Thread(() -> { GuardedObjectV3 guardedObject = TaskManagement.getGuardedObject(id); LOGGER.info(“{} sending data…”, id); try { Thread.sleep(new Random().nextInt(3000)); } catch (InterruptedException e) { e.printStackTrace(); } guardedObject.complete(id + “ :: send data”); }).start(); } System.in.read();
## 3. 异步模式之生产者/消费者### 3.1. 定义- 与保护性暂停模式中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应- 消费队列可以用来平衡生产和消费的线程资源- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据- JDK 中各种阻塞队列,采用的就是这种模式### 3.2. 使用示例- 定义消息队列
class MessageQueue { private final static Logger LOGGER = LoggerFactory.getLogger(MessageQueue.class);
// 消息的队列集合private final LinkedList<Message> list = new LinkedList<>();// 队列容量private final int capcity;public MessageQueue(int capcity) {this.capcity = capcity;}// 获取消息public Message take() {// 检查队列是否为空synchronized (list) {while (list.isEmpty()) {try {LOGGER.info("队列为空, 消费者线程等待");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 从队列头部获取消息并返回Message message = list.removeFirst();LOGGER.info("已消费消息 {}", message);list.notifyAll();return message;}}// 存入消息public void put(Message message) {synchronized (list) {// 检查消息是否已满while (list.size() == capcity) {try {LOGGER.info("队列已满, 生产者线程等待");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 将消息加入队列尾部list.addLast(message);LOGGER.info("已生产消息 {}", message);list.notifyAll();}}
}
/ 定义消息类 / final class Message { private final int id; private final Object value;
public Message(int id, Object value) {this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}
}
- 测试
// 创建消息队列 MessageQueue queue = new MessageQueue(6); // 创建多个生产者 for (int i = 1; i < 5; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, “值” + id)); }, “生产者” + i).start(); }
// 创建一个消息者 new Thread(() -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = queue.take(); } }, “消费者”).start();
```
