模式
模式之两阶段终止
利用 isInterrupted
interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait,还是正常运行
class Test {
public static void main(String[] args) {
TPTInterrupt tptInterrupt = new TPTInterrupt();
tptInterrupt.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
tptInterrupt.stop();
}
}
@Slf4j(topic = "c.TPTInterrupt")
class TPTInterrupt {
private Thread thread;
public void start() {
thread = new Thread(() -> {
while (true) {
Thread current = Thread.currentThread();
if (current.isInterrupted()) {
log.info("料理后事");
break;
}
try {
Thread.sleep(1000);
log.info("将结果保存");
} catch (InterruptedException e) {
current.interrupt(); // 异常打断
}
// 执行监控操作
}
}, "监控线程");
thread.start();
}
public void stop() {
thread.interrupt();
}
}
结果
16:07:04.933 c.TPTInterrupt [监控线程] - 将结果保存
16:07:05.942 c.TPTInterrupt [监控线程] - 将结果保存
16:07:06.936 c.TPTInterrupt [监控线程] - 料理后事
利用停止标记
// 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性
// 我们的例子中,即主线程把它修改为 true 对 t1 线程可见
class TPTVolatile {
private Thread thread;
private volatile boolean stop = false;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(stop) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
stop = true;
thread.interrupt();
}
}
调用
TPTVolatile t = new TPTVolatile();
t.start();
Thread.sleep(3500);
log.debug("stop");
t.stop();
结果
11:54:52.003 c.TPTVolatile [监控线程] - 将结果保存
11:54:53.006 c.TPTVolatile [监控线程] - 将结果保存
11:54:54.007 c.TPTVolatile [监控线程] - 将结果保存
11:54:54.502 c.TestTwoPhaseTermination [main] - stop
11:54:54.502 c.TPTVolatile [监控线程] - 料理后事
打断 park线程
打断 park 线程, 不会清空打断状态
不推荐的方法
还有一些不推荐使用的方法,这些方法已过时,容易破坏同步代码块,造成线程死锁
同步模式之保护性暂停
定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
实现
带超时版
@Slf4j(topic = "c.TestGuardedObjectV2")
public class TestGuardedObjectV2 {
public static void main(String[] args) {
GuardedObjectV2 v2 = new GuardedObjectV2();
new Thread(() -> {
sleep(1);
v2.complete(null);
sleep(1);
v2.complete(Arrays.asList("a", "b", "c"));
}).start();
Object response = v2.get(2500);
if (response != null) {
log.debug("get response: [{}] lines", ((List<String>) response).size());
} else {
log.debug("can't get response");
}
}
}
/**
* 添加超时处理
*/
@Slf4j(topic = "c.GuardedObjectV2")
class GuardedObjectV2 {
private Object response;
private final Object lock = new Object();
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long last = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - last;
log.debug("timePassed: {}, object is null {}", timePassed, response == null);
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}
扩展
join的原理就是使用了此模式
是调用者轮询检查线程 alive 状态
t1.join();
// 等价于
synchronized (t1) {
// 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
while (t1.isAlive()) {
t1.wait(0);
}
}
join源码
也就是带超时版的保护性暂停模式
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
注意
join 体现的是【保护性暂停】模式,请参考之。
多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。
也就是送信例子:
新增 id 用来标识 Guarded Object
@Slf4j(topic = "c.TestGuardedObjectV3")
public class TestGuardedObjectV3 {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
GuardedObjectV3 v3 = Fetures.createFeture();
new Thread(() -> {
log.debug("waiting id({})...", v3.getId());
log.debug("get response id({}): [{}] lines", v3.getId(), ((List<String>) v3.get()).size());
}).start();
new Thread(() -> {
try {
List<String> lines = download();
log.debug("download complete id({})...", v3.getId());
Fetures.complete(v3.getId(), lines);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}
class Fetures {
private static final ConcurrentHashMap<Integer, GuardedObjectV3> FETURES = new ConcurrentHashMap<>();
private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
public static GuardedObjectV3 createFeture() {
// 为每个 GuardedObject 分配一个 id
int id = ID_GENERATOR.incrementAndGet();
GuardedObjectV3 v3 = new GuardedObjectV3(id);
// 放入公共位置,将来异步响应返回时,根据编号获取
FETURES.put(id, v3);
return v3;
}
public static void complete(int id, Object response) {
// 异步响应完成,根据编号获取并移除
GuardedObjectV3 v3 = FETURES.remove(id);
if (v3 != null) {
v3.complete(response);
}
}
}
/**
* 添加多任务处理
*/
class GuardedObjectV3 {
private int id;
private Object response;
private final Object lock = new Object();
public GuardedObjectV3(int id) {
this.id = id;
}
public int getId() {
return id;
}
public Object get() {
synchronized (lock) {
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}
中间解耦类
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
业务相关类
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
测试
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Sleeper.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
运行结果
10:35:05.689 c.People [Thread-1] - 开始收信 id:3
10:35:05.689 c.People [Thread-2] - 开始收信 id:1
10:35:05.689 c.People [Thread-0] - 开始收信 id:2
10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2
10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1
10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3
异步模式之生产者/消费者
定义
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
实现
// 生产者消费者模式
@Slf4j(topic = "c.Test7")
public class Test7 {
public static void main(String[] args) {
messageQueue messageQueue = new messageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
messageQueue.put(new Message(id, "value=" + id));
}, "生产者" + id).start();
}
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
Message take = messageQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
}
@Slf4j(topic = "c.messageQueue")
class messageQueue {
private LinkedList<Message> queue = new LinkedList<>(); // 消息队列
private int capcity; // 消息容量
public messageQueue(int capcity) {
this.capcity = capcity;
}
// 消费者
public Message take() {
synchronized (queue) {
// 没有消息则等待着
while (queue.isEmpty()) {
try {
log.info("消息队列为空,消费者开始等待");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从消息头部获取消息返回
Message message = queue.removeFirst();
log.info("已经消费消息{}", message);
// 同时唤醒生产者线程,消息没满
queue.notifyAll();
return message;
}
}
// 生产者
public void put(Message message) {
synchronized (queue) {
// 容量满了就等待
while (capcity == queue.size()) {
try {
log.info("消息队列已满,生产者开始等待");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生成消息
queue.add(message);
log.info("生产消息{}", message.getMessage());
// 唤醒消费者线程
queue.notifyAll();
}
}
}
// 消息类
final class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", message=" + message +
'}';
}
}
运行结果:
15:03:07.744 c.messageQueue [生产者0] - 生产消息value=0
15:03:07.744 c.messageQueue [生产者1] - 消息队列已满,生产者开始等待
15:03:08.744 c.messageQueue [消费者] - 已经消费消息Message{id=2, message=value=2}
15:03:08.744 c.messageQueue [生产者1] - 生产消息value=1
15:03:09.756 c.messageQueue [消费者] - 已经消费消息Message{id=0, message=value=0}
15:03:10.756 c.messageQueue [消费者] - 已经消费消息Message{id=1, message=value=1}
15:03:11.757 c.messageQueue [消费者] - 消息队列为空,消费者开始等待
同步模式之顺序控制
顺序输出
比如,必须先 2 后 1 打印
wait notify版
@Slf4j(topic = "c.Test25")
public class Test25 {
static final Object lock = new Object();
// 表示 t2 是否运行过
static boolean t2runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!t2runned) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("1");
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.debug("2");
t2runned = true;
lock.notify();
}
}, "t2");
t1.start();
t2.start();
}
}
ReentrantLock版本
@Slf4j(topic = "c.Test43")
class Test43 {
static final ReentrantLock lock = new ReentrantLock();
// 表示 t2 是否运行过
static boolean t2runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
log.debug("1");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
try {
lock.lock();
log.debug("2");
t2runned = true;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
} finally {
}
}, "t2");
t1.start();
t2.start();
}
}
Park&Unpark版本
可以看到,实现上很麻烦:
- 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
- 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
- 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个
可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
@Slf4j(topic = "c.Test26")
public class Test26 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();
log.debug("1");
}, "t1");
t1.start();
new Thread(() -> {
log.debug("2");
LockSupport.unpark(t1);
},"t2").start();
}
}
park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』,不需要『同步对象』和『运行标记』。
三个版本打印结果:
15:44:49.473 c.Test25 [t2] - 2
15:44:49.475 c.Test25 [t1] - 1
交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
wait notify版本
@Slf4j(topic = "c.Test27")
public class Test27 {
public static void main(String[] args) {
WaitNotify wn = new WaitNotify(1, 5);
new Thread(() -> {
wn.print("a", 1, 2);
}).start();
new Thread(() -> {
wn.print("b", 2, 3);
}).start();
new Thread(() -> {
wn.print("c", 3, 1);
}).start();
}
}
/*
输出内容 等待标记 下一个标记
a 1 2
b 2 3
c 3 1
*/
class WaitNotify {
// 打印 a 1 2
public void print(String str, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (flag != waitFlag) { // 此时不相同的2个线程都在等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("开始打印" + str);
flag = nextFlag; // 等待标记修改为下一个标记
this.notifyAll(); // 此时唤醒其他等待线程
}
}
}
// 等待标记
private int flag; // 2
// 循环次数
private int loopNumber;
public WaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
}
该实现没有考虑 a,b,c 线程都就绪再开始。
Park&Unpark版
@Slf4j(topic = "c.Test31")
public class Test31 {
static Thread t1;
static Thread t2;
static Thread t3;
public static void main(String[] args) {
ParkUnpark pu = new ParkUnpark(5);
t1 = new Thread(() -> {
pu.print("a", t2);
});
t2 = new Thread(() -> {
pu.print("b", t3);
});
t3 = new Thread(() -> {
pu.print("c", t1);
});
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1); // 唤醒第一个线程
}
}
class ParkUnpark {
public void print(String str, Thread next) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park(); // 暂停当前线程
System.out.println("打印结果:" + str);
LockSupport.unpark(next); // 唤醒下一个线程
}
}
private int loopNumber;
public ParkUnpark(int loopNumber) {
this.loopNumber = loopNumber;
}
}
await版
public class Test30 {
public static void main(String[] args) throws InterruptedException {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
new Thread(() -> {
awaitSignal.print("a", a, b);
}).start();
new Thread(() -> {
awaitSignal.print("b", b, c);
}).start();
new Thread(() -> {
awaitSignal.print("c", c, a);
}).start();
awaitSignal.lock();
try {
Thread.sleep(1000);
a.signal(); // 唤醒其中一个线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
awaitSignal.unlock(); // 最后需要释放锁
}
}
}
class AwaitSignal extends ReentrantLock {
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
// 参数1 打印内容, 参数2 进入哪一间休息室, 参数3 下一间休息室
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
current.await(); // 先去对应的休息室休息
System.out.println("打印结果:" + str);
next.signal(); // 唤醒下一个休息室
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}
同步模式之Balking
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。
模式代码:
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting;
public void start() {
log.info("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}
// 真正启动监控线程...
}
}
两阶段中止模式的升级:
class Test8 {
public static void main(String[] args) {
TPTInterrupt1 tptInterrupt = new TPTInterrupt1();
tptInterrupt.start();
tptInterrupt.start();
tptInterrupt.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
tptInterrupt.stop();
}
}
@Slf4j(topic = "c.TPTInterrupt1")
class TPTInterrupt1 {
private Thread thread;
// 停止标记
private volatile boolean stop = false;
// 防止多个线程开启,false表示多个开启,此时不能再开启
private boolean starting = false;
public void start() {
synchronized (this) {
if (starting) { // false
return;
}
// 注意这里保证starting设置为true,所以保证这两个操作是原子性的,使用使用synchronized而不是volatile
starting = true;
}
thread = new Thread(() -> {
while (true) {
Thread current = Thread.currentThread();
if (stop) {
log.info("料理后事");
break;
}
try {
Thread.sleep(1000);
log.info("执行监控记录");
} catch (InterruptedException e) {
current.interrupt(); // 异常打断
}
// 执行监控操作
}
}, "监控线程");
thread.start();
}
public void stop() {
// thread.interrupt();
stop = true;
}
}
结果:
14:56:59.551 c.TPTInterrupt1 [监控线程] - 执行监控记录
14:57:00.552 c.TPTInterrupt1 [监控线程] - 执行监控记录
14:57:01.561 c.TPTInterrupt1 [监控线程] - 执行监控记录
14:57:01.561 c.TPTInterrupt1 [监控线程] - 料理后事
它还经常用来实现线程安全的单例:
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待。
享元模式
简介
定义: 英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时
wikipedia: A flflyweight is an object that minimizes memory usage by sharing as much data as possible with other similar objects
出自 “Gang of Four” design patterns
归类 Structual patterns
体现
包装类
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象。
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
注意:
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变
- 但最大值可以通过调整虚拟机参数 `
- -Djava.lang.Integer.IntegerCache.high` 来改变
Boolean 缓存了 TRUE 和 FALSE
String串池
String线程池在JVM中有讲述。
BigDecimal BigInteger
这2个类的本身操作是线程安全的,但是组合起来就不一定,AtomicReference原子引用中有讲述例子。
DIY
例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
public class Test3 {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
}
}
@Slf4j(topic = "c.Pool")
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
// 5. 借连接
public Connection borrow() {
while(true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 如果没有空闲连接,当前线程进入等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
log.debug("free {}", conn);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
private String name;
public MockConnection(String name) {
this.name = name;
}
// 实现方法略
}
以上实现没有考虑:
- 连接的动态增长与收缩
- 连接保活(可用性检测)
- 等待超时处理
- 分布式 hash
对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apachecommons pool,例如redis连接池可以参考jedis中关于连接池的实现
异步模式之工作线程
定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。
饥饿
固定大小线程池会有饥饿现象
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:没啥说的,做就是了
- 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
- 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
也就是线程都去处理A类型的任务,而B类型需要前面A类型的返回值,然而没有线程去执行B任务,所以造成饥饿的现象。
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
/*executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});*/
}
}
输出:
17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜
17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅
当注释取消后,可能的输出:
17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
输出:
17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁
创建多少线程池合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多内存