- 哔哩哔哩视频">哔哩哔哩视频
- 第一部分 多线程基础
- 5 线程间通信
- 7 Hook线程以及捕获线程执行异常
- 7.2 注入钩子线程
- 8 线程池原理以及自定义线程池
- 第三部分 深入理解volatile关键字
- 13 深入volatile关键字
- 14 7种单例设计模式的设计
- 第四部分 多线程设计架构模式
- 24 Thread-Per-Message设计模式
- 25 Two Phase Termination设计模式
哔哩哔哩视频
第一部分 多线程基础
1 快速认识线程
1.3 线程生命周期详解
每一个线程都有自己的局部变量表,程序计数器、以及生命周期等。
线程的生命周期大体分为以下5个主要阶段
- NEW
- RUNNABLE
- RUNNNING
- BLOCKED
-
1.3.3 现场的RUNNING状态
在该状态中,线程的状态可以发生如下的状态转换。
直接进入TERMINATED状态,比如调用JDK不推荐的stop方法或者判断某个逻辑标识。
- 进入BLOCKED状态,比如调用了sleep,或者wait方法而加入了waitSet中。
- 进行某个阻塞的IO操作,比如因网络数据的读写而进入BLOCKED状态
- 获取某个锁资源,从而加入到该锁 的阻塞队列中二进入了BLOKED状态。
- 由于CPU的调度器轮询使该线程放弃执行,进入RUNNABLE状态
- 线程主动调用yield方法,放弃CPU执行权,进入RUNNABLE状态。
2.1.1 线程 的默认命名
public static void main(String[] args) {IntStream.range(0, 5).boxed().map(i -> new Thread(() -> {System.out.println(Thread.currentThread().getName());})).forEach(Thread::start);}
如果没有为线程显示地指定一个名字,那么线程将会以“Thread-”作为前缀与一个自增数据进行组合,这个自增数字在整个JVM进程中将会不断自增。
2.1.2 命名线程
public class ThreadConstruction {private final static String PREFIX = "ALEX-";public static void main(String[] args) {IntStream.range(0, 5).mapToObj(ThreadConstruction::createdThread).forEach(Thread::start);}private static Thread createdThread(final int intName) {return new Thread(() ->System.out.println(Thread.currentThread().getName()), PREFIX + intName);}}
一旦线程启动,名字将不在被修改。
2.5.2 JVM内存结构
补充这部分
2.6 守护线程
2.6.2 守护线程的作用
守护线程经常用作执行一些后台任务,因此有时也被称为后台进程,当你希望关闭某些线程的时候,或者退出JVM进程额时候,一些线程能够自动关闭,此时可以考虑用守护线程为你完成这样的工作
3 Thread API的详细介绍
3.1 sleep
睡眠有一个重要特性,那就是不会放弃monitor锁的所有权。
3.1.2 使用TimeUnit替代Thread.sleep
3.3.1 线程优先级介绍
进程有进程的优先级,线程同样也有优先级,理论上优先级较高的线程会获取优先被CPU调度的机会,但事实上往往并不会如你所愿。
3.4 获取线程ID
线程ID在整个JVM进程中都会是唯一的,并且是从0开始逐次递增。
3.5 获取当前线程
3.6 设置线程上下文类加载器
3.7 线程interrupt
- interrupt
- interrupted
-
3.7.1 interrupt
如下方法的调用会使得当前线程进入阻塞状态,而调用当前线程的interrupt方法,就可以打断阻塞。
Object的wait方法
- Ojbect 的 wait(long)的方法
- Ojbect 的 wait(long,int)的方法
- Thread的sleep(long)方法
- Thread的sleep(long ,int)方法
- Thread的 join方法
- Thread的join(long,int)方法
- InterruptibleChannel的io操作。
- Selector的wakeup方法。
- 其他方法
打断一个线程并不等于该线程的生命周期的借宿,仅仅是打断了当前线程的阻塞状态。
public class ThreadInterrupt {public static void main(String[] args) throws InterruptedException {final Thread thread = new Thread(() -> {try {TimeUnit.MINUTES.sleep(1);} catch (InterruptedException e) {e.printStackTrace();System.out.println("Oh , i am be interrupted.");}});thread.start();TimeUnit.MILLISECONDS.sleep(2);thread.interrupt();}}
**如果当前线程正在执行可中断方法被阻塞时,调用interrupt方法将其中断,反而会导致flag被清除。**
3.8 线程join
与sleep一样也是一个可中断的方法,也就是说,如果有其他线程执行了当前线程的interrupt操作,它也会捕获中断信号,并且擦除线程interrupt标识。
public class ThreadJoin {public static void main(String[] args) throws InterruptedException {final List<Thread> threads = IntStream.range(1, 3).mapToObj(ThreadJoin::create).collect(Collectors.toList());//启动这两个线程threads.forEach(Thread::start);//执行join方法for (Thread thread : threads) {//jointhread.join();}for (int i = 0; i < 10; i++) {executor(i);}}static Thread create(int seq) {return new Thread(() -> {for (int i = 0; i < 10; i++) {executor(i);}}, String.valueOf(seq));}private static void executor(int i) {System.out.println(Thread.currentThread().getName() + "# " + i);shortSleep();}private static void shortSleep() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}
join方法会使当前线程永远等待下去,直到期间被另外的线程中断,或者join的线程执行结束。
3.8.2 join方法结合实战
等待各个航班返回信息。
典型的串行任务局部并行化处理。
4 线程安全与数据同步
4.3.3 使用synchronized需要注意的问题
- 与monitor关联的对象不能为空
- synchronized的作用域不能太大。
- 不同的monitor企图锁相同的方法
- 多个锁的交叉导致死锁
4.5 程序死锁的原因以及如何诊断
4.5.1 程序死锁
5 线程间通信
5.1 同步阻塞与异步阻塞
5.1.2 异步非阻塞消息处理
同步阻塞的缺陷,尤其是在吞吐量低,很难应对比较高的业务并发量。如果将同步阻塞的方式换成异步阻塞的方式,则不仅可以提高系统的吞吐量,而且业务处理线程也能控制在一个固定的范围,以增加系统的稳定性。
5.2单线程间通信
public class EventQueue {private final int max;static class Event {}private final LinkedList<Event> eventQueue = new LinkedList<>();private final static int DEFAULT_MAX_EVENT = 10;public EventQueue() {this(DEFAULT_MAX_EVENT);}public EventQueue(int max) {this.max = max;}public void offer(Event event) {synchronized (eventQueue) {if (eventQueue.size() > max) {try {console(" the queue is full.");eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}console(" the new event is submitted ");eventQueue.addLast(event);// IllegalMonitorStateExceptioneventQueue.notify();}}public Event take() {synchronized (eventQueue) {if (eventQueue.isEmpty()) {try {console(" the queue is empty");eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}final Event event = eventQueue.removeFirst();this.eventQueue.notify();console(" the event " + event + " is handled.");return event;}}private void console(String msg) {System.out.printf("%s:%s\n", Thread.currentThread().getName(), msg);}}public class EventQueue {private final int max;static class Event {}private final LinkedList<Event> eventQueue = new LinkedList<>();private final static int DEFAULT_MAX_EVENT = 10;public EventQueue() {this(DEFAULT_MAX_EVENT);}public EventQueue(int max) {this.max = max;}public void offer(Event event) {synchronized (eventQueue) {if (eventQueue.size() > max) {try {console(" the queue is full.");eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}console(" the new event is submitted ");eventQueue.addLast(event);// IllegalMonitorStateExceptioneventQueue.notify();}}public Event take() {synchronized (eventQueue) {if (eventQueue.isEmpty()) {try {console(" the queue is empty");eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}final Event event = eventQueue.removeFirst();this.eventQueue.notify();console(" the event " + event + " is handled.");return event;}}private void console(String msg) {System.out.printf("%s:%s\n", Thread.currentThread().getName(), msg);}}
public class EventClient {
public static void main(String[] args) {
final EventQueue eventQueue = new EventQueue();
new Thread(() -> {
for (; ; ) {
eventQueue.offer(new EventQueue.Event());
}
}, "Producer").start();
new Thread(() -> {
for (; ; ) {
try {
eventQueue.take();
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Consumer").start();
}
}
5.2.3 关于wait和notify的注意事项
- wait方法是可中断的,这也就意味着,当前线程一旦调用了wait方法进入阻塞状态,其他线程时可以打断。可中断被打断后收到中断异常InterruptedException,同时interrupt标识也会被擦除。
- 线程执行了某个对象的wait方法后,会加入与之对应的wait set中,每一个对象的monitor都有一个与之关联的wait set。
- 当线程进入wait set之后,notify方法可以将其唤醒,也就是从wait set中弹出,同时中断wait中的线程也会将其唤醒。
- 必须在同步方法中使用wait和notify方法,因为执行wait 和notify的前提条件是必须持有同步方法的monitor的所有权。
- 同步代码的monitor必须与执行wait notify方法的对象一致。否则抛出
IllegalMonitorStateException异常。
5.2.4 wait 和sleep
都可使现场进入阻塞状态,但是两者存在本质的区别。
- 都可以使线程进入阻塞状态。
- 都是可中断方法,被中断口都会收到中断异常
- wait是Object方法,sleep是Thread特有方法
- wait方法的执行必须在同步方法中进行,而sleep不需要
- 线程在同步方法中执行sleep方法时,并不会释放monitor的锁,而wait方法会释放monitor锁。
- sleep在短暂休眠之后就会退出阻塞,而wait(没有执行时间)则需要其他线程中断后才能退出阻塞。
5.3 多线程间通信
5.4 自定义显示BooleanLock
5.4.1 synchronized关键字的缺陷
synchronized关键字提供了一种排他式的数据同步机制。某个线程在获取monitor lock的时候可能被阻塞,而这种阻塞有两个明显的缺陷:第一,无法控制阻塞时长。第二,阻塞不可被中断。
**
7 Hook线程以及捕获线程执行异常
7.1 获取线程运行时异常
7.1.1 UncaughtExceptionHandler的介绍
线程在执行单元中是不允许抛出checked异常的,而且线程运行在自己的上下文中,派生它的线程将无法直接获得它运行中出现的异常信息。对此,Java提供了一个UncaughtExceptionHandler接口。当线程在运行过程中出现异常时,会回调UncaughtExceptionHandler接口,从而得知是哪个线程在运行时出错,以及出现了什么样的错误。
public class CaptureThreadException {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
System.out.println(t.getName() + " occur exception");
e.printStackTrace();
});
final Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1 / 0);
}, "Test-Thread");
thread.start();
}
}
7.2 注入钩子线程
7.2.1 Hook线程介绍
JVM进程的退出是由于JVM进程中没有活跃的非守护线程,或者收到了系统中断信号,向JVM程序注入一个Hook线程,在JVM进程退出的时候,Hook线程会启动执行,通过Runtime可以为JVM注入多个Hook线程。
在线上Java程序中经常遇到进程程挂掉,一些状态没有正确的保存下来,这时候就需要在JVM关掉的时候执行一些清理现场的代码。Java中得ShutdownHook提供了比较好的方案。
JDK在1.3之后提供了Java Runtime.addShutdownHook(Thread hook)方法,可以注册一个JVM关闭的钩子,这个钩子可以在以下几种场景被调用:
- 1)程序正常退出
- 2)使用System.exit()
- 3)终端使用Ctrl+C触发的中断
- 4)系统关闭
- 5)使用Kill pid命令干掉进程
注:在使用kill -9 pid是不会JVM注册的钩子不会被调用。
public class ThreadHook {
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("The hook thread 1 is running.");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The hook thread will exit.");
}, "Sys-Hook"));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("The hook thread 2 is running.");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The hook thread 2 will exit.");
}, "Sys-Hook-2"));
System.out.println("The program will is stopping.");
}
}
7.2.3 Hook线程应用场景以及注意事项
- hook线程只有在收到退出信号的时候才会被执行,如果在kill的时候使用了参数-9 ,那么hook 线程不会得到执行,进程间立即退出
- Hook线程中也可以执行以下资源释放的工作,比如关闭文件句柄,socket链接,数据库connection等。
- 尽量不要在hook线程中执行一些耗时非常长的操作,因为其会导致线程迟迟不能退出。
8 线程池原理以及自定义线程池
8.1 线程池原理
一个完整的线程池已改具备如下要素。
- 任务队列:用于缓存提交的任务
- 线程数量管理功能:一个线程池必须能够很好地管理和控制线程池数量,可以通过如下三个参数来控制,比如创建线程池时初始化的线程数量init;线程池自动扩充时最大线程数量;在线程池空闲时需要释放但是也要维护一定数量的活跃数量或核心数量core。这三者关系init <=core<=max
- 任务拒绝策略:如果线程数量已经达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者
- 线程工厂:主要用于个性化定制线程,比如将线程设置为守护线程以及设置线程名称等。
- QueueSize:任务队列主要存放提交的Runnable,但是为了防止内存溢出,需要有limit数量对其进行控制。
- Keepedalive时间:该时间主要决定线程各个重要参数自动维护的时间间隔。
第三部分 深入理解volatile关键字
12 volatile关键字的介绍
volatile关键字只能修饰类变量和实例变量,对于方法参数、局部变量以及实例常量,类常量都不能进行修饰。
12.2.2 CPU缓存一致性问题
由于缓存的出现,极大地提高了CPU的吞吐能力,但是同时也引入缓存不一致的问题,比如i++这个操作,在程序运行的过程中,首先需要将主内存中的中的数据复制一份存放到CPU Cache 中,那么CPU寄存器在进行数值计算的时候就直接到Cache中读取和写入,当整个过程运算结束之后再将Cache的数据刷新到主内存当中。具体过程如下;
- 读取主内存的i到CPU Cache中
- 对i进行加一操作
- 将结果写会到CPU Cache中
- 将数据刷新到主内存中。
12.3 Java内存模型
Java的内存模型(Java Memory Mode ,JMM)指定了Java虚拟机如何与计算机的主存进行工作,
Java的内存模型决定了一个线程对共享变量的写入何时对其他线程可见,Java内存模型定义了线程和主内存之间的抽象关系,具体如下:
- 共享变量存储在主内存之中,每个线程都可以访问
- 每个线程都有私有的工作内存或本地内存
- 工作内存值存储该线程对共享变量的副本
- 线程不能直接操作主内存,只有先操作了工作内存之后才能写入主内存
- 工作内存和java内存模型一样也是一个概念,它其实并不存在,它涵盖了缓存、寄存器、编译器优化以及硬件等。
13 深入volatile关键字
13.1 并发编程的三个重要特征
原子性、有序性、可见性
13.1.1 原子性
所谓原子性是指在一次的操作或者多次操作中,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么所有的操作都不执行。
13.1.2 可见性
可见性是指:当一个线程对共享变量进行了修改,那么另外的线程可以立即看到修改后的最新值。
13.1.3 有序性
一般来说,处理器为了提高程序的运行效率,可能会对输入的代码指令做一定的优化,它不会百分百的保证代码的执行顺序严格按照编写代码中的顺序来进行,但是它会保证程序最终运算结果是编码是所期望的那样。
13.2 JMM如何保证三大特性
JVM采用内存模型的机制来屏蔽各个平台和操作系统之间的内存访问的差异,以实现让Java程序在各平台下达到一致的内存访问效果。
Java的内存模型规定了所有的变量都是存在于主内存当中,而每个线程都有自己的工作内存或本地内存(这一点很像CPU的Cache),线程对变量的所有操作都必须在自己的工作内存中进行,而不能直接对主内存进行操作。并且每一个线程都不能访问其他线程的工作内存或本地内存。
13.2.1 JMM与原子性
Java语言中,对基本数据类型的变量读取赋值操作都是原子性的,对引用类型的变量读取和赋值的操作也是原子性的,因此诸如此类的操作是不可被中断的,要么执行,要么不执行。
- y=x赋值操作
这条语句是非原子性的,因为它包含了两个重要的步骤
- 执行线程从主内存中读取x的值(如果x已经存在于执行线程的工作内存中,则直接获取)然后将其存入当前线程的工作内存之中。
- 在执行线程的工作内存中修改y的值为x,然后将y的值写入到主内存之中。
虽然第一步和第二步都是原子类型的操作,但是合在一起就不是原子操作了。
- 多个原子性的操作在一起就不再是原子性操作了
- 简单的读取与赋值操作是原子性的,将一个变量赋值给另外一个变量的操作不是原子性的。
- Java内存模型(JMM)只保证了基本读取回复赋值的原子性操作,其他的均不保证,如果想要使得某些代码片段具备原子性,需要使用synchronized或者JUC中的lock。
volatile关键字不具备保证原子性的语义
13.2.2 JMM与可见性
在多线程的环境下,如果某个线程首次读取共享变量,则首先到主内存中获取该变量,然后存入工作内存中,以后只需要在工作内存中读取该变量即可。同样如过对该变量执行了修改操作,则先将新值写入工作内存中,然后在刷新到主内存中。但是什么时候最新的值会被刷新到主内存中不太确定。
Java提供了三种方式来保证可见性
- 使用volatile关键字,当一个变量被volatile修饰时,对于共享资源的读操作会直接在主内存中进行(当然也会缓存到工作内存中,当其他线程对该共享资源进行了修改,则会导致当前线程在工作内存中的共享资源失效,所以必须从主内存中再次读取),对于共享变量的写操作先要修改工作内存,但是修改结束后会立即将其刷新到主内存中。
- 通过synchronized关键字能够保证可见性,synchronized关键字能够保证同一时刻只有一个线程获得锁,然后执行同步方法,并且还会确保在锁释放之前,会将变量的修改刷新到主内存中。
- 通过JUC提供的显示锁Lock也能够保证可见性。Lock的lock方法能够保证在同一时刻只有一个线程获得锁然后执行同步方法,并且会保证在锁释放之前会将变量的修改刷新到主内存中。
13.2.3 JMM与有序性
在Java的内存模型中,运行编译器和处理器对指令进行重排序,在单线程的情况下,重排序并不会引起什么问题,但是在多线程的情况下,重排序会影响到程序的正确运行,Java提供了三种保证有序性的方式。
- 使用volatile关键字来保证
- 使用synchronized关键字来保证
- 使用显示锁Lock来保证。
后两者采用了同步的机制,同步代码在执行的时候与单线程情况下一样自然能够保证顺序性(最终结果的顺序性)。
happens-before原则
Java内存模型具备一些天生的有序性规则,需要任何同步手段就能保证有序性,这个规则被称为Happens-before原则。如果两个操作的执行次序无法从happens-before原则推导出来,那么它们就无法保证有序性。
以下是happens-before原则
- 程序次序规则:在一个线程内,代码按照编写的次序执行,编写在后面的操作发生于编写在前面操作之后。(虚拟机可能会对代码指令进行重排序,只要确保在一个线程内最终的结果和代码顺序执行的结果一致即可。)
- 锁定规则:一个unlock操作要先行发生于对同一个锁的lock操作(无论是单线程还是多线程,如果同一个锁是锁定状态,那么必须先对其执行释放操作之后才能继续执行lock操作)
- volatile变量规则:对一个变量的写操作要早于对这个变量之后的读操作。(如果一个变量使用volatile关键字修饰,一个线程对它进行读操作,一个线程对它进行写操作,那么写入操作要发生在读操作)
- 传递规则:A操作先于B操作,B操作先于C操作,那么A操作先于C操作
- 线程启动规则:Thread对象的start方法先行发生于对该线程的任何动作
- 线程中断规则:对线程执行interrput方法肯定要优先于捕获到中断信号。
- 线程的终结规则:线程中所有的操作都要先行发生于线程的终止检测(线程的任务执行,逻辑单元执行肯定要发生于线程死亡之前)
- 对象的终结:一个对象初始化的完成先行发生于finalize方法之前。
volatile关键字具有保证顺序性的语义
13.3 volatile关键字深入解析
13.3.1 volatile关键字的语义
被volatile修饰的实例变量或者类变量具有如下两层语义:
- 保证了不同线程之间对共享变量操作时的可见性,也就是说当一个线程修改volatile修饰的变量时,另外一个线程会立即看到最新的值。
- 禁止对指令进行重排序操作。
(1)理解volatile保证可见性
在happend-before规则中:volatile变量规则:对一个变量的写操作要早于对这个变量之后的读操作。其具体步骤如下:
- Reader线程从主内存中获取init_value的值为0,并且将其缓存到工作内存中。
- Updater线程将init_value的值在本地工作内存中修改为1,然后立即刷新到主内存中。
- Reader线程在本地工作内存中的init_value失效。
- 由于Reader线程工作内存中的init_value失效,因此需要到主内存中重新读取init_value的值。
(2)理解volatile保证顺序性
volatile直接禁止JVM和处理器对volatile关键字修饰的指令重排序,但是对于volatile前后无依赖关系的指令可以随便怎么排序。
13.3.2 volatile的原理和机制
通过对openJDK 的unsafe.cpp源码,发现被volatile修饰的变量存在于一个“lock;”前缀,
lock前缀实际上相当于一个内存屏障,该内存屏障会为指令提供如下几个保障:
- 确保指令重排序时不会将其后面的代码排到内存屏障之前
- 确保指令重排序时不会将其前面的代码排序到内存屏障之后。
- 确保在执行到内存屏障修饰的指令时前面的代码全部执行完毕。
- 强制将线程工作内存中值的修改刷新到主内存中
- 如果是写操作,则会导致其他工作内存(CPU Cache)中的缓存数据失效
13.3.3 volatile的使用场景
虽然volatile有部分synchronized关键字的语义,但是volatile不可能完全替代synchronized关键字,因为volatile关键字不具备原子性操作语义,我们在使用volatile关键字的时候也是充分利用它的可见性以及有序性(防止重排序)特点。
- 利用开关控制可见性的特点
- 状态标记利用顺序的性特点
- Singleton设计模式的double-check也是利用了顺序性特点。
13.3.4 volatile和synchronized
两者的区别
- 使用上的区别
- volatile关键字只能用于修饰实例变量或者类变量,不能用于修饰方法以及方法参数和局部变量、常量等。
- synchronized关键字不能用于对变量的修饰,只能用于修饰方法或者语句块。
- volatile关键字修饰的变量可以为null,synchronized关键字同步语句块的monitor对象不能为null。
对原子性的保证
- volatile无法保证原子性
- 由于synchronized是一种排他的机制,因此synchronized关键字修饰的同步代码块是无法被中途打断的,因此其能够保证代码的原子性。
对可见性的保证
- 两者均可以保证共享资源在多线程间的可见性,但是实现机制完全不同
- synchronized借助于JVM指令monitor enter 和monitor exit 通过排他的方式使得同步代码串行化,在monitor exit时所有共享资源都将会被刷新到主内存中。
- volatile使用机器指令(偏硬件)“lock”的方式破事其他线程工作内存中的数据失效,不得到主内存中进行再次加载。
对有序性的保证
- volatile 关键字禁止JVM编译器以及处理器对其进行重排序,所以它能够保证有序性。
- synchronized关键字锁修饰的同步方法也可以保证顺序性,但是这种顺序性是以程序的串行化执行换来的,在synchronized关键字所修饰的代码块中代码指令也会发生重排序的情况。
其他
- volatile不会使线程陷入阻塞
- synchronized关键字会使线程进入阻塞状态。
14 7种单例设计模式的设计
14.1 饿汉式
//饿汉式
public final class Singleton {
//实例变量
private byte[] data =new byte[1024];
//在定义实例对象的时候直接初始化
private static Singleton instance = new Singleton();
//私有构造函数,不允许外部new
private Singleton() {
}
public static Singleton getInstance(){
return instance;
}
}
饿汉式的关键在于instance作为变量并且直接得到了初始化,如果主动使用Singleton类,那么instance实例将会直接完成创建,包括其中的实例变量都会得到初始化,比如1K空间的data将会同时被创建。
instance作为类变量在类初始化的过程中会被收集进
如果一个类中的成员属性比较少,且占用的内存资源不多,饿汉的方式也未尝不可,相反,如果一个类的成员都是比较重的资源,那么这种方式的方式就会有些不妥。
getInstance性能比较高,但是无法进行懒加载。
14.2 懒汉式
所谓懒汉式就是在使用实例的时候再去创造,这样就可以避免类在初始化是提前创建。
//final 不允许被继承
public final class Singleton {
//定义实例 但是不是直接初始化
private static Singleton instance = null;
//实例变量
private byte[] data = new byte[1024];
private Singleton() {
}
private static Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}
Singleton的变量install=null,因此当Singleton.class被初始化的时候instance并不会被实例化,在getInstance方法中会判断instance是否被实例化,看起来没什么问题,但是getInstance方法在多线程环境下分析,则会导致instance被实例化一次以上,并不能保证单例的唯一性。
14.3 懒汉式+同步方法
懒汉式的instance是共享资源,多个线程访问时,需要保证数据的同步性。
//final 不允许被继承
public final class Singleton {
//定义实例 但是不是直接初始化
private static Singleton instance = null;
//实例变量
private byte[] data = new byte[1024];
private Singleton() {
}
// 加入同步控制,每次只能有1个线程进入
private static synchronized Singleton getInstance() {
if (instance == null) {
}
return instance;
}
}
采用懒汉式+数据同步的方式既能满足了懒加载又能百分之百保证instance实例的唯一性,但是synchronized关键字天生的排他性导致了getInstance方法只能在同一时刻被一个线程所访问,性能低下。
14.4 Double-Check
这是一种比较聪明的设计方式,他提供了一种高效的数据同步策略,那就是首次初始化时加锁,之后允许多个线程同时进行getInstance方法的调用来获得类的实例。
//final 不允许被继承
public final class Singleton {
private static Singleton instance = null;
private byte[] data = new byte[1024];
Connection conn;
Socket socket;
private Singleton() {
//初始化 conn
this.conn =
//初始化 socket
this.socket = new Socket();
}
public static Singleton getInstance() {
//当instance==null,进入同步代码块,同时该判断避免了每次都需要进入同步代码块
//可以提高效率
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
当两个线程发现null==instance,只有一个线程有资格进入同步代码块,,完成对instance的实例化,随后的线程发现null==instance不成立则无须进行任何动作,以后对getInstacne的访问就不需要数据同步的保护了。
这种方式看起来是那么的完美和巧妙,即满足了懒加载,又保证了instance实例的唯一性,Double-Check的方式提供了高效的数据同步策略,可以允许多个线程同时对getInstance进行访问,但是这种方式在多线程的情况下可能会引起空指针异常。
因为:在Singleton的构造函数中,需要分别实例化conn和socket两个资源,还有Singleton自身。根据JVM允许时指令重排序和Happens-Before规则,这三者之间的实例化顺序并没前后的约束,那么极有可能是instance最先被实例化,而conn和socket并未完成实例化。为完成初始化的实例调用其方法将会抛出空指针异常。
14.5 Volatile+Double+Check
Double-Check虽然是一种巧妙的程序设计,但是有可能会引起类成员变量的实例化conn和socket发生在instance实例化之后,这一切均是由于JVM在运行时指令重排序锁导致的,而volatile关键字可以防止这种重排序的发生。因此代码稍作修改就可满足多线程下的单例、懒加载以及获取实例的高效性,
private volatile static Singleton instance = null;
14.6 Holder方式
Holder的方式完全是借助了类加载的特点。
public final class Singleton {
private byte[] data = new byte[1024];
private Singleton() {}
//在静态内部类中持有 Singleton实例,并且可以被直接初始化
private static class Holder {
private static Singleton instacne = new Singleton();
}
//调用getInstance 实际上是获得Holder的instance静态属性。
public static Singleton getInstance() {
return Holder.instacne;
}
}
在Singleton类中并没有instance的静态成员,而是将其放到了静态内部类Holder之中,因此在SIngleton类的初始化过程中并不会创建Singleton的实例。Holder类中定义了Singleton的静态变量,并且直接进行了实例化,当Holder被主动引用的时候则会创建Singleton的实例。Singleton实例的创建过程在Java编译时期收集至
14.7 枚举方式
枚举类型不允许被继承,同样是线程安全的且只能被实例化一次,但是枚举类型不能够懒加载。
//枚举本身是 final ,不允许被继承的
public enum Singleton {
INSTANCE;
//实例变量
private byte[] data = new byte[1024];
Singleton(){
System.out.println("INSTANCE will be initialized immediately");
}
public static void method(){
//调用该方法则会主动使用Singleton,INSTANCE 将会被实例化
}
public static Singleton getInstance(){
return INSTANCE;
}
}
第四部分 多线程设计架构模式
15 控制线程生命周期
15.1 场景
虽然Thread为我们提供了可获取状态,以及判断是否alive的方法,但是这些方法均是针对线程本身的,而我们提交的任务runnable在运行过程中所处的状态是无法直接获得的,比如它什么时候开始,什么时候结束,最不好的一种体验是无法获得runnable的任务执行后的结果,一般情况下想要获得最终结果,我们不得不为thread或者runnable传入共享变量,但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性的安全隐患。
15.2 当观察者模式遇到Thread
当某个状态发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的工作。观察者模式需要有事件源,也就是引发状态改变的源头,很明显Thread负责执行任务的逻辑单元,它最清楚整个过程的始末周期,而事件的接受者则是通知接受者一方,严格意义上的观察者模式是需要Obsever的集合,这里我们不需要这样严格,只需要将执行任务的每一个阶段都通知给观察者即可。
15.2.1 接口定义
Observable 接口定义
public interface Observable { //任务 生命周期的枚举类型 enum Cycle { STARTED, RUNNING, DONE, ERROR } //获取当任务的生命周期状态 Cycle getCycle(); //定义启动线程的方法,主要作用是为了屏蔽Thread的其他方法 void start(); //定义线程的打断方法,作用于start方法一样,也是为了屏蔽 Thread 的其他方法 void interrupt(); }该接口主要是暴露给调用者使用的
TaskLifecycle接口定义
public interface TaskLifecycle<T> { //任务启动时会触发 onStart方法 void onStart(Thread thread); //任务正在运行时会 触发 onRunning 方法 void onRunning(Thread thread); // 任务结束会触发 onFinish 方法,其中result 是任务结束的后果 void onFinish(Thread thread, T result); //任务执行报错时会触发onError 方法 void onError(Thread thread, Exception e); //生命周期接口的空实现(Adapter) class EmptyLifecycle<T> implements TaskLifecycle<T> { @Override public void onStart(Thread thread) { } @Override public void onRunning(Thread thread) { } @Override public void onFinish(Thread thread, T result) { } @Override public void onError(Thread thread, Exception e) { } } }该接口定义了在任务执行的生命周期中会被触发的接口,其中EmptyLifecycle是一个空接口,主要是为了让使用者保持对Thread类的使用习惯。
Task函数接口定义 ```java @FunctionalInterface public interface Task
{ //任务执行接口,该接口允许有返回值 T call(); }
<a name="Y9yY1"></a>
#### 15.2.2 ObservableThread实现
ObservableThread是任务监控的关键,它继承Thread类和Observable接口,并在构造期间传入Task的具体代码。
```java
public class ObservableThread<T> extends Thread implements Observable {
private final TaskLifecycle<T> lifecycle;
private final Task<T> task;
private Cycle cycle;
public ObservableThread(Task<T> task) {
this(new TaskLifecycle.EmptyLifecycle<>(), task);
}
public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
super();
if (task == null) {
throw new IllegalArgumentException("The task is required.");
}
this.lifecycle = lifecycle;
this.task = task;
}
@Override
public void run() {
//在执行线程单元的时候,分别触发响应的事件
this.update(Cycle.STARTED, null, null);
try {
final T result = this.task.call();
this.update(Cycle.DONE, result, null);
} catch (Exception e) {
this.update(Cycle.ERROR, null, e);
}
}
private void update(Cycle cycle, T result, Exception e) {
this.cycle = cycle;
if (lifecycle == null) {
return;
}
try {
switch (cycle) {
case STARTED:
this.lifecycle.onStart(currentThread());
break;
case RUNNING:
this.lifecycle.onRunning(currentThread());
break;
case DONE:
this.lifecycle.onFinish(currentThread(), result);
break;
case ERROR:
this.lifecycle.onError(currentThread(), e);
break;
}
} catch (Exception ex) {
if (cycle == Cycle.ERROR) {
throw ex;
}
}
}
@Override
public Cycle getCycle() {
return this.cycle;
}
public static void main(String[] args) {
final TaskLifecycle.EmptyLifecycle<String> lifecycle = new TaskLifecycle.EmptyLifecycle<>() {
@Override
public void onFinish(Thread thread, String result) {
System.out.println("The result is " + result);
}
};
final ObservableThread<String> observableThread = new ObservableThread<>(lifecycle, () -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("finished done");
return "hello Observer";
});
observableThread.start();
}
}
16 Single Thread Execution设计模式
该模式是指在同一时刻只能有一个线程去访问共享资源,这就像独木桥一样每次只允许一人通行,简单来说,Single Thread Execution就是采用排他式的操作保证在同一时刻只能有一个线程访问共享资源。
16.1 机场过安检
16.1.1 非线程安全
模拟一个非线程安全的安检口类,旅客(线程)分别手持登机牌和身份证接受安检。
public class FlightSecurity {
private int count = 0;
//登机牌
private String boardingPass = "null";
//身份证
private String idCard = "null";
public void pass(String boardingPass, String idCard) {
this.boardingPass = boardingPass;
this.idCard = idCard;
this.count++;
check();
}
private void check() {
if (boardingPass.charAt(0) != idCard.charAt(0)) {
throw new RuntimeException("======exception===" + toString());
}
}
@Override
public String toString() {
return "The " + count +
" passengers ,boardingPass[" + boardingPass + "] ,idCard [" + idCard + "]";
}
}
测试
public class FlightSecurityTest {
static class Passengers extends Thread {
//机场安检类
private final FlightSecurity flightSecurity;
//旅客身份证
private final String idCard;
//登机牌
private final String boardingPass;
public Passengers(FlightSecurity flightSecurity, String idCard, String boardingPass) {
this.flightSecurity = flightSecurity;
this.idCard = idCard;
this.boardingPass = boardingPass;
}
@Override
public void run() {
while (true) {
flightSecurity.pass(boardingPass, idCard);
}
}
}
public static void main(String[] args) {
//定义三个旅客
final FlightSecurity flightSecurity = new FlightSecurity();
new Passengers(flightSecurity, "A123456", "AF123456").start();
new Passengers(flightSecurity, "B123456", "BF123456").start();
new Passengers(flightSecurity, "C123456", "CF123456").start();
}
}
看起来每一个客户都是合法的,因为每一个客户的身份和登机牌首字母都一样,运行上面的程序却出现了错误,而且错误情况还不太一样,大概分为两类
======exception===The 41388 passengers ,boardingPass[AF123456] ,idCard [A123456]
======exception===The 693 passengers ,boardingPass[BF123456] ,idCard [C123456]
首字母相同检查不能通过和首字母不相同检查不能通过。明明传入的参数全是首字母相同的。
16.1.2 问题分析
虽然参数的传递百分之百能保证就是这两个值,但是在pass方法中对boardingPass和idCard的赋值很可能有交叉的,不能保证原子操作。
1 首字母相同却未通过检查
这种情况的执行步骤如下。
- 线程A 调用pass方法,传入“A123456”“AF123456”并且对IDCard赋值成功,由于CPU调度器时间片的轮转,CPU的执行权归B线程所有。
- 线程B调用pass方法,传入“B123456”“BF123456”,并且对IDCard赋值成功,覆盖A线程赋值的idCard.
- 线程A重新获取CPU执行权,将boardingPass赋予AF123456,因此check无法通过
- 在输出toString之前,B线程成功将boardingPass覆盖为BF123456
2 为何会出现首字母不相同的情况
这种情况的执行顺序如下:
- 线程A 调用pass方法,传入“A123456”“AF123456”并且对IDCard赋值成功,由于CPU调度器时间片的轮转,CPU的执行权归B线程所有。
- 线程B调用pass方法,传入“B123456”“BF123456”,并且对IDCard赋值成功,覆盖A线程赋值的idCard.
- 线程A重新获取CPU执行权,将boardingPass赋予AF123456,因此check无法通过
- 线程A检查不通过,输出idCard=A123456 和boardingPass BF123456
16.1.3 线程安全
虽然线程传递给pass方法的两个参数能够百分之百地保证首字母相同,可是为flightSecurity中的属性赋值的时候会出现多个线程交错的情况,需要对恭喜资源增加同步保护。
改进如下
public synchronized void pass(String boardingPass, String idCard) {
this.boardingPass = boardingPass;
this.idCard = idCard;
this.count++;
check();
}
为什么只在pass方法增加synchronized关键字,check以及toString方法都有对共享资源的访问,难道它们不加同步就不会引起错误?由于chek方法在pass方法中执行的,pass方法加同步已经保证了single thread execution ,因此 check方法不需要增加同步,
何时适用single thread execution 模式?
- 多线程访问资源的时候,被synchronized同步的方法总是排他性的。
- 多个线程对某个类的状态发生改变的时候。
16.2 吃面问题
16.2.1 吃面引起的死锁
虽然使用synchronized关键字可以保证single thread execution,但是如果使用不当则会导致死锁的情况发生。比如A手持刀等待B 放下叉,而B 手持叉等待A放下刀。
public class Tableware {
//餐具名称
private final String toolName;
public Tableware(String toolName) {
this.toolName = toolName;
}
@Override
public String toString() {
return "Tool:" + toolName;
}
}
public class EatNoodleThread extends Thread {
private final String name;
//左手餐具
private final Tableware leftTool;
//右手边餐具
private final Tableware rightTool;
public EatNoodleThread(String name, Tableware leftTool, Tableware rightTool) {
this.name = name;
this.leftTool = leftTool;
this.rightTool = rightTool;
}
@Override
public void run() {
while (true) {
eat();
}
}
private void eat() {
synchronized (leftTool) {
System.out.println(name + " take up " + leftTool + " (left)");
synchronized (rightTool) {
System.out.println(name + " take up " + rightTool + " (right)");
System.out.println(name + " is eating now.");
System.out.println(name + " put down " + rightTool + " (right)");
}
System.out.println(name + " put down " + leftTool + " (left)");
}
}
public static void main(String[] args) {
final Tableware fork = new Tableware("fork");
final Tableware knife = new Tableware("knife");
new EatNoodleThread("A", fork, knife).start();
new EatNoodleThread("B", fork, knife).start();
}
}
16.2.2 解决吃面引起的死锁问题
虽然使用了Single Thread Execution对eat加以控制,但是还是出现了死锁。其主要原因是交叉锁导致两个线程之间相互等待彼此释放持有的锁。
为了解决交叉锁,我们需要将刀叉进行封装,使刀叉同属一个类中,改进一下
public class EatNoodleThread2 extends Thread {
private final String name;
private final TablewarePair tablewarePair;
public EatNoodleThread2(String name, TablewarePair tablewarePair) {
this.name = name;
this.tablewarePair = tablewarePair;
}
@Override
public void run() {
while (true) {
eat();
}
}
private void eat() {
synchronized (tablewarePair) {
System.out.println(name + " take up " + tablewarePair.getLeftTool() + " (left)");
System.out.println(name + " take up " + tablewarePair.getRightTool() + " (right)");
System.out.println(name + " is eating now.");
System.out.println(name + " put down " + tablewarePair.getRightTool() + " (right)");
System.out.println(name + " put down " + tablewarePair.getLeftTool() + " (left)");
}
}
public static void main(String[] args) {
final Tableware fork = new Tableware("fork");
final Tableware knife = new Tableware("knife");
final TablewarePair pair = new TablewarePair(fork, knife);
new EatNoodleThread2("A", pair).start();
new EatNoodleThread2("B", pair).start();
}
}
public class TablewarePair {
private final Tableware leftTool;
private final Tableware rightTool;
public TablewarePair(Tableware leftTool, Tableware rightTool) {
this.leftTool = leftTool;
this.rightTool = rightTool;
}
public Tableware getLeftTool() {
return leftTool;
}
public Tableware getRightTool() {
return rightTool;
}
}
17 读写锁分离设计模式
共享资源在多个线程中同时进行读操作是不会引起冲突
| 线程 | 读 | 写 |
|---|---|---|
| 读 | 不冲突 | 冲突 |
| 写 | 冲突 | 冲突 |
如果对某个资源读的操作明显多于写的操作,那么多线程读时并不加锁,很明显对程序性能的提升会有很大的帮助。
17.2 读写分离设计
17.2.1 接口定义
1 lock接口定义
/**
* Lock接口定了了锁的基本操作,加锁和解锁,显示锁的操作强烈建议
* 与try finally语句块一起使用,
*/
public interface Lock {
//获取显示锁,没有获得锁的线程将被阻塞
void lock() throws InterruptedException;
//释放锁
void unlock();
}
2 ReadWriteLock接口定义
public interface ReadWriteLock {
// 工厂方法,创建 ReadWriteLock
static ReadWriteLock readWriteLock() {
return new ReadWriteLockImpl();
}
static ReadWriteLock readWriteLock(boolean preferWriter) {
return new ReadWriteLockImpl(preferWriter);
}
//创建reader 锁
Lock readLock();
//创建 write 锁
Lock writeLock();
//获取当前有多少线程正在执行写操作,最多是1个。
int getWritingWriters();
//获取 当前 线程有 多少线程等待获取写锁 而导致阻塞
int getWaitingWriters();
//获取当前 有多少 线程正等待获取 reader锁
int gerReadingReaders();
}
ReadWriteLock虽然名字中有lock,但是它并不是lock,它主要是用于创建read lock和write lock的,并且提供了查询当前有多少个read 和write 以及waiting中的write,根据之前分析,如果reader的个数大于0,那就意味着writer的个数等于0,反之writer的个数大于0(事实上write最多只能为1),则reader的个数等于0,由于读和写、写和写之间都存在冲突,因此这样的数字关系也就很正常。
17.2.2程序实现
public class ReadWriteLockImpl implements ReadWriteLock {
private final Object MUTEX = new Object();
//当前有多少个线程正在写入
private int writingWrties = 0;
// 当前有多少个线程正在等待写入
private int waitingWriters = 0;
// 当前有多少个线程正在read
private int readingReaders = 0;
//read 和write 的偏好设置
private boolean preferWriter;
public ReadWriteLockImpl(boolean preferWriter) {
this.preferWriter = preferWriter;
}
public ReadWriteLockImpl() {
this(true);
}
public Object getMUTEX() {
return MUTEX;
}
public boolean isPreferWriter() {
return preferWriter;
}
//创建 write lock
@Override
public Lock readLock() {
return new ReadLock(this);
}
@Override
public Lock writeLock() {
return new WriteLock(this);
}
//使写的数量增加
void incrementWritingWriters() {
this.waitingWriters++;
}
// 使等待 写入的数量增加
void incrementWaitingWriters() {
this.waitingWriters++;
}
//使读的线程数增加
void incrReadingReaders() {
this.readingReaders++;
}
//使写线程的数量减少
void decrWritingWriters() {
this.writingWrties--;
}
void decrWaitingWriters() {
this.waitingWriters--;
}
//使读线程的数量减少
void decrReadingReaders() {
this.readingReaders--;
}
//获取 当前有多少个 正在进行写操作
public int getWritingWrties() {
return writingWrties;
}
public int getReadingReaders() {
return readingReaders;
}
@Override
public int getWritingWriters() {
return this.writingWrties;
}
@Override
public int getWaitingWriters() {
return this.waitingWriters;
}
@Override
public int gerReadingReaders() {
return this.readingReaders;
}
//设置偏向锁
void changePrefer(boolean preferWriter) {
this.preferWriter = preferWriter;
}
}
public class ReadLock implements Lock {
private final ReadWriteLockImpl readWriteLock;
public ReadLock(ReadWriteLockImpl readWriteLock) {
this.readWriteLock = readWriteLock;
}
@Override
public void lock() throws InterruptedException {
// 使用Mutex 作为锁
synchronized (readWriteLock.getMUTEX()) {
// 若 此时有线程正在进行写操作,或者有线程正在等待偏向锁的表示为true,就会无法获得锁,只能被挂起。
while (readWriteLock.getWritingWriters() > 0
|| (readWriteLock.isPreferWriter()
&& readWriteLock.getWaitingWriters() > 0)) {
readWriteLock.getMUTEX().wait();
}
//成功获取锁,使得读的数量 增加
readWriteLock.incrReadingReaders();
}
}
@Override
public void unlock() {
// 使用Mutex 作为锁,并且进行同步
synchronized (readWriteLock.getMUTEX()) {
//释放锁的过程 就是使得当前 reading 的数量减一
//将perferWriter 设置为true ,可以使得writer线程获得更多的机会。
//通知唤醒与 Mutex 关联Monitor waitset中的数量
readWriteLock.decrReadingReaders();
readWriteLock.changePrefer(true);
readWriteLock.getMUTEX().notifyAll();
}
}
}
public class WriteLock implements Lock {
private final ReadWriteLockImpl readWriteLock;
public WriteLock(ReadWriteLockImpl readWriteLock) {
this.readWriteLock = readWriteLock;
}
@Override
public void lock() throws InterruptedException {
synchronized (readWriteLock.getMUTEX()) {
try {
//首先使等待写入锁的数字加一
readWriteLock.incrementWaitingWriters();
//如果有其他线程正在进行读操作,或者写操作,那么当前线程将被挂起
while (readWriteLock.getReadingReaders() > 0
|| readWriteLock.getWritingWriters() > 0) {
readWriteLock.getMUTEX().wait();
}
} finally {
//成功获取了写入锁,使得等待获取写入锁的计数器减一
readWriteLock.decrWaitingWriters();
}
//将正在写入的线程数量加一
readWriteLock.incrementWritingWriters();
}
}
@Override
public void unlock() {
synchronized (readWriteLock.getMUTEX()) {
// 减少正在写入锁的计数器
readWriteLock.decrWritingWriters();
//将偏好锁状态修改为false,可以使得读锁被最快的获取
readWriteLock.changePrefer(false);
//通知唤醒其他在 Mntex monitor waitset 中的 线程
readWriteLock.getMUTEX().notifyAll();
}
}
}
17.3 读写锁的使用
public class ShareData {
//定义 共享数据(资源)
private final List<Character> container = new ArrayList<>();
//构造ReadWriterLock
private final ReadWriteLock readWriteLock = ReadWriteLock.readWriteLock();
//创建读锁
private final Lock readLock = readWriteLock.readLock();
//创建写入锁
private final Lock writeLock = readWriteLock.writeLock();
private final int length;
public ShareData(int length) {
this.length = length;
for (int i = 0; i < length; i++) {
container.add(i, 'c');
}
}
public char[] read() throws InterruptedException {
try {
// 使用读锁
readLock.lock();
final char[] newBuffer = new char[length];
for (int i = 0; i < length; i++) {
newBuffer[i] = container.get(i);
}
slowly();
return newBuffer;
} finally {
readLock.unlock();
}
}
public void write(char c) throws InterruptedException {
try {
//使用写锁
writeLock.lock();
for (int i = 0; i < length; i++) {
this.container.add(i, c);
}
slowly();
} finally {
writeLock.unlock();
}
}
private void slowly() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
}
public class ReadWriteLockTest {
private final static String text = "Thisistheexampleforreadwritelock";
public static void main(String[] args) {
// 定义共享数据
final ShareData shareData = new ShareData(50);
//2个线程进行数据写操作
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < text.length(); j++) {
final char c = text.charAt(j);
try {
shareData.write(c);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//10个线程进行数据读操作
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
try {
System.out.println(Thread.currentThread() + " read " + new String(shareData.read()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
18 不可变对象设计模式
18.1 线程安全
所谓共享的资源,是指在多个线程同时进行访问的情况下,各线程都会使其发生变化,而线程安全性的主要目的就在于受控的并发访问中防止数据发生变化,除了使用synchronized关键字同步对资源的写操作之外,还可以在线程之间不共享资源状态,甚至将资源的状态设置为不可变对象,这样就可以不用依赖于synchronized关键字的约束。
18.2 不可变对象的设计
18.2.1 非线程安全的累加器
不可变对象的核心的地方在于不给外部修改共享资源的机会,这样就避免多线程情况下的数据冲突而导致的数据不一致的情况,又能避免因为对锁的依赖而带来的性能降低。
//线程不安全的累加器
public class IntegerAccumulator {
private int init;
public IntegerAccumulator(int init) {
this.init = init;
}
public int add(int i) {
this.init += i;
return this.init;
}
public int getValue() {
return init;
}
public static void main(String[] args) {
//定义累加器
final IntegerAccumulator accumulator = new IntegerAccumulator(0);
//定义三个线程分别启动
IntStream.range(0, 3).forEach(i -> {
new Thread(() -> {
int cur = 0;
while (true) {
//首先获取old value
final int oldValue = accumulator.getValue();
//调动计算方法
final int result = accumulator.add(cur);
System.out.println(oldValue + " + " + cur + "=" + result);
if (cur + oldValue != result) {
System.out.println("Error");
}
cur++;
slowly();
}
}).start();
});
}
private static void slowly() {
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
18.2.3 不可变的累加器对象设计
public final class IntegerAccumulator1 {
private final int init;
public IntegerAccumulator1(int init) {
this.init = init;
}
public int getValue() {
return init;
}
//构造累加器,需要用到另外一个accumulator和初始值
public IntegerAccumulator1(IntegerAccumulator1 accumulator, int init) {
this.init = accumulator.getValue() + init;
}
//每一次相加都会产生一个新的
public IntegerAccumulator1 add(int i) {
return new IntegerAccumulator1(this, i);
}
public static void main(String[] args) {
//定义累加器
final IntegerAccumulator1 accumulator = new IntegerAccumulator1(0);
//定义三个线程分别启动
IntStream.range(0, 3).forEach(i -> {
new Thread(() -> {
int cur = 0;
while (true) {
//首先获取old value
final int oldValue = accumulator.getValue();
//调动计算方法
final int result = accumulator.add(cur).getValue();
System.out.println(oldValue + " + " + cur + "=" + result);
if (cur + oldValue != result) {
System.out.println("Error===========================");
}
cur++;
slowly();
}
}).start();
});
}
private static void slowly() {
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
重构后的IntegerAccumulator1 ,使用了final修饰的目的是为了防止由于继承重写而导致失去线程安全性,另外由于属性init被final修饰不允许线程对其进行改变,早构造函数中赋值后将不再会改变。
20 Guarded Suspension设计模式
是确保挂起的意思,当线程在访问某个对象时,发现条件不满足,就暂时挂起等待条件满足时再次访问,这一点和Balking设计模式刚好相反(Balking在遇到条件不足时会放弃),
Guarded Suspension设计模式是很多设计模式的基础,比如生产者消费者模式,Worker Thread设计模式,等。同样在Java并发包中的BlockingQueue中也大量使用到了Guarded Suspension设计模式
public class GuardedSuspensionQueue {
//定义存放 Integer 类型的 Queue
private final LinkedList<Integer> queue = new LinkedList<>();
//定义最大容量为 100
private final int LIMIT=100;
//往queue中插入数据,如果 元素超过了最大容量,则会陷入阻塞
public void offer(Integer data)throws InterruptedException{
synchronized (this){
//判断queue 的当前元素是否超过了LIMIT
while (queue.size()>=LIMIT){
//挂起当前线程,使其陷入阻塞
this.wait();
}
//插入元素,并唤醒take线程
queue.addLast(data);
this.notifyAll();
}
}
//从队列中获取元素,如果队列为空,则会使当前线程阻塞
public Integer take() throws InterruptedException{
synchronized (this){
if (queue.isEmpty()){
this.wait();
}
//通知offer可以继续插入数据
this.notifyAll();
return queue.removeFirst();
}
}
}
Guarded Suspension模式是一个非常基础的设计模式,它主要关注的是当某个条件(临界值)不满足时将操作的线程正确地挂起,以防止数据出现不一致或者操作超过临界值的控制范围。
21 线程上下文设计模式
21.1 什么是上下文
在设计上下文时,除了要考虑它的全局唯一性(单例模式保证),还要考虑有些成员只能被初始化一次。
21.3 ThreadLocal详解
ThreadLocal为每个使用该变量的线程提供了独立的版本,可以做到线程间的数据隔离,每一个线程都可以访问各自内部的副本变量。
21.3.1 ThreadLocal的使用场景以及注意事项
一般以下情况中会使用到ThreadLocal。
- 在进行对象跨层传递的时候,可以考虑使用ThreadLocal,避免方法多次传递,打破层次间的约束。
- 线程间数据隔离,
- 进行事物操作,用于存储线程事物信息。
ThreaLocal并不是解决多线程下共享资源的技术,一般情况下,每一个线程的ThreadLocal存储的都是一个全新的对象,如果在多线程的Thread-local存储了一个对象的引用,那么其还将面临资源竞争,数据不一致性等并发问题。
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal<Integer> tlocal = new ThreadLocal<>();
IntStream.range(0, 10).forEach(i -> {
new Thread(() -> {
try {
//每个线程都会设置爱 tlocal,但是彼此之间的数据是独立的
tlocal.set(i);
System.out.println(Thread.currentThread().getName() + " set i " + tlocal.get());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " get i " + tlocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
});
}
}
22 Balking设计模式
22.1 什么是Balking设计
比如你去饭店吃饭,吃到途中,想再点一个菜,于是你举起手示意服务员,其中一个服务员看到你举手正准备走过来,发现距离你比较近的服务员已经准备要受理你的请求于是中途放弃了。
再比如,用word编写文档的时候,每次文字编辑都代表文档的状态法发生了改变,除了我们可以使用ctrl+s手动保存之外,world软件本身也定期触发自动保存,如果world自动保存文档的线程正准备执行保存动作时,恰巧我们进行了主动保存,那么自动保存就放弃了。
22.2 Balking模式之文档编辑
22.2.1 Document
文档类中有两个主要的方法,分别是保存文档和编辑文档
//代表正在编辑的文档
public class Document {
//如果文档发生变化,change设置为true
private boolean changed = false;
//一次保存的内容,可以理解为缓存
private List<String> content = new ArrayList<>();
private final FileWriter writer;
//自动保存文档的线程
private static AutoSavaThread autoSavaThread;
private Document(String documentPath, String documentNames) throws IOException {
this.writer = new FileWriter(new File(documentPath, documentNames));
}
//创建文档,顺便启动自动保存文档的线程
public static Document create(String documentPath, String documentNames) throws IOException {
final Document document = new Document(documentPath, documentNames);
autoSavaThread = new AutoSavaThread(document);
autoSavaThread.start();
return document;
}
//文档的编辑,其实就是往content队列中提交字符串
public void edit(String content) {
synchronized (this) {
this.content.add(content);
//文档改变
this.changed = true;
}
}
//文档关闭的时候,首先中断自动保存线程,然后关闭 writer释放资源
public void close() throws IOException {
autoSavaThread.interrupt();
writer.close();
}
//save 用于手动进行文档保存
public void save() throws IOException {
synchronized (this) {
//如果文档已经保存了,直接返回
if (!changed) {
return;
}
System.out.println(Thread.currentThread().getName() + " execute the save action");
//将内容写入文档中
for (String s : content) {
this.writer.write(s);
//获取不同系统的换行符
this.writer.write(System.lineSeparator());
}
this.writer.flush();
this.changed=false;//表明此刻没有新的内容编辑
this.content.clear();
}
}
}
在上述代码中:
- edit 和save 方法进行同步,其目的是防止文档在保存的过程中如果遇到新的内容编辑器引起的共享资源的冲突问题。
22.2.2 AutoSaveThread
public class AutoSavaThread extends Thread {
private final Document document;
public AutoSavaThread(Document document) {
super("DocumentAutoSaveThread");
this.document = document;
}
@Override
public void run() {
while (true) {
try {
//每隔1秒保存一次
document.save();
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
break;
}
}
}
}
22.2.3 DocumentEditThread
AutoSavaThread用于文档自动保存,那么DocumentEditThread线程则类似于主动编辑文档的作者。
//该线程代表的是主动进行文档编辑,为了增加交互性,使用scanner
public class DocumentEditThread extends Thread {
private final String documentPath;
private final String documentName;
private final Scanner scanner = new Scanner(System.in);
public DocumentEditThread(String documentPath, String documentName) {
super("DocumentEditThread");
this.documentPath = documentPath;
this.documentName = documentName;
}
@Override
public void run() {
int times = 0;
try {
final Document document = Document.create(documentPath, documentName);
while (true) {
//获取用户的键盘输入
final String text = scanner.next();
if ("quit".equals(text)) {
document.close();
break;
}
//将内容编辑到document
document.edit(text);
//用户输入5次自动保存
if (times == 5) {
document.save();
times = 0;
}
times++;
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage());
}
}
}
23 Latch设计模式
门阀设计模式,该模式指定一个屏障,只有所有条件都达到满足的时候,门阀才能打开。
23.2 CountDownLatch程序实现
23.2.1 无限等待的Latch
定义抽象类,其中属性limit至关重要,当limit降低到0时会打开阀门
public abstract class Lathch {
//用于控制多少个线程完成任务时才能打开阀门
protected int limit;
public Lathch(int limit) {
this.limit = limit;
}
//该方法会使得当前线程一直等待,直到所有的线程都完成工作,被阻塞的线程是运行被中断的。
public abstract void await() throws InterruptedException;
//当前任务完成工作之后调用该方法使得计数器减一
public abstract void countDown();
//获取当前还有多少个线程没有完成任务
public abstract int getUnarrived();
}
1 无限等待CountDownLatch实现
//无限等待CountDowLatch
public class CountDwonLath extends Lathch {
public CountDwonLath(int limit) {
super(limit);
}
@Override
public void await() throws InterruptedException {
synchronized (this) {
//当limit 大于0 ,当前线程进入阻塞状态
while (limit > 0) {
this.wait();
}
}
}
@Override
public void countDown() {
synchronized (this) {
if (limit <= 0) {
throw new IllegalStateException("all of task already arrived");
}
//limit 减一,并通知阻塞线程
limit--;
this.notifyAll();
}
}
@Override
public int getUnarrived() {
//返回有多少任务还未完成任务
return limit;
}
}
2 程序员齐心协力打开门阀
public class ProgrammerTravel extends Thread {
//门阀
private final Lathch lathch;
//程序员
private final String programmer;
//交通工具
private final String transportation;
public ProgrammerTravel(Lathch lathch, String programmer, String transportation) {
this.lathch = lathch;
this.programmer = programmer;
this.transportation = transportation;
}
public static void main(String[] args) throws InterruptedException {
final CountDwonLath latch = new CountDwonLath(4);
new ProgrammerTravel(latch, "Alex", "Bus").start();
new ProgrammerTravel(latch, "Tom", "Walking").start();
new ProgrammerTravel(latch, "Jack", "Subway").start();
new ProgrammerTravel(latch, "sl", "Bicycle").start();
latch.await();
System.out.println(" all of programmer arrive ");
}
@Override
public void run() {
System.out.println(programmer + " start take the transportation:" + transportation);
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(programmer + " arrived by " + transportation);
//计数器减一
lathch.countDown();
}
}
23.2.2 有超时设置的latch
/**
* 超时机制
* 为了方便计算,我们将所有时间都换算成了纳秒,但是wait方法只能接受毫秒,因此该方法还涉及了时间的换算。
* 如果的等待时间不足1毫秒,那么 会抛出超时异常。
* @param unit
* @param time
* @throws InterruptedException
*/
@Override
public void await(TimeUnit unit, long time) throws InterruptedException, WaitTimeoutException {
if (time <= 0) {
throw new IllegalStateException("the time is invalid ");
}
long reaminingNanos = unit.toNanos(time);
//等待任务将在 endNanos纳秒后超时
final long endNamos = System.nanoTime() + reaminingNanos;
synchronized (this) {
while (limit > 0) {
//如果超时则抛出Wait
if (TimeUnit.NANOSECONDS.toMillis(reaminingNanos) <= 0) {
throw new WaitTimeoutException("the wait time over specify time");
}
//等待 remainingNanos,在等待的过程中可能被中断,重新计算 remainingNanos
this.wait(TimeUnit.NANOSECONDS.toMillis(reaminingNanos));
reaminingNanos = endNamos - System.nanoTime();
}
}
}
24 Thread-Per-Message设计模式
24.1 什么是Thread-Per-Message模式
Thread-Per-Message的意思是为每一个消息的处理开辟一个线程使得消息能够并发方式进行处理,从而提高系统整体吞吐量。
25 Two Phase Termination设计模式
终止处理又被称为线程结束的第二个阶段,而受理终止要求呗称为线程结束的第一个阶段。
在进行两阶段终结的过程中需要考虑
- 第二阶段终止保证安全,比如设计对共享资源的操作。
- 要百分百地确保线程结束,
