并发编程-应用篇
1. 限制对 CPU 的使用
限制 CPU 高占用率。
1.1. sleep 实现
在没有利用 cpu 来计算时,让 while(true)
空转会浪费 cpu,这时可以使用 yield
或 sleep
来让出 cpu 的使用权给其他程序,从而减少cpu占用率
while (true) {
try {
/*
* 在没有利用 cpu 来计算时,让 while(true) 空转会浪费 cpu,
* 这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序,
* 从而减少cpu占用率
*/
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 可以用
wait
或条件变量达到类似的效果 - 不同的是,后两种都需要加锁,并且需要相应的唤醒操作,一般适用于要进行同步的场景
sleep
适用于无需锁同步的场景1.2. wait 实现
``` synchronized(锁对象) { while(条件不满足) {
} // do something… }try {
锁对象.wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
### 1.3. 条件变量实现
lock.lock(); try { while(条件不满足) { try { 条件变量.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // do something… } finally { lock.unlock(); }
## 2. 限制对共享资源的使用
# 并发编程-模式篇
## 1. 终止模式之两阶段终止模式
两个不同的线程,如何让一个线程停止另一个线程,并且让停止的线程完成一些停止前的操作。
### 1.1. 方案1:利用 isInterrupted 打断标识
调用线程的 `interrupt` 方法可以打断正在执行的线程,无论线程是`sleep`、`wait`,还是正常运行。值得注意,如果线程在休眠状态,打断标识会被清除。
public static void main(String[] args) throws InterruptedException { System.out.println(“程序开始….”); // 方案1:利用 isInterrupted 打断标识 useIsInterrupted(); System.out.println(“程序结束….”); }
// 方案1:利用 isInterrupted 打断标识 public static void useIsInterrupted() throws InterruptedException { Thread t = new Thread(() -> { while (true) { Thread current = Thread.currentThread(); if (current.isInterrupted()) { System.out.println(“收到结束指示,进行结束前处理!”); break; } try { // 注意:当sleep状态被打断后,打断标识会被清除,所以异常捕获后要手动再次进行打断,因为运行时打断不会清除打断标识 Thread.sleep(1000); System.out.println(“线程的业务处理….”); } catch (InterruptedException e) { current.interrupt(); }
}
}, "监控线程");
t.start();
Thread.sleep(3500);
// 打断线程
t.interrupt();
}
### 1.2. 方案2:自定义停止标记
private static boolean stop = false;
public static void main(String[] args) throws InterruptedException { System.out.println(“程序开始….”); // 方案2:自定义停止标记 useCustomFlag(); System.out.println(“程序结束….”); }
// 方案2:自定义停止标记 public static void useCustomFlag() throws InterruptedException { Thread t = new Thread(() -> { while (true) { if (stop) { System.out.println(“收到结束指示,进行结束前处理!”); break; } try { Thread.sleep(1000); System.out.println(“线程的业务处理….”); } catch (InterruptedException e) { e.printStackTrace(); } } }, “监控线程”); t.start();
Thread.sleep(3500);
// 打断线程并设置标识为true
t.interrupt();
stop = true;
}
## 2. 同步模式之保护性暂停
### 2.1. 定义
保护性暂停(Guarded Suspension),本质就是用在一个线程等待另一个线程的执行结果,要点如下:
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 `GuardedObject`
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,`join` 与 `Future` 的实现,都是采用此模式
- 因为要等待另一方的结果,因此归类到同步模式

### 2.2. 基础使用示例
定义 `GuardedObject` 类,
- 提供获取数据方法,如未得到返回结果,则线程进入等待;
- 提供数据完成方法,如得到返回结果,则唤醒线程
class GuardedObject { private Object response; private final Object lock = new Object();
public Object get() {
synchronized (lock) {
// 判断无响应结果,则循环等待
while (response == null) {
try {
LOGGER.info("waiting....");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
测试
@Test public void testGuardedObjectBasic() { // 创建 GuardedObject 实例 GuardedObject guardedObject = new GuardedObject();
// 创建获取数据的线程
new Thread(() -> {
try {
LOGGER.info("load start...");
Thread.sleep(4000); // 模拟业务处理
LOGGER.info("load complete...");
// 返回结果并唤醒线程
guardedObject.complete("i am result");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 另一个线程获取结果,结果返回前会进行等待
Object o = guardedObject.get();
LOGGER.info("get response: [{}]", o);
}
### 2.3. GuardedObject 超时设置改进版
这里需要注意线程在等待时,未返回结果前被唤醒,需要计算开始进入等待已经历的时长。
class GuardedObjectV2 { private Object response; private final Object lock = new Object();
/**
* 可设置超时时间
*
* @param timeout
* @return
*/
public Object get(long timeout) {
synchronized (lock) {
// 记录线程开始执行的时间
long base = System.currentTimeMillis();
// 记录已等待的时间
long timePassed = 0;
// 判断无响应结果,则循环等待
while (response == null) {
// 通过超时时间与唤醒前已等待的时间,计算剩余可等待时间
long waitTime = timeout - timePassed;
GuardedObjectDemo.LOGGER.info("waiting time: {}", waitTime);
// 判断是否等待超时
if (waitTime <= 0) {
GuardedObjectDemo.LOGGER.error("waiting time up, break..");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 这里需要注意被提前唤醒的情况。记录一下当前被唤醒经历的时长
timePassed = System.currentTimeMillis() - base;
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
GuardedObjectDemo.LOGGER.info("notifyAll...");
lock.notifyAll();
}
}
}
### 2.4. GuardedObject 多任务改进版
如果涉及多个异步线程需要等待不同的线程的返回结果,那需要创建多个 `GuardedObject`。<br />如果需要在多个类之间使用 `GuardedObject` 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理<br /><br />对 `GuardedObject` 类进行改造,增加 `id` 用来标识不同的任务线程。
class GuardedObjectV3 {
// 标识不同的 GuardedObject
private final int id;
private Object response;
private final Object lock = new Object();
public GuardedObjectV3(int id) {
this.id = id;
}
public int getId() {
return id;
}
....
}
定义一个中间解耦类,用于创建和保存不同的 `GuardedObject` 实例
class TaskManagement {
// 保存多个 GuardedObject 任务
private final static Map
private static int id = 1;
// 生成 GuardedObject 相应的id
private static synchronized int generateId() {
return id++;
}
public static GuardedObjectV3 getGuardedObject(int id) {
// 从容器获取相应的 GuardedObject 并移除
return GUARDED_MAP.remove(id);
}
public static GuardedObjectV3 createGuardedObject() {
GuardedObjectV3 go = new GuardedObjectV3(generateId());
// 放入容器并返回
GUARDED_MAP.put(go.getId(), go);
return go;
}
// 获取当前所有任务的id
public static Set<Integer> getIds() {
return GUARDED_MAP.keySet();
}
}
业务测试
// 多个待接收结果的任务 for (int i = 1; i < 4; i++) { new Thread(() -> { // 创建 GuardedObject GuardedObjectV3 guardedObject = TaskManagement.createGuardedObject(); int id = guardedObject.getId(); LOGGER.info(“Receiver{} loading data…”, id); // 等待获取返回结果 Object o = guardedObject.get(5000); LOGGER.info(“Receiver{} get response: [{}]”, id, o); }).start(); }
Thread.sleep(1000);
// 获取所有任务 for (Integer id : TaskManagement.getIds()) { new Thread(() -> { GuardedObjectV3 guardedObject = TaskManagement.getGuardedObject(id); LOGGER.info(“{} sending data…”, id); try { Thread.sleep(new Random().nextInt(3000)); } catch (InterruptedException e) { e.printStackTrace(); } guardedObject.complete(id + “ :: send data”); }).start(); } System.in.read();
## 3. 异步模式之生产者/消费者
### 3.1. 定义
- 与保护性暂停模式中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式

### 3.2. 使用示例
- 定义消息队列
class MessageQueue { private final static Logger LOGGER = LoggerFactory.getLogger(MessageQueue.class);
// 消息的队列集合
private final LinkedList<Message> list = new LinkedList<>();
// 队列容量
private final int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
// 获取消息
public Message take() {
// 检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
LOGGER.info("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
LOGGER.info("已消费消息 {}", message);
list.notifyAll();
return message;
}
}
// 存入消息
public void put(Message message) {
synchronized (list) {
// 检查消息是否已满
while (list.size() == capcity) {
try {
LOGGER.info("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
LOGGER.info("已生产消息 {}", message);
list.notifyAll();
}
}
}
/ 定义消息类 / final class Message { private final int id; private final Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
}
- 测试
// 创建消息队列 MessageQueue queue = new MessageQueue(6); // 创建多个生产者 for (int i = 1; i < 5; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, “值” + id)); }, “生产者” + i).start(); }
// 创建一个消息者 new Thread(() -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = queue.take(); } }, “消费者”).start();
```