模式之保护性暂停
GuardedObject

class GuardedObject {private Object response;private final Object lock = new Object();public Object get() {synchronized (lock) {// 条件不满足则等待while (response == null) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}}
带超时版 GuardedObject
class GuardedObjectV2 {private Object response;private final Object lock = new Object();public Object get(long millis) {synchronized (lock) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;log.debug("timePassed: {}, object is null {}",timePassed, response == null);}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;log.debug("notify...");lock.notifyAll();}}}
多任务版 GuardedObject
新增 id 用来标识 Guarded Object
class GuardedObject {// 标识 Guarded Objectprivate int id;public GuardedObject(int id) {this.id = id;}public int getId() {return id;}// 结果private Object response;// 获取结果// timeout 表示要等待多久 2000public Object get(long timeout) {synchronized (this) {// 开始时间 15:00:00long begin = System.currentTimeMillis();// 经历的时间long passedTime = 0;while (response == null) {// 这一轮循环应该等待的时间long waitTime = timeout - passedTime;// 经历的时间超过了最大等待时间时,退出循环if (timeout - passedTime <= 0) {break;}try {this.wait(waitTime); // 虚假唤醒 15:00:01} catch (InterruptedException e) {e.printStackTrace();}// 求得经历时间passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s}return response;}}// 产生结果public void complete(Object response) {synchronized (this) {// 给结果成员变量赋值this.response = response;this.notifyAll();}}}
中间解耦类
class Mailboxes {private static Map<Integer, GuardedObject> boxes = new Hashtable<>();private static int id = 1;// 产生唯一 idprivate static synchronized int generateId() {return id++;}public static GuardedObject getGuardedObject(int id) {return boxes.remove(id);}public static GuardedObject createGuardedObject() {GuardedObject go = new GuardedObject(generateId());boxes.put(go.getId(), go);return go;}public static Set<Integer> getIds() {return boxes.keySet();}}
业务相关类
class People extends Thread{@Overridepublic void run() {// 收信GuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信 id:{}", guardedObject.getId());Object mail = guardedObject.get(5000);log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);}}class Postman extends Thread {private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {GuardedObject guardedObject = Mailboxes.getGuardedObject(id);log.debug("送信 id:{}, 内容:{}", id, mail);guardedObject.complete(mail);}}
模式之生产者消费者
- 定义
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式

实现
class Message {private int id;private Object message;public Message(int id, Object message) {this.id = id;this.message = message;}public int getId() {return id;}public Object getMessage() {return message;}}class MessageQueue {private LinkedList<Message> queue;private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;queue = new LinkedList<>();}public Message take() {synchronized (queue) {while (queue.isEmpty()) {log.debug("没货了, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = queue.removeFirst();queue.notifyAll();return message;}}public void put(Message message) {synchronized (queue) {while (queue.size() == capacity) {log.debug("库存已达上限, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}queue.addLast(message);queue.notifyAll();}}}
应用
MessageQueue messageQueue = new MessageQueue(2);// 4 个生产者线程, 下载任务for (int i = 0; i < 4; i++) {int id = i;new Thread(() -> {try {log.debug("download...");List<String> response = Downloader.download();log.debug("try put message({})", id);messageQueue.put(new Message(id, response));} catch (IOException e) {e.printStackTrace();}}, "生产者" + i).start();}// 1 个消费者线程, 处理结果new Thread(() -> {while (true) {Message message = messageQueue.take();List<String> response = (List<String>) message.getMessage();log.debug("take message({}): [{}] lines", message.getId(), response.size());}}, "消费者").start();
同步模式之顺序控制
固定运行顺序
wait notify 版
// 用来同步的对象static Object obj = new Object();// t2 运行标记, 代表 t2 是否执行过static boolean t2runed = false;public static void main(String[] args) {Thread t1 = new Thread(() -> {synchronized (obj) {// 如果 t2 没有执行过while (!t2runed) {try {// t1 先等一会obj.wait();} catch (InterruptedException e) {e.printStackTrace();}}}System.out.println(1);});Thread t2 = new Thread(() -> {System.out.println(2);synchronized (obj) {// 修改运行标记t2runed = true;// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)obj.notifyAll();}});t1.start();t2.start();}
Park Unpark 版
可以看到,实现上很麻烦:
- 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
- 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
- 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』, 不需要『同步对象』和『运行标记』Thread t1 = new Thread(() -> {try { Thread.sleep(1000); } catch (InterruptedException e) { }// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行LockSupport.park();System.out.println("1");});Thread t2 = new Thread(() -> {System.out.println("2");// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)LockSupport.unpark(t1);});t1.start();t2.start();
交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现wait notify 版
```java class SyncWaitNotify { private int flag; private int loopNumber; public SyncWaitNotify(int flag, int loopNumber) {
} public void print(int waitFlag, int nextFlag, String str) {//高复用,抽象this.flag = flag;this.loopNumber = loopNumber;
} }for (int i = 0; i < loopNumber; i++) {synchronized (this) {while (this.flag != waitFlag) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.print(str);flag = nextFlag;this.notifyAll();}}
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5); new Thread(() -> { syncWaitNotify.print(1, 2, “a”); }).start(); new Thread(() -> { syncWaitNotify.print(2, 3, “b”); }).start(); new Thread(() -> { syncWaitNotify.print(3, 1, “c”); }).start();
<a name="oX6k8"></a>### Lock 条件变量版```javaclass AwaitSignal extends ReentrantLock {public void start(Condition first) {this.lock();try {log.debug("start");first.signal();} finally {this.unlock();}}public void print(String str, Condition current, Condition next) {for (int i = 0; i < loopNumber; i++) {this.lock();try {current.await();log.debug(str);next.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {this.unlock();}}}// 循环次数private int loopNumber;public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}}AwaitSignal as = new AwaitSignal(5);Condition aWaitSet = as.newCondition();Condition bWaitSet = as.newCondition();Condition cWaitSet = as.newCondition();new Thread(() -> {as.print("a", aWaitSet, bWaitSet);}).start();new Thread(() -> {as.print("b", bWaitSet, cWaitSet);}).start();new Thread(() -> {as.print("c", cWaitSet, aWaitSet);}).start();as.start(aWaitSet);
Park Unpark 版
class SyncPark {private int loopNumber;private Thread[] threads;public SyncPark(int loopNumber) {this.loopNumber = loopNumber;}public void setThreads(Thread... threads) {this.threads = threads;}public void print(String str) {for (int i = 0; i < loopNumber; i++) {LockSupport.park();System.out.print(str);LockSupport.unpark(nextThread());}}private Thread nextThread() {Thread current = Thread.currentThread();int index = 0;for (int i = 0; i < threads.length; i++) {if(threads[i] == current) {index = i;break;}}if(index < threads.length - 1) {return threads[index+1];} else {return threads[0];}}public void start() {for (Thread thread : threads) {thread.start();}LockSupport.unpark(threads[0]);}}SyncPark syncPark = new SyncPark(5);Thread t1 = new Thread(() -> {syncPark.print("a");});Thread t2 = new Thread(() -> {syncPark.print("b");});Thread t3 = new Thread(() -> {syncPark.print("c\n");});syncPark.setThreads(t1, t2, t3);syncPark.start();
两阶段终止模式
利用 isInterrupted
class TPTInterrupt {private Thread thread;public void start(){thread = new Thread(() -> {while(true) {Thread current = Thread.currentThread();if(current.isInterrupted()) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("将结果保存");} catch (InterruptedException e) {current.interrupt();}// 执行监控操作}},"监控线程");thread.start();}public void stop() {thread.interrupt();}}
利用停止标记
// 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性// 我们的例子中,即主线程把它修改为 true 对 t1 线程可见class TPTVolatile {private Thread thread;private volatile boolean stop = false;public void start(){thread = new Thread(() -> {while(true) {Thread current = Thread.currentThread();if(stop) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("将结果保存");} catch (InterruptedException e) {}// 执行监控操作}},"监控线程");thread.start();}public void stop() {stop = true;thread.interrupt();}}
同步模式之 Balking
- 定义
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回 实现
例如: ```java public class MonitorService { // 用来表示是否已经有线程已经在执行启动了 private volatile boolean starting; public void start() { log.info(“尝试启动监控线程…”); synchronized (this) {if (starting) {return;}starting = true;
}
// 真正启动监控线程… } }
它还经常用来实现线程安全的单例```javapublic final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;public static synchronized Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}}
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待。
享元模式
简介
定义
英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时
wikipedia: A flyweight is an object that minimizes memory usage by sharing as much data as possible with other similar objects
出自
“Gang of Four” design patterns
归类
Structual patterns
体现
包装类
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的
valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对
象:
public static Long valueOf(long l) {final int offset = 128;if (l >= -128 && l <= 127) { // will cachereturn LongCache.cache[(int)l + offset];}return new Long(l);}
注意:
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变
- 但最大值可以通过调整虚拟机参数 `
-Djava.lang.Integer.IntegerCache.high` 来改变
- Boolean 缓存了 TRUE 和 FALSE
String 串池
BigDecimal BigInteger
DIY
例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时 预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i+1));}}// 5. 借连接public Connection borrow() {while(true) {for (int i = 0; i < poolSize; i++) {// 获取空闲连接if(states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 如果没有空闲连接,当前线程进入等待synchronized (this) {try {log.debug("wait...");this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);synchronized (this) {log.debug("free {}", conn);this.notifyAll();}break;}}}}class MockConnection implements Connection {// 实现略}
使用连接池:
Pool pool = new Pool(2);for (int i = 0; i < 5; i++) {new Thread(() -> {Connection conn = pool.borrow();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}pool.free(conn);}).start();}
以上实现没有考虑:
- 连接的动态增长与收缩
- 连接保活(可用性检测)
- 等待超时处理
- 分布式 hash
对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apache
commons pool,例如redis连接池可以参考jedis中关于连接池的实现
工作线程
定义
饥饿

不同的任务类型,采用不同的线程 池

