1 同步模式
1 保护性暂停
保护性暂停(Guarded Suspension)定义
- 一个线程等待另一个线程的执行结果
- 产生结果的线程和消费结果的线程一一对应
- JDK中,
thread.join()
的实现、Future
的实现,采用的就是此模式 - 因为要等待另一方的结果,因此归类到同步模式
实现过程
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者模式)
- 实例 - 多任务版GuardedObject
图中Futures就好比居民楼一层的信箱(每个信箱有房间编号),左侧的t0,t2,t4就好比等待邮件的居民,右侧的t1,t3,t5 就好比邮递员
如果需要在多个类之间使用多个GuardedObject对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类Mailboxes(RPC框架中也用到了这个思想),这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
- 代码如下 ```java import lombok.extern.slf4j.Slf4j;
import java.util.Hashtable; import java.util.Map; import java.util.Set;
@Slf4j(topic = “c.Test20”) public class Test20 { public static void main(String[] args) throws InterruptedException { //开启三个居民线程收信 for (int i = 0; i < 3; i++) { new People().start(); }
//开启三个邮递员线程送信
Thread.sleep(1000);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
//居民,等待从信箱中收信 @Slf4j(topic = “c.People”) class People extends Thread { @Override public void run() { //新建信箱 GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug(“开始收信 id:{}”, guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug(“收到信 id:{}, 内容:{}”, guardedObject.getId(), mail); } }
//邮递员,向邮箱中送信 @Slf4j(topic = “c.Postman”) class Postman extends Thread { private int id; private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
//邮箱,管理消息中间类GuardedObject
class Mailboxes {
//Hashtable线程安全
private static Map
private static int id = 1;
//产生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
//消息中间类 class GuardedObject {
//标识Guarded Object
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
//结果
private Object response;
//获取结果,需要避免虚假唤醒
//timeout - 最长等待时间
public synchronized Object get(long timeout) {
//开始时间
long begin = System.currentTimeMillis();
//经历的时间
long passedTime = 0;
while (response == null) {
//这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
//经历的时间超过了最大等待时间时,退出循环
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虚假唤醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
//求得经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
//产生结果
public synchronized void complete(Object response) {
//给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
- 上述代码输出
```java
13:50:44 [Thread-0] c.People - 开始收信 id:2
13:50:44 [Thread-2] c.People - 开始收信 id:1
13:50:44 [Thread-1] c.People - 开始收信 id:3
13:50:45 [Thread-4] c.Postman - 送信 id:2, 内容:内容2
13:50:45 [Thread-0] c.People - 收到信 id:2, 内容:内容2
13:50:45 [Thread-3] c.Postman - 送信 id:3, 内容:内容3
13:50:45 [Thread-5] c.Postman - 送信 id:1, 内容:内容1
13:50:45 [Thread-2] c.People - 收到信 id:1, 内容:内容1
13:50:45 [Thread-1] c.People - 收到信 id:3, 内容:内容3
2 顺序控制
- 顺序控制概述
- 顺序控制即控制线程的执行顺序,如指定线程12的执行顺序为12
- 严格来说顺序控制不算是一种模式,但是顺序控制的实现也具有一定通用性
1 固定运行顺序
-
1 wait() + notify()实现
public class Concurrent {
static final Object obj = new Object();
//t2运行标记,代表t2是否执行过
static boolean t2runed = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (obj) {
//如果t2没有执行过
while (!t2runed) {
try {
//t1先等一会
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(1);
});
Thread t2 = new Thread(() -> {
System.out.println(2);
synchronized (obj) {
//修改运行标记
t2runed = true;
//通知obj上等待的线程(可能有多个,因此需要用 notifyAll)
obj.notifyAll();
}
});
t1.start();
t2.start();
}
}
可以看到,使用wait()+notify()实现很麻烦:
- 需要保证先wait()再notify(),否则wait()线程永远得不到唤醒。因此使用了运行标记作为条件来判断该不该wait
- 如果有些干扰线程错误地notify()了wait()线程,条件不满足时还要重新等待,因此需要使用while循环来解决此问题
- 最后,唤醒对象上的wait()线程需要使用notifyAll(),因为同步对象上的等待线程可能不止一个
2 park() + unpark()实现
Thread t1 = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("1");
});
Thread t2 = new Thread(() -> {
System.out.println("2");
//给线程t1发放『许可』
LockSupport.unpark(t1);
});
t1.start();
t2.start();
- 相比于wait()和notify(),park()和unpark()方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行暂停和恢复,不需要同步锁和运行标记
2 交替输出
- 需求:线程1输出a5次,线程2输出b5次,线程3输出c5次。现在要求输出abcabcabcabcabc怎么实现
1 wait() + notify()实现
```java public class Concurrent { public static void main(String[] args) {
} }SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
new Thread(() -> {
syncWaitNotify.print(1, 2, "a");
}).start();
new Thread(() -> {
syncWaitNotify.print(2, 3, "b");
}).start();
new Thread(() -> {
syncWaitNotify.print(3, 1, "c");
}).start();
class SyncWaitNotify { //运行标记,指示可以运行的线程 private int flag; //指定循环次数,即每个线程需要打印的字符数量 private final int loopNumber;
public SyncWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(int waitFlag, int nextFlag, String str) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
//如果当前不该当前线程输出,则等待
while (this.flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}
<a name="mJaQV"></a>
#### 2 ReentrantLock实现
```java
public class Concurrent {
public static void main(String[] args) {
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
as.start(aWaitSet);
}
}
@Slf4j()
class AwaitSignal extends ReentrantLock {
//循环次数
private final int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
current.await();
log.debug(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
}
3 park() + unpark()实现
public class Concurrent {
public static void main(String[] args) {
SyncPark syncPark = new SyncPark(5);
Thread t1 = new Thread(() -> {
syncPark.print("a");
});
Thread t2 = new Thread(() -> {
syncPark.print("b");
});
Thread t3 = new Thread(() -> {
syncPark.print("c\n");
});
syncPark.setThreads(t1, t2, t3);
syncPark.start();
}
}
class SyncPark {
private final int loopNumber;
private Thread[] threads;
public SyncPark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void setThreads(Thread... threads) {
this.threads = threads;
}
public void print(String str) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread());
}
}
private Thread nextThread() {
Thread current = Thread.currentThread();
int index = 0;
for (int i = 0; i < threads.length; i++) {
if (threads[i] == current) {
index = i;
break;
}
}
if (index < threads.length - 1) {
return threads[index + 1];
} else {
return threads[0];
}
}
public void start() {
for (Thread thread : threads) {
thread.start();
}
LockSupport.unpark(threads[0]);
}
}
2 异步模式
1 生产者消费者(阻塞队列)
定义
- 与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- JDK中各种阻塞队列,采用的就是这种模式
- 消息队列中的数据不会被立刻消费,而且消费者也不需要等待生产者生产数据,因此归类为异步模式
实现过程
- t1、t2、t3作为生产者不断产生数据放入消息队列,t4作为消费者不断从消息队列中取出数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- 实例
有三个生产者,每个生产者生产两条消息,有一个消费者,消费者不断消费消息
@Slf4j(topic = "c.Test21")
public class Test21 {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(4);
//开启三个生产者线程,每个生产者生产两条消息
for (int i = 0; i < 3; i++) {
int t_id = i;
new Thread(() -> {
for (int m_id = 0; m_id < 2; m_id++)
queue.put(new Message(t_id, m_id, "值" + t_id + "." + m_id));
}, "生产者" + t_id).start();
}
//开启一个消费者,消费者每隔1秒钟消费一条消息
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Message message = queue.take();
System.out.println(message);
}
}, "消费者").start();
}
}
//消息队列类, 用于线程之间通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
//消息的队列集合
private Deque<Message> list = new LinkedList<>();
//队列容量
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
//获取消息
public synchronized Message take() {
//检查消息队列是否为空
while (list.isEmpty()) {
try {
log.debug("队列为空, 消费者线程等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从消息队列头部获取消息并返回
Message message = list.removeFirst();
log.debug("已消费消息 {}", message);
notifyAll();
return message;
}
//存入消息
public synchronized void put(Message message) {
//检查消息队列是否已满
while (list.size() == capcity) {
try {
log.debug("队列已满, 生产者线程等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//将消息加入队列尾部
list.addLast(message);
log.debug("已生产消息 {}", message);
notifyAll();
}
}
final class Message {
private int t_id; //线程ID
private int m_id; //消息ID
private Object value;
public Message(int t_id, int m_id, Object value) {
this.t_id = t_id;
this.m_id = m_id;
this.value = value;
}
public int getT_id() {
return t_id;
}
public int getM_id() {
return m_id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"t_id=" + t_id +
", m_id=" + m_id +
", value=" + value +
'}';
}
}
- 上述代码输出
15:26:36 [生产者0] c.MessageQueue - 已生产消息 Message{t_id=0, m_id=0, value=值0.0}
15:26:36 [生产者2] c.MessageQueue - 已生产消息 Message{t_id=2, m_id=0, value=值2.0}
15:26:36 [生产者2] c.MessageQueue - 已生产消息 Message{t_id=2, m_id=1, value=值2.1}
15:26:36 [生产者1] c.MessageQueue - 已生产消息 Message{t_id=1, m_id=0, value=值1.0}
15:26:36 [生产者1] c.MessageQueue - 队列已满, 生产者线程等待
15:26:36 [生产者0] c.MessageQueue - 队列已满, 生产者线程等待
15:26:37 [消费者] c.MessageQueue - 已消费消息 Message{t_id=0, m_id=0, value=值0.0}
Message{t_id=0, m_id=0, value=值0.0}
15:26:37 [生产者1] c.MessageQueue - 已生产消息 Message{t_id=1, m_id=1, value=值1.1}
15:26:37 [生产者0] c.MessageQueue - 队列已满, 生产者线程等待
15:26:38 [消费者] c.MessageQueue - 已消费消息 Message{t_id=2, m_id=0, value=值2.0}
Message{t_id=2, m_id=0, value=值2.0}
15:26:38 [生产者0] c.MessageQueue - 已生产消息 Message{t_id=0, m_id=1, value=值0.1}
15:26:39 [消费者] c.MessageQueue - 已消费消息 Message{t_id=2, m_id=1, value=值2.1}
Message{t_id=2, m_id=1, value=值2.1}
15:26:40 [消费者] c.MessageQueue - 已消费消息 Message{t_id=1, m_id=0, value=值1.0}
Message{t_id=1, m_id=0, value=值1.0}
15:26:41 [消费者] c.MessageQueue - 已消费消息 Message{t_id=1, m_id=1, value=值1.1}
Message{t_id=1, m_id=1, value=值1.1}
15:26:42 [消费者] c.MessageQueue - 已消费消息 Message{t_id=0, m_id=1, value=值0.1}
Message{t_id=0, m_id=1, value=值0.1}
15:26:43 [消费者] c.MessageQueue - 队列为空, 消费者线程等待
2 工作线程(线程池)
- 定义
- 让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。
- 工作线程模式的典型实现就是线程池,也体现了经典设计模式中的享元模式
- 不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的员工既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不高。将员工分成服务员(线程池A)与厨师(线程池B)更为合理
1 饥饿现象
- 饥饿原因
固定大小的单一线程池会有饥饿现象,例如
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:客人点菜后开始做菜
- 比如工人A处理了点餐任务,接下来它要等着工人B把菜做好,然后上菜
- 但现在同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时没人做饭了,出现饥饿
解决方案
- 可以增加线程池的大小,不过不是根本解决方案
- 彻底解决饥饿的方法是不同的任务类型采用不同的线程池
实例
@Slf4j(topic = "c.TestPool")
public class TestPool {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static ExecutorService waiterPool;
static ExecutorService cookPool;
//随机做一道菜
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
waiterPool = Executors.newFixedThreadPool(1);
cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(new Order());
waiterPool.execute(new Order());
}
static class Order implements Runnable {
@Override
public void run() {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
- 如果上述代码中线程池的两个线程在同一个线程池,将导致饥饿
- 上述代码输出
16:04:14 [pool-1-thread-1] c.TestPool - 处理点餐...
16:04:14 [pool-2-thread-1] c.TestPool - 做菜
16:04:14 [pool-1-thread-1] c.TestPool - 上菜: 辣子鸡丁
16:04:14 [pool-1-thread-1] c.TestPool - 处理点餐...
16:04:14 [pool-2-thread-1] c.TestPool - 做菜
16:04:14 [pool-1-thread-1] c.TestPool - 上菜: 烤鸡翅
2 线程池大小
线程池大小可能导致的问题
- 线程池过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 线程池过大会导致占用更多内存
通常根据任务的特性来确定线程池的大小
- CPU密集型的线程数量不宜太多
- I/O密集型的线程数量不宜太少
1 CPU密集型运算
- 线程池大小
线程数 = CPU核数 + 1
- 原因
这样设定能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的线程能补上去,保证CPU时钟周期不被浪费
2 I/O密集型运算
- 线程池大小
线程数 = CPU核数 * 期望CPU利用率 * (CPU计算时间 + CPU等待时间) / CPU计算时间
- 原因
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。