Reentrantlock
ReentrantLock 和 synchronized 的区别
- 锁分类
- synchronized 是非公平锁
- ReentrantLock 可自定义锁是公平还是非公平(具体原理后面解释)
- 灵活性
- synchronized 使用形式非常固定,就是单纯的加锁
- ReentrantLock 可以尝试获取锁(tryLock)、带超时时间的获取锁、可响应中断(lockInterruptibly)
- 可重入(线程可重复多次加锁)
- synchronized、ReentrantLock 都是可重入的。
- 使用方式
- synchronized 修饰方式加锁,自动释放锁(没有显示的unlock操作)
- ReentrantLock 显示加锁,显示释放锁(有lock、unlock操作)。
- 实现原理
- synchronized 基于monitor(监视器)模式
- ReentrantLock 基于 AQS
ReentrantLock 场景
1.可尝试获取锁
比如我有个定时任务,每间隔5秒钟执行一次的任务。但是为了防止前一次任务还没结束,后一次任务又触发了,我就在每次任务执行前先尝试获取一下锁,调用 lock.tryLock(),如果获取锁失败,直接跳过这次任务,等下一次任务执行。
2.公平锁
ReentrantLock是公平的。
3.带条件等待
使用synchronized 结合Object上的 wait和 notify方法可以实现线程间的等待通知机制。但是ReentrantLock结合Condition接口同样可以实现这个功能。而且相比synchronized 使用起来更清晰。
synchronized 只解决了有和没有的问题,但是锁的场景和多样性方面还很欠缺,例如: 带超时时间的获取锁、获取锁非阻塞(尝试获取锁)、带等待条件的请求锁。
AQS
AQS是AbstractQueuedSynchronizer 的缩写,抽象队列同步器,我们很多同步工具ReentrantLock、Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier 都是基于AQS 实现的。
AQS 基本的原理是它提供了一套共享资源的访问的规范,通过CLH(一个双向链表)的方式把线程等待管理起来。
它底层采用的是状态标志位(state变量)+FIFO队列的方式来记录获取锁、释放锁、竞争锁等一系列锁操作;
对于AQS而言,其中的state变量可以看做是锁,队列采用的是先进先出的双向链表,state共享状态变量表示锁状态,内部使用CAS对state进行原子操作修改来完成锁状态变更(锁的持有和释放)。
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被构造为节点并加入到队列中,并在在队列中进行自旋,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程,移出队列的条件是前驱节点为哨兵节点并且成功获取到了同步状态。在释放同步队列时,同步器调用tryRelease()方法释放同步状态,然后唤醒哨兵节点的后继节点。
线程间通信的几种实现方式
方式一:使用 volatile 关键字
方式二:使用Object类的wait() 和 notify() 方法
方式三:使用JUC工具类 CountDownLatch
方式五:基本LockSupport实现线程间的阻塞和唤醒
1.synchornized原理
基础原理
Java对象头
以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象;
其中 Mark Word 结构为:
所以一个对象的结构如下:
synchronized 底层实现是在编译之后在同步代码块前后加上 monitorenter 和 monitorexit 字节码指令,它依赖操作系统底层互斥锁的实现。它的作用主要是实现原子性操作和解决共享变量内存可见性问题。
当执行 monitorenter 指令时,线程试图获取锁也就是获取对象监视器 monitor 的持有权。如果对象没有被锁定或者已经获得了锁,锁的计数器+1。 此时其他竞争锁的线程则会进⼊等待队列中 。 执行monitorexit指令时则会把计数器-1,当计数器值为0时,则锁释放,处于等待队列中的线程再继续竞争锁
synchronized是排它锁,当⼀个线程获得锁之后,其他线程必须等待该线程释放锁后才能获得锁,而且 由于Java中的线程和操作系统原生线程是⼀⼀对应的,线程被阻塞或者唤醒时时会从用户态切换到内核态,这种转换非常消耗性能。
Monitor 原理
Mointor可以被翻译为管程,每个Java对象都可以关联一个Mointor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针。
- 刚开始时Monitor中的Owner为null
- 当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner。
- 当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList(阻塞队列)中变成BLOCKED状态。
- Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的。
- 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析
P.S:synchronized必须是进入同一个对象的monitor才有上述效果
进阶原理
1.轻量级锁
轻量级锁使用场景:如果一个对象虽然有多线程进行访问,但是多线程的访问时间是错开的(也就是没有竞争),这就可以用来轻量级锁来进行优化。轻量级锁对使用者是透明的,即语法仍然是synchronized,假设有两个方法同步块,利用同一个对象加锁
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized( obj ) {
// 同步块 B
}
}
- 创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word
- 让锁记录的object reference指向锁对象,并尝试用cas替换Object的Mark Word。将Mark Word的值存入锁记录
- 如果CAS替换成功,对象头中存储锁记录的地址和状态,表示由该线程给对象加锁。
- 如果CAS失败,有两种情况:
- 如果是其他线程已经持有了该Object的轻量级锁,这时表明有竞争,进入锁膨胀的过程。
- 如果是自己执行了synchronized锁重入,那么再添加一条Lock Record作为重入的计数
- 当线程退出synchronized代码块的时候,如果有取值为null的锁记录,表示有重入,这时重置锁记录,表示重入计数减一
- 当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象
- 成功则解锁成功
- 失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
2.锁膨胀
如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。
- 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
- 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程,即为对象申请Monitor锁,让Object指向重量级锁地址,然后自己进入Monitor 的EntryList 变成BLOCKED状态
- 当Thread-0 推出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败,那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将Owner设置为null,唤醒EntryList 中的Thread-1线程
3.自旋优化
重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,
4.偏向锁
在轻量级的锁中,我们可以发现,如果同一个线程对同一个对象进行重入锁时,也需要执行CAS操作,这是有点耗时,那么java6开始引入了偏向锁的,只有第一次使用CAS时将对象的Mark Word头设置为入锁线程ID,之后这个入锁线程再进行重入锁时,发现线程ID是自己的,那么就不用再进行CAS了
**1.撤销偏向锁-hashcode方法
当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存hashcode的值了(轻量级锁和重量级锁会把hashcode存在栈帧中和monitor对象中)
**2.撤销偏向锁-其它线程使用对象
当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
**3.撤销偏向锁 - 调用 wait/notify
会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持
2.共享模型之内存
2.1 Java内存模型
JMM即Java Memory Model,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程读/写共享变量的副本。本地内存是一个抽象概念,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件。
JMM体现在以下几个方面:
- 原子性:保证指令不会受到线程上下文切换的影响
- 可见性:保证指令不会受cpu缓存的影响
- 有序性:保证指令不会受到cpu指令并行优化的影响
happens-before
happens-before来保证操作之间的可见性,在JMM中,如果一个操作的执行结果需要对另一个操作可见(并不意味着前一个操作必须要在后一个操作之前执行),那么这两个操作之间必须要存在happens-before关系。这两个操作可以在一个线程之内,也可以在不同线程之间。
happens-before规则:
- 一个线程中的每个操作,happens-before于该线程中的任意后续操作。
- 对一个锁的解锁,happens-before于随后对这个锁的加锁。
- 对一个volatile域的写,happens-before于任意后续对这个volatile的读
- A happens-before B,且B happens-before C, 则A happens-before C
2.2 volatile原理
volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
- 对 volatile 变量的写指令后会加入写屏障 ,保证在该屏障之前的,对共享变量的改动,都会同步到主存中
- 对 volatile 变量的读指令前会加入读屏障 ,保证在该屏障之后,对共享变量的读取,加载的都是主存中最新的数据
2.2 volatile应用场景(单例模式)
double-checked locking 解决
public final class Singleton {
private Singleton() {}
private static volatile Singleton INSTANCE;
public static Singleton getInstance() {
if (INSTANCE == null) {
synchronized (Singleton.class) {
if (INSTANCE == null) {
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
以上的实现特点是:
- 懒惰实例化
- 首次使用getInstance()才使用synchronized加锁,后续无需加锁
- 有隐含的,但很关键的是:第一个if使用了INSTANCE变量,但是在同步代码块之外
1,2两个线程同时经过第一个if判断后进入,1已经初始化完释放锁,2获得锁进入同步代码块内,如果不加if判断,就会重新创建对象,破坏单例。
(构造方法被排序到赋值操作的后面),用volatile保证synchronized代码块外的共享变量的重排序的问题。
2.2 单例模式的线程安全问题
单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用 getInstance)时的线程安全,并思考注释中的问题
饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1:饿汉式
// 问题1:为什么加 final,防止子类继承后更改,破坏单例
public final class Singleton {
// 问题3:为什么设置为私有? 只在此类中使用,保证不会再其他地方创建出新的对象。
// 是否能防止反射创建新的实例?不能。
private Singleton() {}
// 问题4:这样初始化是否能保证单例对象创建时的线程安全?
//没有,静态成员变量,是jvm在类加载阶段就进行了初始化,jvm保证了此操作的线程安全性
//.instance需要在调用getInstance时候被初始化,只有static的成员才能在没有创建对象时进行初始化。
//且类的静态成员在类第一次被使用时初始化后就不会再被初始化,保证了单例。
private static final Singleton INSTANCE = new Singleton();
// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由。
//1.提供更好的封装性;2.提供范型的支持
public static Singleton getInstance() {
return INSTANCE;
}
}
//getInstance()调用为静态方法,INSTANCE由该方法返回,如果INSTANCE非静态,无法被getInstance()调用
//为什么在⼀个静态方法内调用⼀个非静态成员为什么是非法的?
//静态⽅法可以不通过对象进行调用,而通过类名进行访问。
实现2:饿汉式
// 问题1:枚举单例是如何限制实例个数的:创建枚举类的时候就已经定义好了,每个枚举常量其实就是枚举类的一个静态成员变量
// 问题2:枚举单例在创建时是否有并发问题:没有,这是静态成员变量
// 问题3:枚举单例能否被反射破坏单例:不能
// 问题4:枚举单例能否被反序列化破坏单例:枚举类默认实现了序列化接口,枚举类已经考虑到此问题,无需担心破坏单例
// 问题5:枚举单例属于懒汉式还是饿汉式:饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做:加构造方法就行了
enum Singleton {
INSTANCE;
}
实现3:懒汉式
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
// 分析这里的线程安全, 并说明有什么缺点:
//synchronized加载静态方法上,可以保证线程安全。缺点就是锁的范围过大
public static synchronized Singleton getInstance() {
if( INSTANCE != null ){
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
实现4:DCL 懒汉式
public final class Singleton {
private Singleton() { }
// 问题1:解释为什么要加 volatile ?为了防止重排序问题
private static volatile Singleton INSTANCE = null;
// 问题2:对比实现3, 说出这样做的意义:提高了效率
public static Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (Singleton.class) {
// 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗?这是为了第一次判断时的并发问题。
if (INSTANCE != null) { // t2
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
//如果在第一次t1判断INSTANCE为null,进入synchronized同步代码块,但还没有创建对象,
//此时t2在进行第一次判断时因为INSTANCE为null,也会进入同步代码块,如果不加第二次判断就会生成新的对象
实现5:
public final class Singleton {
private Singleton() { }
// 问题1:属于懒汉式还是饿汉式:懒汉式,这是一个静态内部类。类加载本身就是懒惰的,在没有调用getInstance方法时是没有执行LazyHolder内部类的类加载操作的。
private static class LazyHolder {
static final Singleton INSTANCE = new Singleton();
}
// 问题2:在创建时是否有并发问题,这是线程安全的,类加载时,jvm保证类加载操作的线程安全
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}
3.共享模型之无锁
管程即monitor是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制
3.2CAS和volatile
@Override
public void withdraw(Integer amount) {
// 核心代码
// 需要不断尝试,直到成功为止
while (true){
// 比如拿到了旧值 1000
int pre = getBalance();
// 在这个基础上 1000-10 = 990
int next = pre - amount;
/*
compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
- 不一致了,next 作废,返回 false 表示失败
比如,别的线程已经做了减法,当前值已经被减成了 990
那么本线程的这次 990 就作废了,进入 while 下次循环重试
- 一致,以 next 设置为新值,返回 true 表示成功
*/
if (atomicInteger.compareAndSet(pre,next)){
break;
}
}
}
其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
CAS的缺点主要有3点:
ABA问题:ABA的问题指的是在CAS更新的过程中,当读取到的值是A,然后准备赋值的时候仍然是A, 但是实际上有可能A的值被改成了B,然后⼜被改回了A,这个CAS更新的漏洞就叫做ABA。只是ABA的问 题⼤部分场景下都不影响并发的最终效果。 Java中有AtomicStampedReference来解决这个问题。
循环时间长开销大:⾃旋CAS的⽅式如果⻓时间不成功,会给CPU带来很⼤的开销。
只能保证⼀个共享变量的原子操作:只对⼀个共享变量操作可以保证原⼦性,但是多个则不⾏,多个可 以通过AtomicReference来处理或者使用锁synchronized实现
volatile
在上面代码中的AtomicInteger,保存值的value属性使用了volatile 。获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
为什么无锁效率高
- 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
- 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。(多核CPU才能发挥优势)
CAS 的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
- CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响
3.3原子整数
java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
- 使用原子的方式更新基本类型上面三个类提供的方法几乎相同,所以我们将以 AtomicInteger 为例子来介绍。
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
下面先讨论原子整数类,以 AtomicInteger 为例讨论它的api接口:通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的!!
3.3原子引用
为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
ABA 问题及解决
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。
3.4Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport的park方法,cas相关的方法底层都是通过Unsafe类来实现的。
static Unsafe unsafe;
static {
try {
// Unsafe 使用了单例模式,unsafe对象是类中的一个私有的变量
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
4.共享模型之不可变
4.1不可变设计
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
// ...
}
发现该类、类中所有属性都是 final 的,属性用 final 修饰保证了该属性是只读的,不能修改,类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性。
保护性拷贝
但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是 如何实现的,就以 substring 为例:
public String substring(int beginIndex, int endIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
if (endIndex > value.length) {
throw new StringIndexOutOfBoundsException(endIndex);
}
int subLen = endIndex - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
// 上面是一些校验,下面才是真正的创建新的String对象
return ((beginIndex == 0) && (endIndex == value.length)) ? this
: new String(value, beginIndex, subLen);
}
发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出 了修改:结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】
public String(char value[], int offset, int count) {
if (offset < 0) {
throw new StringIndexOutOfBoundsException(offset);
}
if (count <= 0) {
if (count < 0) {
throw new StringIndexOutOfBoundsException(count);
}
if (offset <= value.length) {
this.value = "".value;
return;
}
}
// Note: offset or count might be near -1>>>1.
if (offset > value.length - count) {
throw new StringIndexOutOfBoundsException(offset + count);
}
// 上面是一些安全性的校验,下面是给String对象的value赋值,新创建了一个数组来保存String对象的值
this.value = Arrays.copyOfRange(value, offset, offset+count);
}
4.2享元模式
享元模式(Flyweight Pattern)主要用于减少创建对象的数量,以减少内存占用和提高性能。这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式。
- 简介定义英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时
- 体现
- 在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
- String 串池
5.线程池
5.1自定义线程池
class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();
//2.锁
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
//阻塞获取
public T take() {
lock.lock();
try{
while(queue.isEmpty()) {
emptyWaitSet.await();
}
T t = queue.removeFirst();
//取走了元素,队列不再满了,把队列满了等待这个条件变量唤醒
fullWaitSet.signal();//唤醒
return t;
}finally{
lock.unlock();
}
}
//阻塞添加
public void put(T element) {
lock.lock();
try {
while (queue.size() == capcity) {
//队列满了进入条件变量阻塞
fullWaitSet.await();
}
queue.addList(element);
//添加了元素,队列不为空,把队列为空等待这个条件变量唤醒
emptyWaitSet.signal();//唤醒
}finally {
lock.unlock();
}
}
//获取大小
public int size() {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> Allworkers = new HashSet<>();
private int coreSize;
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
Allworkers.add(worker);
worker.start();
} else {
taskQueue.put(task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
while(task != null || (task = taskQueue.take()) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} finally {
task = null;
}
}
synchronized (Allworkers) {
log.debug("worker 被移除{}", this);
Allworkers.remove(this);
}
}
}
}
5.2ThreadPoolExecutor
1)Executor 框架结构(主要由三大部分组成)
- 任务(Runnable /Callable)
执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
- 任务的执行(Executor)
任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
- 异步计算的结果(Future)
Future 接口以及Future接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)
- Executor 框架的使用示意图
- 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
- 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable
task))。 - 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。
- 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行
循环打印ABC
public class multiThread {
public static void main(String[] args) throws InterruptedException {
Semaphore a = new Semaphore(1);
Semaphore b = new Semaphore(0);
Semaphore c = new Semaphore(0);
ExecutorService poolService = Executors.newFixedThreadPool(3);
Integer count = 10;
poolService.execute(new Worker(a, b, "A", count));//一开始是信号量a执行acquire(),因为会使信号量减1,所以要初始化为1
poolService.execute(new Worker(b, c, "B", count));
poolService.execute(new Worker(c, a, "C", count));
Thread.sleep(1000);
poolService.shutdownNow();
}
public static class Worker implements Runnable {
private String key;
private Semaphore current;
private Semaphore next;
private Integer count;
public Worker(Semaphore current, Semaphore next, String key, Integer count) {
this.current = current;
this.next = next;
this.key = key;
this.count = count;
}
public void run() {
for(int i = 0; i < count; i++) {
try {
//获取当前的锁
current.acquire(); //current - 1
System.out.println(i+","+key);
//释放next的锁
next.release(); //next + 1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
2)构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
}
- corePoolSize 核心线程数目 (最多保留的线程数)
- maximumPoolSize 最大线程数目(核心线程数加上救急线程数)
- keepAliveTime 救急线程的生存时间(核心线程没有生存时间这个东西,核心线程会一直运行)
- unit 时间单位 - 针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂 - 可以为线程创建时起个好名字
- handler 拒绝策略
CPU密集型:核心线程数 = CPU核数 + 1
IO密集型:核心线程数 = CPU核数 * 2
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前4 种实现,其它著名框架也提供了实现
- ThreadPoolExecutor.AbortPolicy让调用者抛出 RejectedExecutionException 异常,这是默认策略
- ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
- ThreadPoolExecutor.DiscardPolicy 放弃本次任务
- ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。(救急线程有生存时间,核心线程没有生存时间)
3)newFixedThreadPool(固定大小的线程池)
这个是Executors类提供的工厂方法来创建线程池!Executors 是Executor 框架的工具类!
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
通过源码可以看到 new ThreadPoolExecutor(xxx)方法其实是是调用了之前说的完整参数的构造方法,使用了默认的线程工厂和拒绝策略!
特点
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
- 适用于任务量已知,相对耗时的任务
4)newCachedThreadPool(带缓冲的线程池)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点
- 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
- 全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
- 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线 程。 适合任务数比较密集,但每个任务执行时间较短的情况
5)newSingleThreadExecutor(单线程线程池)
public static ExecutorService newSingleThreadExecutor() {
//没有直接返回ThreadPoolExecutor(相对于Fixed),应用装饰器模式
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
使用场景:
- 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
- 区别:
- 和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
- FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因 此不能调用 ThreadPoolExecutor 中特有的方法
- 和Executors.newFixedThreadPool(1) 初始时为1时的区别:Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
5.3线程池调参
1)生命周期
ThreadPoolExecutor的运行状态有5种,分别为:
2)参数设置
动态化线程池的核心设计包括以下三个方面:
- 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。
- 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
- 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。
6.AQS
6.1AQS原理
- 概述:全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
- 特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively ```java //获取锁的姿势 // 如果获取锁失败 if (!tryAcquire(arg)) { // 入队, 可以选择阻塞当前线程 park unpark }
//释放锁的姿势 // 如果释放锁成功 if (tryRelease(arg)) { // 让阻塞线程恢复运行 }
<a name="XgwPR"></a>
## 6.2ReentrantLock原理

<a name="CHjoo"></a>
### <br />1. 非公平锁实现原理
加锁解锁流程,先从构造器开始看,默认为非公平锁实现
```java
public ReentrantLock() {
sync = new NonfairSync();
}
static final class NonfairSync extends Sync {
final void lock() {
//AQS状态,0表示未加锁, 1表示加锁
if(compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());//把owner线程改成当前线程
else
acquire(1);
}
}
public final void acquire(int arg) {
// 再次尝试加锁, 然后为 true 就不走下面逻辑,为 false,则创建一个 Node 节点对象加入到等待队列中去
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync 继承自 AQS
没有竞争时
第一个竞争出现时,查看源码的NonfairSync的lock方法
Thread-1 执行了
- lock方法中CAS 尝试将 state 由 0 改为 1,结果失败
- lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
- 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquire方法的 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false,-1表示有责任唤醒它后继的节点。
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 调用unlock方法里的release方法释放锁,进入tryRelease流程,如果成功,设置 exclusiveOwnerThread 为 null,state = 0
unlock方法里的release方法方法中,如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程: unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码// Sync 继承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁实现
final void lock() {
// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 ㈠
acquire(1);
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// ㈡ 进入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
// ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式,
//新建的Node的waitstatus默认为0,因为waitstatus是成员变量,默认被初始化为0
Node node = new Node(Thread.currentThread(), mode);
// 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
//如果tail为null,尝试将 Node 加入 AQS, 进入 ㈥
enq(node);
return node;
}
// ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// cas 尝试将 Node 对象加入 AQS 队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
// 上一个节点 help GC
p.next = null;
failed = false;
// 返回中断标记 false
return interrupted;
}
if (
// 判断是否应当 park, 进入 ㈦
shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 上一个节点都在阻塞, 那么自己也阻塞好了
return true;
}
// > 0 表示取消状态
if (ws > 0) {
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
解锁源码// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 解锁实现
public void unlock() {
sync.release(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0, 如果线程获取到了锁那么后来头结点会被抛弃掉
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
可重入原理
/**
*1.加锁时重入,state++
*2.解锁时,state--
*/
static final class NonfairSync extends Sync {
// ...
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
公平锁实现原理
与非公平锁主要区别在于 tryAcquire 方法的实现
先检查 AQS 队列中是否有前驱节点, 没有才去竞争
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null || // 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
}
条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObjectawait 流程
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
signal 流程
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1Thread-1 释放锁,进入 unlock 流程。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 第一个等待节点
private transient Node firstWaiter;
// 最后一个等待节点
private transient Node lastWaiter;
public ConditionObject() { }
// ㈠ 添加一个 Node 至等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个关联当前线程的新 Node, 添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 唤醒 - 将没取消的第一个节点转移至 AQS 队列
private void doSignal(Node first) {
do {
// 已经是尾节点了
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (
// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null
);
}
// 外部类方法, 方便阅读, 放在此处
// ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
final boolean transferForSignal(Node node) {
// 设置当前node状态为0(因为处在队列末尾),如果状态已经不是 Node.CONDITION, 说明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入 AQS 队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 插入节点的上一个节点被取消
ws > 0 ||
// 插入节点的上一个节点不能设置状态为 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 让线程重新同步状态
LockSupport.unpark(node.thread);
}
return true;
}
// 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
// 如果没有持有锁,会抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打断等待 - 直到被唤醒
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁, 见 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 外部类方法, 方便阅读, 放在此处
// ㈣ 因为某线程可能重入,需要将 state 全部释放,获取state,然后把它全部减掉,以全部释放
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 唤醒等待队列队列中的下一个节点
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 打断模式 - 在退出等待时重新设置打断状态
private static final int REINTERRUPT = 1;
// 打断模式 - 在退出等待时抛出异常
private static final int THROW_IE = -1;
// 判断打断模式
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// ㈤ 应用打断模式
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 等待 - 直到被唤醒或打断
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 等待 - 直到被唤醒或打断或超时
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
// 获得最后期限
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// 已超时, 退出等待队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean awaitUntil(Date deadline) throws InterruptedException {
// ...
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// ...
}
// 工具方法 省略 ...
}
6.3 Semaphore
信号量,用来限制能同时访问共享资源的线程上限。
public static void main(String[] args) {
// 1. 创建一个对象
Semaphore semaphore = new Semaphore(3);
// 2. 开 10 个线程
for(int i = 0; i < 10; i++) {
new Thread(() -> {
// 获取一个许可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.info("start ...");
Thread.sleep(1000);
log.info("end ....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "t" + (i + 1)).start();;
}
}
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源。
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
// permits 即 state
super(permits);
}
// Semaphore 方法, 方便阅读, 放在此处
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 尝试获得共享锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync 继承过来的方法, 方便阅读, 放在此处
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (
// 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
remaining < 0 ||
// 如果 cas 重试成功, 返回正数, 表示获取成功
compareAndSetState(available, remaining)
) {
return remaining;
}
}
}
// AQS 继承过来的方法, 方便阅读, 放在此处
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功后本线程出队(AQS), 所在 Node设置为 head
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
// r 表示可用资源数, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Semaphore 方法, 方便阅读, 放在此处
public void release() {
sync.releaseShared(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
6.4 CountdownLatch
CountDownLatch 允许多线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,CountDownLatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown方法时,其实使用了 tryReleaseShared 方法以CAS 的操作来减少 state ,直至 state 为 0 就代表所有的线程都调用了countDown方法。当调用 await 方法的时候,如果 state 不为0,就代表仍然有线程没有调用 countDown 方法,那么就把已经调用过 countDown 的线程都放入阻塞队列 Park ,并自旋 CAS 判断 state == 0,直至最后一个线程调用了 countDown ,使得 state == 0,于是阻塞的线程便判断成功,全部往下执行。
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
// 版本号
private static final long serialVersionUID = 4982264981922014374L;
// 构造器
Sync(int count) {
setState(count);
}
// 返回当前计数
int getCount() {
return getState();
}
// 试图在共享模式下获取对象状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 试图设置状态来反映共享模式下的一个释放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 无限循环
for (;;) {
// 获取状态
int c = getState();
if (c == 0) // 没有被线程占有
return false;
// 下一个状态
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 比较并且设置成功
return nextc == 0;
}
}
}
// 同步队列
private final Sync sync;
}
用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。
@Slf4j(topic = "c.CountDownLatch")
public class Code_16_CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
method3();
}
public static void method1() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
log.info("t1 start ...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("t1 end ...");
countDownLatch.countDown();
}, "t1").start();
new Thread(() -> {
log.info("t2 start ...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("t2 end ...");
countDownLatch.countDown();
}, "t2").start();
new Thread(() -> {
log.info("t3 start ...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("t3 end ...");
countDownLatch.countDown();
}, "t3").start();
log.info("main wait ...");
countDownLatch.await();
log.info("main wait end ...");
}
public static void method2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(() -> {
log.info("t1 start ...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
log.info("t1 end ...{}", countDownLatch.getCount());
});
executorService.submit(() -> {
log.info("t2 start ...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("t2 end ...{}", countDownLatch.getCount());
countDownLatch.countDown();
});
executorService.submit(() -> {
log.info("t3 start ...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("t3 end ...{}", countDownLatch.getCount());
countDownLatch.countDown();
});
executorService.submit(() -> {
log.info("main wait ...");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("main wait end ...");
executorService.shutdown();
});
}
public static void method3() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
String[] all = new String[10];
Random random = new Random();
for(int i = 0; i < 10; i++) {
int id = i;
executorService.submit(() -> {
for (int j = 0; j <= 100; j++) {
try {
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
all[id] = j + "%";
System.out.print("\r" + Arrays.toString(all));
}
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println();
System.out.println("游戏开始");
executorService.shutdown();
}
}
微服务中调用各个服务可以用CountDownLatch,每个服务调用完加一行latch.countDown(),主线程latch.await()等待
6.5 cyclicbarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟 CountdownLatch 一样,但这个可以重用。
7.线程安全集合类
7.1 重点
重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent
- Blocking :大部分实现基于锁,并提供用来阻塞的方法
- CopyOnWrite 之类容器:修改开销相对较重
- Concurrent类型的容器
- 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
- 弱一致性
- 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的(fast-safe机制)
- 求大小弱一致性,size 操作未必是 100% 准确
- 读取弱一致性
7.2 CopyOnWriteArrayList
大多数业务场景都是一种“读多写少”的情形,CopyOnWriteArrayList就是为适应这种场景而诞生的。
CopyOnWriteArrayList,运用了一种“写时复制”的思想。通俗的理解就是当我们需要修改(增/删/改)列表中的元素时,不直接进行修改,而是先将列表Copy,然后在新的副本上进行修改,修改完成之后,再将引用从原列表指向新列表。
这样做的好处是读/写是不会冲突的,可以并发进行,读操作还是在原列表,写操作在新列表。仅仅当有多个线程同时进行写操作时,才会进行同步。
内部结构
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/**
* 排它锁, 用于同步修改操作
*/
final transient ReentrantLock lock = new ReentrantLock();
/**
* 内部数组
*/
private transient volatile Object[] array;
}
查询——get方法
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}
添加——add方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray(); // 旧数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制并创建新数组
newElements[len] = e; // 将元素插入到新数组末尾
setArray(newElements); // 内部array引用指向新数组
return true;
} finally {
lock.unlock();
}
}
add方法首先会进行加锁,保证只有一个线程能进行修改;然后会创建一个新数组(大小为n+1),并将原数组的值复制到新数组,新元素插入到新数组的最后;最后,将字段array指向新数组。
迭代
CopyOnWriteArrayList对元素进行迭代时,仅仅返回一个当前内部数组的快照,也就是说,如果此时有其它线程正在修改元素,并不会在迭代中反映出来,因为修改都是在新数组中进行的。
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
/**
* Snapshot of the array
*/
private final Object[] snapshot;
/**
* Index of element to be returned by subsequent call to next.
*/
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
public boolean hasNext() {
return cursor < snapshot.length;
}
public E next() {
if (!hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
// ...
}
1. 内存的使用
由于CopyOnWriteArrayList使用了“写时复制”,所以在进行写操作的时候,内存里会同时存在两个array数组,如果数组内存占用的太大,那么可能会造成频繁GC,所以CopyOnWriteArrayList并不适合大数据量的场景。
2. 数据一致性
CopyOnWriteArrayList只能保证数据的最终一致性,不能保证数据的实时一致性——读操作读到的数据只是一份快照。所以如果希望写入的数据可以立刻被读到,那CopyOnWriteArrayList并不适合。
8.Threadlocal
ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
在spring MVC中,常用ThreadLocal保存当前登陆用户信息,这样线程在任意地方都可以取到用户信息了。
Threadlocal 主要用来做线程变量的隔离
//存放用户信息的ThreadLocal
private static final ThreadLocal<UserInfo> userInfoThreadLocal = new ThreadLocal<>();
public Response handleRequest(UserInfo userInfo) {
Response response = new Response();
try {
// 1.用户信息set到线程局部变量中
userInfoThreadLocal.set(userInfo);
doHandle();
} finally {
// 3.使用完移除掉
userInfoThreadLocal.remove();
}
return response;
}
//业务逻辑处理
private void doHandle () {
// 2.实际用的时候取出来
UserInfo userInfo = userInfoThreadLocal.get();
//查询用户资产
queryUserAsset(userInfo);
}
首先我们通过ThreadLocal<UserInfo> userInfoThreadLocal = new ThreadLocal() 初始化了一个Threadlocal 对象,就是上图中说的Threadlocal 引用,这个引用指向堆中的ThreadLocal 对象。
**我们知道 Thread 类有个 ThreadLocalMap 成员变量,这个Map key是Threadlocal 对象,value是你要存放的线程局部变量(线程隔离的变量)。**
- 然后我们调用userInfoThreadLocal.set(userInfo); 这里做了什么事呢?我们把源代码拿出来,看一看就清晰了。我们知道 Thread 类有个 ThreadLocalMap 成员变量,这个Map key是Threadlocal 对象,value是你要存放的线程局部变量。
这里是在当前线程对象的ThreadlocalMap中put了一个元素(Entry),key是Threadlocal对象,value是userInfo。ThreadLocalMap 类的定义在 Threadlocal中。
Thread 类有个 ThreadlocalMap 属性的成员变量,但是ThreadlocalMap 的定义却在Threadlocal 中,为什么这么做?
就是让使用者知道ThreadLocalMap就只做保存线程局部变量这一件事的。
既然是线程局部变量,那为什么不用线程对象(Thread对象)作为key,这样不是更清晰,直接用线程作为key获取线程变量?
这样设计会有个问题,比如: 我已经把用户信息存在线程变量里了,这个时候需要新增加一个线程变量,比方说新增用户地理位置信息,我们ThreadlocalMap 的key用的是线程,再存一个地理位置信息,key都是同一个线程(key一样),不就把原来的用户信息覆盖了嘛。
那新增地理位置信息应该怎么做?
新创建一个Threadlocal对象就好了,因为ThreadLocalMap的key是Threadlocal 对象,比如新增地理位置,我就再 Threadlocal < Geo> geo = new Threadlocal(), 存放地理位置信息,这样线程的ThreadlocalMap里面会有二个元素,一个是用户信息,一个是地理位置。
ThreadlocalMap 中key是 WeakReference类型,能讲讲Java中有几种类似的引用,什么区别吗?
**强引用**是使用最普遍的引用。如果一个对象具有强引用,那垃圾回收器绝不会回收它,当内存空间不足时,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足的问题
如果一个对象只具有**软引用**,则内存空间充足时,垃圾回收器就不会回收它;如果内存空间不足了,就会回收这些对象的内存。
**弱引用**与**软引用**的区别在于:只具有**弱引用**的对象拥有**更短暂**的**生命周期**。在垃圾回收器线程扫描内存区域时,一旦发现了只具有**弱引用**的对象,不管当前**内存空间足够与否**,都会**回收**它的内存。
**虚引用**顾名思义,就是形同虚设。与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。
为什么ThreadlocalMap 中key 设计成 WeakReference(弱引用)类型吗?
- 避免内存泄漏
private static final ThreadLocal<UserInfo> userInfoThreadLocal = new ThreadLocal<>();
userInfoThreadLocal.set(userInfo);
这里的引用关系是userInfoThreadLocal 引用了ThreadLocal对象,这是个强引用,ThreadLocal对象同时也被ThreadlocalMap的key引用,这是个WeakReference引用,我们前面说GC要回收ThreadLocal对象的前提是它只被WeakReference引用,没有任何强引用
。可以看到一旦一个对象只被弱引用引用,GC的时候就会回收这个对象。所以只要ThreadLocal对象如果还被 userInfoThreadLocal(强引用) 引用着,GC是不会回收被WeakReference引用的对象的
那既然ThreadLocal对象有强引用,回收不掉,干嘛还要设计成WeakReference类型呢?
ThreadLocal的设计者考虑到线程往往生命周期很长,比如经常会用到线程池,线程一直存活着,根据JVM 根搜索算法,一直存在 Thread -> ThreadLocalMap -> Entry(元素)这样一条引用链路,如果key不设计成WeakReference类型,是强引用的话,就一直不会被GC回收,key就一直不会是null,不为null Entry元素就不会被清理(ThreadLocalMap是根据key是否为null来判断是否清理Entry)
所以ThreadLocal的设计者认为只要ThreadLocal 所在的作用域结束了工作被清理了,GC回收的时候就会把key引用对象回收,key置为null,ThreadLocal会尽力保证Entry清理掉来最大可能避免内存泄漏。
那如果Threadlocal 对象一直有强引用,那怎么办?岂不是有内存泄漏风险。
最佳实践是用完手动调用remove函数。
循环打印ABC
public class multiThread {
public static void main(String[] args) throws InterruptedException {
Semaphore a = new Semaphore(1);
Semaphore b = new Semaphore(0);
Semaphore c = new Semaphore(0);
ExecutorService poolService = Executors.newFixedThreadPool(3);
Integer count = 10;
poolService.execute(new Worker(a, b, "A", count));//一开始是信号量a执行acquire(),因为会使信号量减1,所以要初始化为1
poolService.execute(new Worker(b, c, "B", count));
poolService.execute(new Worker(c, a, "C", count));
Thread.sleep(1000);
poolService.shutdownNow();
}
public static class Worker implements Runnable {
private String key;
private Semaphore current;
private Semaphore next;
private Integer count;
public Worker(Semaphore current, Semaphore next, String key, Integer count) {
this.current = current;
this.next = next;
this.key = key;
this.count = count;
}
public void run() {
for(int i = 0; i < count; i++) {
try {
//获取当前的锁
current.acquire(); //current - 1
System.out.println(i+","+key);
//释放next的锁
next.release(); //next + 1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
9.Fork/Join
思想:分而治之。将一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
举例说明
1、我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
2、还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
3、如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
这就是Fork/Join任务的原理:判断一个任务是否足够小。如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
编码实现
整个任务流程如下所示:
- 首先继承任务,覆写任务的执行方法
- 通过判断阈值,判断该线程是否可以执行
- 如果不能执行,则将任务继续递归分配,利用fork方法,并行执行
- 如果是有返回值的,才需要调用join方法,汇集数据。
主要的两个类:
- RecursiveAction一个递归无结果的ForkJoinTask(没有返回值)
- RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)
整个流程需要三个类完成:
1、ForkJoinPool
- 既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool。
- 它与其它的ExecutorService区别主要在于它使用工作窃取,那什么是工作窃取呢?
- 工作窃取:一个大任务会被划分成无数个小任务,这些任务被分配到不同的队列,这些队列有些干活干的块,有些干得慢。于是干得快的,一看自己没任务需要执行了,就去隔壁的队列里面拿去任务执行。
2、ForkJoinTask
ForkJoinTask就是ForkJoinPool里面的每一个任务。他主要有两个子类:RecursiveAction和RecursiveTask。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果。
Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction。
使用Fork/Join模式可以进行并行计算以提高效率。