Singleton 模式
单例模式是一种对象创建模式,用于产生一个对象的具体实例,它可以确保系统中一个类只产生一个实例。在 Java 中,这样的行为能带来两个好处:
- 对于频繁使用的对象,可以省略 new 操作花费的时间,这对于那些重量级对象而言,是非常可观的一笔系统开销。
- 由于 new 操作的次数减少,因而对系统内存的使用频率也会降低,这将减轻 GC 压力。
严格来说,单例模式与并行没有直接的关系,讨论这个模式,是因为这个模式太常见了,我们不可避免地会在多线程环境中使用它们,因此我们需要一种高效的单例实现。
静态内部类实现了延迟加载,同时保证了线程安全(依赖 JVM 来保证线程安全)不建议使用双重锁校验模式。
public class StaticSingleton {
private StaticSingleton () {}
private static class SingletonHolder {
private static StaticSingleton instance = new StaticSingleton ();
}
public static StaticSingleton getinstance () {
return SingletonHolder.instance;
}
}
- 构造函数必须是 private 的,以控制不能随便创建这个类的实例。
- instance 必须是 private 并且 static 的,否则 instance 的安全性无法得到保证。
Immutability 模式
多个线程同时读写同一共享变量存在并发问题,这里的必要条件之一是读写,如果只有读而没有写,是没有并发问题的。因此解决并发问题,最简单的办法就是让共享变量只有读操作,而没有写操作。这个办法如此重要,以至于被上升到了一种解决并发问题的设计模式:不变性(Immutability)模式。
所谓不变性,就是对象一旦被创建之后,状态就不再发生变化。即变量一旦被赋值,就不允许再被修改了,没有修改操作,也就保持了不变性。将一个类所有的属性都设置成 final 的,并且只有只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性。
不变模式的主要使用场景需满足以下两个条件:
- 当对象创建后,其内部状态和数据不再发生任何变化
- 对象需要被共享,被多线程频繁访问。
Java SDK 里很多类都具备不可变性,例如经常用到的 String、Integer、Double 等基础类型的包装类都具备不可变性,这些对象的线程安全性都是靠不可变性来保证的。如果你仔细翻看这些类的声明、属性和方法,你会发现它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的。
如果具备不可变性的类,需要提供类似修改的功能,通常做法是创建一个新的不可变对象,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性。但所有的修改操作都创建一个新的不可变对象,那是不是创建的对象太多了,有点太浪费内存呢?这样做的确有些浪费,那如何解决呢?
利用享元模式避免创建重复对象
利用享元模式(Flyweight Pattern)可以减少创建对象的数量,从而减少内存占用。Java 里的 Long、Integer、Short 等基本数据类型的包装类都用到了享元模式。
享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑也很简单:创建之前,先去对象池里看看是不是存在;如果存在就利用对象池里的对象;如果不存在就会新创建一个对象,并且把这个新创建出来的对象放进对象池里。
下面以 Long 这个类为例,看它是如何利用享元模式来优化对象的创建的。Long 内部维护了一个静态的对象池,缓存了 [-128,127] 之间的数字,这个对象池在 JVM 启动时就创建好了,而且这个对象池一直都不会变化,也就是说它是静态的。之所以采用这样的设计,是因为 [-128,127] 之间的数字利用率最高。
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) {
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
// [-128,127]直接的数字做了缓存
private static class LongCache {
private LongCache(){}
static final Long cache[] = new Long[-(-128) + 127 + 1];
static {
for(int i = 0; i < cache.length; i++)
cache[i] = new Long(i - 128);
}
}
Guarded Suspension 模式
想象如下场景:外出聚餐前我们预订了一个包间,到饭店后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们预订的包间正在收拾,等包间收拾完后,大堂经理再带我们去包间就餐。在这个场景中,我们是否要等待,完全是由大堂经理来协调的。
在并发编程的角度,可以类比有很多桌客人,而大堂经理只有一个,通过这一个大堂经理来协调多个包间,等包间收拾完成这个条件满足后,大堂经理就会主动通知客人去对应包间就餐。这种设计模式被称为:Guarded Suspension,Guarded 是被守护、被保护的意思,Suspension 是暂停的意思,如果执行现在的处理会造成问题,就让执行处理的线程进行等待,通过让线程等待来保证实例的安全性。
下面我们来看看 Guarded Suspension 模式是如何模拟大堂经理这个角色的。
public class GuardedObject<T> {
//受保护的对象
T obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private final int timeout=2;
// 保存所有GuardedObject
private final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();
// 静态方法创建GuardedObject
public static <K> GuardedObject create(K key) {
GuardedObject go = new GuardedObject();
gos.put(key, go);
return go;
}
public static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = gos.remove(key);
if (go != null){
go.onChanged(obj);
}
}
// 获取受保护对象
public T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while(!p.test(obj)){
done.await(timeout, TimeUnit.SECONDS);
}
} catch(InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}
// 事件通知方法
public void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
上面的示例代码中,GuardedObject 内部维护了一个 Map,Key 是包间 ID,Value 是 GuardedObject 对象实例,同时增加了静态方法 create() 和 fireEvent();create() 方法用来创建一个 GuardedObject 对象实例,并根据 key 值将其加入到 Map 中,而 fireEvent() 方法则是模拟的大堂经理根据包间找就餐人的逻辑。协调逻辑本质上就是根据 await 和 signalAll 这样的等待-唤醒机制来进行的。
Balking 模式
我们提到可以用“多线程版本的 if”来理解 Guarded Suspension 模式,不同于单线程中的 if,这个“多线程版本的 if”是需要等待的,而且还很执着,必须要等到条件为真。但世界上不是所有场景都需要这么执着,有时候我们还需要快速放弃。如果现在不适合执行这个操作,或者没必要执行这个操作,就立即返回,这样的场景就可以使用 Balking 模式,这样能够提高程序的响应性。
需要快速放弃的一个最常见的例子是各种编辑器提供的自动保存功能。自动保存功能的实现逻辑一般都是隔一定时间自动执行存盘操作,存盘操作的前提是文件做过修改,如果文件没有执行过修改操作,就需要快速放弃存盘操作。代码示例如下所示:
boolean changed=false;
//自动存盘操作
void autoSave(){
synchronized(this){
if (!changed) {
return;
}
changed = false;
}
//执行存盘操作
//省略且实现
this.execSave();
}
//编辑操作
void edit(){
//省略编辑逻辑
......
change();
}
//改变状态
void change(){
synchronized(this){
changed = true;
}
}
上面我们用 synchronized 实现了 Balking 模式,这种实现方式最为稳妥。不过在某些特定场景下,我们也可以使用 volatile 来实现,但使用 volatile 的前提是对原子性没有要求。
Balking 模式有一个非常典型的应用场景就是单次初始化,下面的示例代码是它的实现。这个实现方案中,我们将 init() 声明为一个同步方法,这样同一个时刻就只有一个线程能够执行 init() 方法;init() 方法在第一次执行完时会将 inited 设置为 true,这样后续执行 init() 方法的线程就不会再执行 doInit() 了。
class InitTest{
boolean inited = false;
synchronized void init(){
if(inited){
return;
}
//省略doInit的实现
doInit();
inited=true;
}
}
Producer-Consumer 模式
生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。共享内存缓冲区作为生产者和消费者间的通信桥梁,避免了生产者和消费者直接通信。同时,由于内存缓冲区的存在,允许生产者和消费者存在执行上的性能差异,确保系统正常运行。
1. wait、notify
public class ProConDemo {
// 队列容量
private static final int CAPACITY = 5;
private static int i = 0;
/**
* 生产者
*/
static class Producer implements Runnable {
private Queue<Integer> queue;
private int maxLength;
public Producer(Queue<Integer> queue, int maxLength) {
this.queue = queue;
this.maxLength = maxLength;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
try {
// 在循环中调用wait,防止虚假唤醒
while (maxLength == queue.size()) {
System.out.println("Queue is full, Producer[" + Thread.currentThread().getName() + "] " +
"waiting for consumer to take something from queue.");
queue.wait();
}
} catch (InterruptedException e) {}
System.out.println("[" + Thread.currentThread().getName() + "] Producing value :" + i);
queue.offer(i++);
queue.notifyAll();
}
}
}
}
/**
* 消费者
*/
static class Consumer implements Runnable {
private Queue<Integer> queue;
public Consumer(Queue<Integer> queue) {this.queue = queue;}
@Override
public void run() {
while (true) {
synchronized (queue) {
try {
// 防止虚假唤醒
while (queue.isEmpty()) {
System.out.println("Queue is empty, Consumer[" + Thread.currentThread().getName() + "] " +
"thread is waiting for Producer");
queue.wait();
}
} catch (InterruptedException e) {}
Integer element = queue.poll();
System.out.println("[" + Thread.currentThread().getName() + "] Consuming value :" + element);
queue.notifyAll();
}
}
}
}
}
2. await、signal
public class ProConDemo {
private static final int CAPACITY = 5;
private static int i = 0;
private static Lock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();
/**
* 生产者
*/
static class Producer implements Runnable {
private Queue<Integer> queue;
private int maxLength;
public Producer(Queue<Integer> queue, int maxLength) {
this.queue = queue;
this.maxLength = maxLength;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (maxLength == queue.size()) {
System.out.println("Queue is full, Producer[" + Thread.currentThread().getName() + "] " +
"waiting for consumer to take something from queue.");
full.await();
}
System.out.println("[" + Thread.currentThread().getName() + "] Producing value :" + i);
queue.offer(i++);
// 唤醒其他所有消费者
empty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
/**
* 消费者
*/
static class Consumer implements Runnable {
private Queue<Integer> queue;
public Consumer(Queue<Integer> queue) { this.queue = queue;}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Queue is empty, Consumer[" + Thread.currentThread().getName() + "] " +
"thread is waiting for Producer");
empty.await();
}
Integer element = queue.poll();
System.out.println("[" + Thread.currentThread().getName() + "] Consuming value :" + element);
// 唤醒其他所有生产者
full.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
3. BlockingQueue
public class ProConDemo {
// 阻塞队列长度
private static final int CAPACITY = 5;
private static volatile AtomicInteger count = new AtomicInteger(0);
/**
* 生产者
*/
static class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private volatile boolean isRunning = true;
public Producer(LinkedBlockingQueue<Integer> queue) {this.blockingQueue = queue;}
@Override
public void run() {
while (isRunning) {
try {
blockingQueue.put(count.incrementAndGet());
System.out.println("[" + Thread.currentThread().getName() + "] Producing value :" + count.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop() {
isRunning = false;
}
}
/**
* 消费者
*/
static class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(LinkedBlockingQueue<Integer> queue) {this.blockingQueue = queue;}
@Override
public void run() {
while (true) {
try {
Integer x = blockingQueue.take();
System.out.println("[" + Thread.currentThread().getName() + "] Consuming value :" + x);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
4. Disruptor
Disruptor 是一款高性能的有界内存队列,对标的是 JDK 提供的 BlockingQueue 系列容器。详情链接
Future 模式
Future 模式的核心思想是异步调用。当我们需要调用一个函数时,如果这个函数执行得很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。因此,我们可以让被调者立即返回,让它在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。
Future 模式有点类似在网上买东西。如果我们在网上下单买了一部手机,当我们支付完成后,手机并没有办法立即送到家里,但是在电脑上会立即产生一个订单。这个订单就是将来发货或者领取手机的重要凭证,这个凭证也就是 Future 模式中会给出的一个契约。在支付活动结束后,大家不会傻傻地等着手机到来,而是各忙各的。而这张订单就成了商家发货的驱动力。当然,这一切我们不关心,我们只需要在快递上门时收货而已。
Future 模式很常用,因此 JDK 内部已经为我们准备好了一套完整的实现。首先,我们先看一下 Future 模式的基本结构,如下图所示:
其中 Future 接口类似于订单或者说是契约。通过它,你可以得到真实的数据。RunnableFuture 继承了 Future 和 Runnable 两个接口,其中 run() 方法用于构造真实的数据。它有一个具体的实现 FutureTask 类,FutureTask 类有一个内部类 Sync,一些实质性的工作会委托 Sync 类来实现。而 Sync 类最终会调用 Callable 接口,完成实际数据的组装工作。
Callable 接口只有一个方法 call(),它会返回需要构造的实际数据。这个 Callable 接口也是 Future 框架和应用程序之间的重要接口。要实现自己的业务系统,通常需要实现自己的 Callable 对象。此外,FutureTask 类也与应用密切相关,通常可以使用 Callable 实例构造个 FutureTask 实例,并将它提交给线程池。
Two-Phase Termination 模式
Java 语言的 Thread 类中曾经提供了一个 stop() 方法用来终止线程,可是早已不建议使用了,原因是这个方法用的就是一剑封喉的做法,被终止的线程没有机会料理后事。既然不建议使用 stop() 方法,那在 Java 领域,我们又该如何优雅地终止线程呢?
前辈们经过认真对比分析总结出了一套成熟的方案,叫做两阶段终止模式。顾名思义,就是将终止过程分成两个阶段,第一个阶段主要是线程 T1 向线程 T2 发送终止指令,而第二阶段则是线程 T2 响应终止指令。
通过 Thread 类提供的 interrupt() 方法,将线程转换到 RUNNABLE 状态。
终止的优雅方式是让 Java 线程自己执行完 run() 方法,所以一般我们会设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。
综合上面这两点,我们能总结出终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。理解了两阶段终止模式之后,下面我们看一个实际工作中的案例。
public class Proxy {
// 线程终止标志位
private volatile boolean terminated = false;
// 线程启动标志位,防止重复启动
private boolean started = false;
private Thread rptThread;
public synchronized void start(){
if (started) {
return;
}
started = true;
terminated = false;
rptThread = new Thread(()->{
while (!terminated){
report();
try {
Thread.sleep(2000);
} catch (InterruptedException e){
//重新设置线程中断状态
Thread.currentThread().interrupt();
}
}
started = false;
});
rptThread.start();
}
public synchronized void stop(){
// 设置中断标志位
terminated = true;
// 中断线程rptThread
rptThread.interrupt();
}
}
需要注意的是,我们在捕获 Thread.sleep() 的中断异常后,通过 Thread.currentThread().interrupt() 重新设置了线程的中断状态,因为 JVM 的异常处理会清除线程的中断状态。