image.png

Reentrantlock

ReentrantLock 和 synchronized 的区别

  1. 锁分类
    • synchronized 是非公平锁
    • ReentrantLock 可自定义锁是公平还是非公平(具体原理后面解释)
  2. 灵活性
    • synchronized 使用形式非常固定,就是单纯的加锁
    • ReentrantLock 可以尝试获取锁(tryLock)、带超时时间的获取锁、可响应中断(lockInterruptibly)
  3. 可重入(线程可重复多次加锁)
    • synchronized、ReentrantLock 都是可重入的。
  4. 使用方式
    • synchronized 修饰方式加锁,自动释放锁(没有显示的unlock操作)
    • ReentrantLock 显示加锁,显示释放锁(有lock、unlock操作)。
  5. 实现原理
    • synchronized 基于monitor(监视器)模式
    • ReentrantLock 基于 AQS

ReentrantLock 场景

1.可尝试获取锁
比如我有个定时任务,每间隔5秒钟执行一次的任务。但是为了防止前一次任务还没结束,后一次任务又触发了,我就在每次任务执行前先尝试获取一下锁,调用 lock.tryLock(),如果获取锁失败,直接跳过这次任务,等下一次任务执行。
2.公平锁
ReentrantLock是公平的。
3.带条件等待
使用synchronized 结合Object上的 wait和 notify方法可以实现线程间的等待通知机制。但是ReentrantLock结合Condition接口同样可以实现这个功能。而且相比synchronized 使用起来更清晰。

synchronized 只解决了有和没有的问题,但是锁的场景和多样性方面还很欠缺,例如: 带超时时间的获取锁、获取锁非阻塞(尝试获取锁)、带等待条件的请求锁。

AQS

AQS是AbstractQueuedSynchronizer 的缩写,抽象队列同步器,我们很多同步工具ReentrantLock、Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier 都是基于AQS 实现的。

AQS 基本的原理是它提供了一套共享资源的访问的规范,通过CLH(一个双向链表)的方式把线程等待管理起来。

微信图片_20210819172541.png
它底层采用的是状态标志位(state变量)+FIFO队列的方式来记录获取锁、释放锁、竞争锁等一系列锁操作;
对于AQS而言,其中的state变量可以看做是锁,队列采用的是先进先出的双向链表,state共享状态变量表示锁状态,内部使用CAS对state进行原子操作修改来完成锁状态变更(锁的持有和释放)。

AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被构造为节点并加入到队列中,并在在队列中进行自旋,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程,移出队列的条件是前驱节点为哨兵节点并且成功获取到了同步状态。在释放同步队列时,同步器调用tryRelease()方法释放同步状态,然后唤醒哨兵节点的后继节点。

线程间通信的几种实现方式

方式一:使用 volatile 关键字
方式二:使用Object类的wait() 和 notify() 方法
方式三:使用JUC工具类 CountDownLatch
方式五:基本LockSupport实现线程间的阻塞和唤醒


1.synchornized原理

基础原理

Java对象头

以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象;
📀 JAVA多线程 - 图4
其中 Mark Word 结构为:
📀 JAVA多线程 - 图5
所以一个对象的结构如下:
📀 JAVA多线程 - 图6

synchronized 底层实现是在编译之后在同步代码块前后加上 monitorenter 和 monitorexit 字节码指令,它依赖操作系统底层互斥锁的实现它的作用主要是实现原子性操作和解决共享变量内存可见性问题。
当执行 monitorenter 指令时,线程试图获取锁也就是获取对象监视器 monitor 的持有权。如果对象没有被锁定或者已经获得了锁,锁的计数器+1。 此时其他竞争锁的线程则会进⼊等待队列中 。 执行monitorexit指令时则会把计数器-1,当计数器值为0时,则锁释放,处于等待队列中的线程再继续竞争锁

synchronized是排它锁,当⼀个线程获得锁之后,其他线程必须等待该线程释放锁后才能获得锁,而且 由于Java中的线程和操作系统原生线程是⼀⼀对应的,线程被阻塞或者唤醒时时会从用户态切换到内核态,这种转换非常消耗性能。


Monitor 原理

Mointor可以被翻译为管程,每个Java对象都可以关联一个Mointor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针。
📀 JAVA多线程 - 图7

  • 刚开始时Monitor中的Owner为null
  • 当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner。
  • 当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList(阻塞队列)中变成BLOCKED状态。
  • Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的。
  • 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析

P.S:synchronized必须是进入同一个对象的monitor才有上述效果

进阶原理

1.轻量级锁

轻量级锁使用场景:如果一个对象虽然有多线程进行访问,但是多线程的访问时间是错开的(也就是没有竞争),这就可以用来轻量级锁来进行优化。轻量级锁对使用者是透明的,即语法仍然是synchronized,假设有两个方法同步块,利用同一个对象加锁

  1. static final Object obj = new Object();
  2. public static void method1() {
  3. synchronized( obj ) {
  4. // 同步块 A
  5. method2();
  6. }
  7. }
  8. public static void method2() {
  9. synchronized( obj ) {
  10. // 同步块 B
  11. }
  12. }
  1. 创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word

📀 JAVA多线程 - 图8

  1. 让锁记录的object reference指向锁对象,并尝试用cas替换Object的Mark Word。将Mark Word的值存入锁记录

📀 JAVA多线程 - 图9

  1. 如果CAS替换成功,对象头中存储锁记录的地址和状态,表示由该线程给对象加锁。

📀 JAVA多线程 - 图10

  1. 如果CAS失败,有两种情况:
  • 如果是其他线程已经持有了该Object的轻量级锁,这时表明有竞争,进入锁膨胀的过程。
  • 如果是自己执行了synchronized锁重入,那么再添加一条Lock Record作为重入的计数

📀 JAVA多线程 - 图11

  1. 当线程退出synchronized代码块的时候,如果有取值为null的锁记录,表示有重入,这时重置锁记录,表示重入计数减一

📀 JAVA多线程 - 图12

  1. 当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象
    1. 成功则解锁成功
    2. 失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

2.锁膨胀

如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。

  1. 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

📀 JAVA多线程 - 图13

  1. 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程,即为对象申请Monitor锁,让Object指向重量级锁地址,然后自己进入Monitor 的EntryList 变成BLOCKED状态

📀 JAVA多线程 - 图14

  1. 当Thread-0 推出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败,那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将Owner设置为null,唤醒EntryList 中的Thread-1线程

3.自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,

4.偏向锁

在轻量级的锁中,我们可以发现,如果同一个线程对同一个对象进行重入锁时,也需要执行CAS操作,这是有点耗时,那么java6开始引入了偏向锁的,只有第一次使用CAS时将对象的Mark Word头设置为入锁线程ID,之后这个入锁线程再进行重入锁时,发现线程ID是自己的,那么就不用再进行CAS了
**1.撤销偏向锁-hashcode方法
当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存hashcode的值了(轻量级锁和重量级锁会把hashcode存在栈帧中和monitor对象中)
**2.撤销偏向锁-其它线程使用对象
当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
**3.撤销偏向锁 - 调用 wait/notify
会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持


2.共享模型之内存

2.1 Java内存模型

JMM即Java Memory Model,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程读/写共享变量的副本。本地内存是一个抽象概念,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件。
JMM体现在以下几个方面:

  1. 原子性:保证指令不会受到线程上下文切换的影响
  2. 可见性:保证指令不会受cpu缓存的影响
  3. 有序性:保证指令不会受到cpu指令并行优化的影响

happens-before
happens-before来保证操作之间的可见性,在JMM中,如果一个操作的执行结果需要对另一个操作可见(并不意味着前一个操作必须要在后一个操作之前执行),那么这两个操作之间必须要存在happens-before关系。这两个操作可以在一个线程之内,也可以在不同线程之间。
happens-before规则:

  • 一个线程中的每个操作,happens-before于该线程中的任意后续操作。
  • 对一个锁的解锁,happens-before于随后对这个锁的加锁。
  • 对一个volatile域的写,happens-before于任意后续对这个volatile的读
  • A happens-before B,且B happens-before C, 则A happens-before C

2.2 volatile原理

volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  • 对 volatile 变量的写指令后会加入写屏障 ,保证在该屏障之前的,对共享变量的改动,都会同步到主存中
  • 对 volatile 变量的读指令前会加入读屏障 ,保证在该屏障之后,对共享变量的读取,加载的都是主存中最新的数据

2.2 volatile应用场景(单例模式)

double-checked locking 解决

  1. public final class Singleton {
  2. private Singleton() {}
  3. private static volatile Singleton INSTANCE;
  4. public static Singleton getInstance() {
  5. if (INSTANCE == null) {
  6. synchronized (Singleton.class) {
  7. if (INSTANCE == null) {
  8. INSTANCE = new Singleton();
  9. }
  10. }
  11. }
  12. return INSTANCE;
  13. }
  14. }

以上的实现特点是:

  • 懒惰实例化
  • 首次使用getInstance()才使用synchronized加锁,后续无需加锁
  • 有隐含的,但很关键的是:第一个if使用了INSTANCE变量,但是在同步代码块之外

1,2两个线程同时经过第一个if判断后进入,1已经初始化完释放锁,2获得锁进入同步代码块内,如果不加if判断,就会重新创建对象,破坏单例。

(构造方法被排序到赋值操作的后面),用volatile保证synchronized代码块外的共享变量的重排序的问题。

2.2 单例模式的线程安全问题

单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用 getInstance)时的线程安全,并思考注释中的问题
饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1:饿汉式

  1. // 问题1:为什么加 final,防止子类继承后更改,破坏单例
  2. public final class Singleton {
  3. // 问题3:为什么设置为私有? 只在此类中使用,保证不会再其他地方创建出新的对象。
  4. // 是否能防止反射创建新的实例?不能。
  5. private Singleton() {}
  6. // 问题4:这样初始化是否能保证单例对象创建时的线程安全?
  7. //没有,静态成员变量,是jvm在类加载阶段就进行了初始化,jvm保证了此操作的线程安全性
  8. //.instance需要在调用getInstance时候被初始化,只有static的成员才能在没有创建对象时进行初始化。
  9. //且类的静态成员在类第一次被使用时初始化后就不会再被初始化,保证了单例。
  10. private static final Singleton INSTANCE = new Singleton();
  11. // 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由。
  12. //1.提供更好的封装性;2.提供范型的支持
  13. public static Singleton getInstance() {
  14. return INSTANCE;
  15. }
  16. }
  17. //getInstance()调用为静态方法,INSTANCE由该方法返回,如果INSTANCE非静态,无法被getInstance()调用
  18. //为什么在⼀个静态方法内调用⼀个非静态成员为什么是非法的?
  19. //静态⽅法可以不通过对象进行调用,而通过类名进行访问。

实现2:饿汉式

  1. // 问题1:枚举单例是如何限制实例个数的:创建枚举类的时候就已经定义好了,每个枚举常量其实就是枚举类的一个静态成员变量
  2. // 问题2:枚举单例在创建时是否有并发问题:没有,这是静态成员变量
  3. // 问题3:枚举单例能否被反射破坏单例:不能
  4. // 问题4:枚举单例能否被反序列化破坏单例:枚举类默认实现了序列化接口,枚举类已经考虑到此问题,无需担心破坏单例
  5. // 问题5:枚举单例属于懒汉式还是饿汉式:饿汉式
  6. // 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做:加构造方法就行了
  7. enum Singleton {
  8. INSTANCE;
  9. }

实现3:懒汉式

  1. public final class Singleton {
  2. private Singleton() { }
  3. private static Singleton INSTANCE = null;
  4. // 分析这里的线程安全, 并说明有什么缺点:
  5. //synchronized加载静态方法上,可以保证线程安全。缺点就是锁的范围过大
  6. public static synchronized Singleton getInstance() {
  7. if( INSTANCE != null ){
  8. return INSTANCE;
  9. }
  10. INSTANCE = new Singleton();
  11. return INSTANCE;
  12. }
  13. }

实现4:DCL 懒汉式

  1. public final class Singleton {
  2. private Singleton() { }
  3. // 问题1:解释为什么要加 volatile ?为了防止重排序问题
  4. private static volatile Singleton INSTANCE = null;
  5. // 问题2:对比实现3, 说出这样做的意义:提高了效率
  6. public static Singleton getInstance() {
  7. if (INSTANCE != null) {
  8. return INSTANCE;
  9. }
  10. synchronized (Singleton.class) {
  11. // 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗?这是为了第一次判断时的并发问题。
  12. if (INSTANCE != null) { // t2
  13. return INSTANCE;
  14. }
  15. INSTANCE = new Singleton();
  16. return INSTANCE;
  17. }
  18. }
  19. }
  20. //如果在第一次t1判断INSTANCE为null,进入synchronized同步代码块,但还没有创建对象,
  21. //此时t2在进行第一次判断时因为INSTANCE为null,也会进入同步代码块,如果不加第二次判断就会生成新的对象

实现5:

  1. public final class Singleton {
  2. private Singleton() { }
  3. // 问题1:属于懒汉式还是饿汉式:懒汉式,这是一个静态内部类。类加载本身就是懒惰的,在没有调用getInstance方法时是没有执行LazyHolder内部类的类加载操作的。
  4. private static class LazyHolder {
  5. static final Singleton INSTANCE = new Singleton();
  6. }
  7. // 问题2:在创建时是否有并发问题,这是线程安全的,类加载时,jvm保证类加载操作的线程安全
  8. public static Singleton getInstance() {
  9. return LazyHolder.INSTANCE;
  10. }
  11. }

3.共享模型之无锁

管程即monitor是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制

3.2CAS和volatile

  1. @Override
  2. public void withdraw(Integer amount) {
  3. // 核心代码
  4. // 需要不断尝试,直到成功为止
  5. while (true){
  6. // 比如拿到了旧值 1000
  7. int pre = getBalance();
  8. // 在这个基础上 1000-10 = 990
  9. int next = pre - amount;
  10. /*
  11. compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
  12. - 不一致了,next 作废,返回 false 表示失败
  13. 比如,别的线程已经做了减法,当前值已经被减成了 990
  14. 那么本线程的这次 990 就作废了,进入 while 下次循环重试
  15. - 一致,以 next 设置为新值,返回 true 表示成功
  16. */
  17. if (atomicInteger.compareAndSet(pre,next)){
  18. break;
  19. }
  20. }
  21. }

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
CAS的缺点主要有3点:
ABA问题:ABA的问题指的是在CAS更新的过程中,当读取到的值是A,然后准备赋值的时候仍然是A, 但是实际上有可能A的值被改成了B,然后⼜被改回了A,这个CAS更新的漏洞就叫做ABA。只是ABA的问 题⼤部分场景下都不影响并发的最终效果。 Java中有AtomicStampedReference来解决这个问题。
循环时间长开销大:⾃旋CAS的⽅式如果⻓时间不成功,会给CPU带来很⼤的开销。
只能保证⼀个共享变量的原子操作:只对⼀个共享变量操作可以保证原⼦性,但是多个则不⾏,多个可 以通过AtomicReference来处理或者使用锁synchronized实现

volatile
在上面代码中的AtomicInteger,保存值的value属性使用了volatile 。获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

为什么无锁效率高

  1. 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
  2. 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。(多核CPU才能发挥优势)


CAS 的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  1. CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  2. synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  3. CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    1. 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    2. 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

3.3原子整数

java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:

  1. 使用原子的方式更新基本类型上面三个类提供的方法几乎相同,所以我们将以 AtomicInteger 为例子来介绍。
    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean :布尔型原子类
  2. 原子引用
  3. 原子数组
  4. 字段更新器
  5. 原子累加器

下面先讨论原子整数类,以 AtomicInteger 为例讨论它的api接口:通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的!!

3.3原子引用

为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。


ABA 问题及解决
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。

3.4Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport的park方法,cas相关的方法底层都是通过Unsafe类来实现的。

  1. static Unsafe unsafe;
  2. static {
  3. try {
  4. // Unsafe 使用了单例模式,unsafe对象是类中的一个私有的变量
  5. Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
  6. theUnsafe.setAccessible(true);
  7. unsafe = (Unsafe) theUnsafe.get(null);
  8. } catch (NoSuchFieldException | IllegalAccessException e) {
  9. throw new Error(e);
  10. }
  11. }
  12. static Unsafe getUnsafe() {
  13. return unsafe;
  14. }

4.共享模型之不可变

4.1不可变设计

  1. public final class String
  2. implements java.io.Serializable, Comparable<String>, CharSequence {
  3. /** The value is used for character storage. */
  4. private final char value[];
  5. /** Cache the hash code for the string */
  6. private int hash; // Default to 0
  7. // ...
  8. }

发现该类、类中所有属性都是 final 的,属性用 final 修饰保证了该属性是只读的,不能修改,类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性。


保护性拷贝

但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是 如何实现的,就以 substring 为例:

  1. public String substring(int beginIndex, int endIndex) {
  2. if (beginIndex < 0) {
  3. throw new StringIndexOutOfBoundsException(beginIndex);
  4. }
  5. if (endIndex > value.length) {
  6. throw new StringIndexOutOfBoundsException(endIndex);
  7. }
  8. int subLen = endIndex - beginIndex;
  9. if (subLen < 0) {
  10. throw new StringIndexOutOfBoundsException(subLen);
  11. }
  12. // 上面是一些校验,下面才是真正的创建新的String对象
  13. return ((beginIndex == 0) && (endIndex == value.length)) ? this
  14. : new String(value, beginIndex, subLen);
  15. }

发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出 了修改:结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】

  1. public String(char value[], int offset, int count) {
  2. if (offset < 0) {
  3. throw new StringIndexOutOfBoundsException(offset);
  4. }
  5. if (count <= 0) {
  6. if (count < 0) {
  7. throw new StringIndexOutOfBoundsException(count);
  8. }
  9. if (offset <= value.length) {
  10. this.value = "".value;
  11. return;
  12. }
  13. }
  14. // Note: offset or count might be near -1>>>1.
  15. if (offset > value.length - count) {
  16. throw new StringIndexOutOfBoundsException(offset + count);
  17. }
  18. // 上面是一些安全性的校验,下面是给String对象的value赋值,新创建了一个数组来保存String对象的值
  19. this.value = Arrays.copyOfRange(value, offset, offset+count);
  20. }

4.2享元模式

享元模式(Flyweight Pattern)主要用于减少创建对象的数量,以减少内存占用和提高性能。这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式。

  1. 简介定义英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时
  2. 体现
    1. 在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
    2. String 串池

5.线程池

5.1自定义线程池

  1. class BlockingQueue<T> {
  2. //1.任务队列
  3. private Deque<T> queue = new ArrayDeque<>();
  4. //2.锁
  5. private ReentrantLock lock = new ReentrantLock();
  6. //3.生产者条件变量
  7. private Condition fullWaitSet = lock.newCondition();
  8. //4.消费者条件变量
  9. private Condition emptyWaitSet = lock.newCondition();
  10. //5.容量
  11. private int capcity;
  12. //阻塞获取
  13. public T take() {
  14. lock.lock();
  15. try{
  16. while(queue.isEmpty()) {
  17. emptyWaitSet.await();
  18. }
  19. T t = queue.removeFirst();
  20. //取走了元素,队列不再满了,把队列满了等待这个条件变量唤醒
  21. fullWaitSet.signal();//唤醒
  22. return t;
  23. }finally{
  24. lock.unlock();
  25. }
  26. }
  27. //阻塞添加
  28. public void put(T element) {
  29. lock.lock();
  30. try {
  31. while (queue.size() == capcity) {
  32. //队列满了进入条件变量阻塞
  33. fullWaitSet.await();
  34. }
  35. queue.addList(element);
  36. //添加了元素,队列不为空,把队列为空等待这个条件变量唤醒
  37. emptyWaitSet.signal();//唤醒
  38. }finally {
  39. lock.unlock();
  40. }
  41. }
  42. //获取大小
  43. public int size() {
  44. lock.lock();
  45. try {
  46. return queue.size();
  47. }finally {
  48. lock.unlock();
  49. }
  50. }
  51. }
  1. class ThreadPool {
  2. // 任务队列
  3. private BlockingQueue<Runnable> taskQueue;
  4. // 线程集合
  5. private HashSet<Worker> Allworkers = new HashSet<>();
  6. private int coreSize;
  7. private long timeout;
  8. private TimeUnit timeUnit;
  9. private RejectPolicy<Runnable> rejectPolicy;
  10. // 执行任务
  11. public void execute(Runnable task) {
  12. // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
  13. // 如果任务数超过 coreSize 时,加入任务队列暂存
  14. synchronized (workers) {
  15. if(workers.size() < coreSize) {
  16. Worker worker = new Worker(task);
  17. Allworkers.add(worker);
  18. worker.start();
  19. } else {
  20. taskQueue.put(task);
  21. }
  22. }
  23. }
  24. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
  25. this.coreSize = coreSize;
  26. this.timeout = timeout;
  27. this.timeUnit = timeUnit;
  28. this.taskQueue = new BlockingQueue<>(queueCapcity);
  29. this.rejectPolicy = rejectPolicy;
  30. }
  31. class Worker extends Thread{
  32. private Runnable task;
  33. public Worker(Runnable task) {
  34. this.task = task;
  35. }
  36. @Override
  37. public void run() {
  38. // 执行任务
  39. // 1) 当 task 不为空,执行任务
  40. // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
  41. while(task != null || (task = taskQueue.take()) != null) {
  42. try {
  43. log.debug("正在执行...{}", task);
  44. task.run();
  45. } finally {
  46. task = null;
  47. }
  48. }
  49. synchronized (Allworkers) {
  50. log.debug("worker 被移除{}", this);
  51. Allworkers.remove(this);
  52. }
  53. }
  54. }
  55. }

5.2ThreadPoolExecutor

1)Executor 框架结构(主要由三大部分组成)

  1. 任务(Runnable /Callable)

执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

  1. 任务的执行(Executor)

任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

  1. 异步计算的结果(Future)

Future 接口以及Future接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

  1. Executor 框架的使用示意图

image.png

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  2. 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))。
  3. 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。
  4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

循环打印ABC

  1. public class multiThread {
  2. public static void main(String[] args) throws InterruptedException {
  3. Semaphore a = new Semaphore(1);
  4. Semaphore b = new Semaphore(0);
  5. Semaphore c = new Semaphore(0);
  6. ExecutorService poolService = Executors.newFixedThreadPool(3);
  7. Integer count = 10;
  8. poolService.execute(new Worker(a, b, "A", count));//一开始是信号量a执行acquire(),因为会使信号量减1,所以要初始化为1
  9. poolService.execute(new Worker(b, c, "B", count));
  10. poolService.execute(new Worker(c, a, "C", count));
  11. Thread.sleep(1000);
  12. poolService.shutdownNow();
  13. }
  14. public static class Worker implements Runnable {
  15. private String key;
  16. private Semaphore current;
  17. private Semaphore next;
  18. private Integer count;
  19. public Worker(Semaphore current, Semaphore next, String key, Integer count) {
  20. this.current = current;
  21. this.next = next;
  22. this.key = key;
  23. this.count = count;
  24. }
  25. public void run() {
  26. for(int i = 0; i < count; i++) {
  27. try {
  28. //获取当前的锁
  29. current.acquire(); //current - 1
  30. System.out.println(i+","+key);
  31. //释放next的锁
  32. next.release(); //next + 1
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }
  39. }

2)构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler){
  8. }
  1. corePoolSize 核心线程数目 (最多保留的线程数)
  2. maximumPoolSize 最大线程数目(核心线程数加上救急线程数)
  3. keepAliveTime 救急线程的生存时间(核心线程没有生存时间这个东西,核心线程会一直运行)
  4. unit 时间单位 - 针对救急线程
  5. workQueue 阻塞队列
  6. threadFactory 线程工厂 - 可以为线程创建时起个好名字
  7. handler 拒绝策略

CPU密集型:核心线程数 = CPU核数 + 1
IO密集型:核心线程数 = CPU核数 * 2

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  2. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  4. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前4 种实现,其它著名框架也提供了实现

    1. ThreadPoolExecutor.AbortPolicy让调用者抛出 RejectedExecutionException 异常,这是默认策略
    2. ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
    3. ThreadPoolExecutor.DiscardPolicy 放弃本次任务
    4. ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    5. Netty 的实现,是创建一个新线程来执行任务
    6. ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  5. 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。(救急线程有生存时间,核心线程没有生存时间)

3)newFixedThreadPool(固定大小的线程池)

这个是Executors类提供的工厂方法来创建线程池!Executors 是Executor 框架的工具类!

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

通过源码可以看到 new ThreadPoolExecutor(xxx)方法其实是是调用了之前说的完整参数的构造方法,使用了默认的线程工厂和拒绝策略!
特点

  1. 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  2. 阻塞队列是无界的,可以放任意数量的任务
  3. 适用于任务量已知,相对耗时的任务

4)newCachedThreadPool(带缓冲的线程池)

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

特点

  1. 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    1. 全部都是救急线程(60s 后可以回收)
    2. 救急线程可以无限创建
  2. 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
  3. 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线 程。 适合任务数比较密集,但每个任务执行时间较短的情况

5)newSingleThreadExecutor(单线程线程池)

  1. public static ExecutorService newSingleThreadExecutor() {
  2. //没有直接返回ThreadPoolExecutor(相对于Fixed),应用装饰器模式
  3. return new FinalizableDelegatedExecutorService
  4. (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
  5. }

使用场景:

  1. 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
  2. 区别:
    1. 和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
    2. Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
      1. FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因 此不能调用 ThreadPoolExecutor 中特有的方法
    3. 和Executors.newFixedThreadPool(1) 初始时为1时的区别:Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

5.3线程池调参

1)生命周期

ThreadPoolExecutor的运行状态有5种,分别为:
image.png
image.png

2)参数设置

image.png
动态化线程池的核心设计包括以下三个方面:

  1. 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。
  2. 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
  3. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。

image.png

6.AQS

6.1AQS原理

  1. 概述:全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
  2. 特点:
    1. 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
      1. getState - 获取 state 状态
      2. setState - 设置 state 状态
      3. compareAndSetState - cas 机制设置 state 状态
      4. 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
    2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
    3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

  1. tryAcquire
  2. tryRelease
  3. tryAcquireShared
  4. tryReleaseShared
  5. isHeldExclusively ```java //获取锁的姿势 // 如果获取锁失败 if (!tryAcquire(arg)) { // 入队, 可以选择阻塞当前线程 park unpark }

//释放锁的姿势 // 如果释放锁成功 if (tryRelease(arg)) { // 让阻塞线程恢复运行 }

  1. <a name="XgwPR"></a>
  2. ## 6.2ReentrantLock原理
  3. ![](https://cdn.nlark.com/yuque/0/2021/png/12730136/1622620859415-fc408910-9a51-4c47-bae3-f848b845faf1.png#clientId=u28093bf3-f0f8-4&from=paste&id=ua4b316ba&margin=%5Bobject%20Object%5D&originHeight=557&originWidth=1181&originalType=url&ratio=1&status=done&style=none&taskId=u9d9ce6f4-26f6-4b21-9b70-e12085aafbe)
  4. <a name="CHjoo"></a>
  5. ### <br />1. 非公平锁实现原理
  6. 加锁解锁流程,先从构造器开始看,默认为非公平锁实现
  7. ```java
  8. public ReentrantLock() {
  9. sync = new NonfairSync();
  10. }
  11. static final class NonfairSync extends Sync {
  12. final void lock() {
  13. //AQS状态,0表示未加锁, 1表示加锁
  14. if(compareAndSetState(0, 1))
  15. setExclusiveOwnerThread(Thread.currentThread());//把owner线程改成当前线程
  16. else
  17. acquire(1);
  18. }
  19. }
  20. public final void acquire(int arg) {
  21. // 再次尝试加锁, 然后为 true 就不走下面逻辑,为 false,则创建一个 Node 节点对象加入到等待队列中去
  22. if (!tryAcquire(arg) &&
  23. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  24. selfInterrupt();
  25. }

NonfairSync 继承自 AQS
没有竞争时
📀 JAVA多线程 - 图20
第一个竞争出现时,查看源码的NonfairSync的lock方法
📀 JAVA多线程 - 图21
Thread-1 执行了

  1. lock方法中CAS 尝试将 state 由 0 改为 1,结果失败
  2. lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
  3. 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
    1. 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    2. Node 的创建是懒惰的
    3. 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

📀 JAVA多线程 - 图22
当前线程进入 acquire方法的 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false,-1表示有责任唤醒它后继的节点。

📀 JAVA多线程 - 图23

  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
  3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)📀 JAVA多线程 - 图24

再次有多个线程经历上述过程竞争失败,变成这个样子
📀 JAVA多线程 - 图25
Thread-0 调用unlock方法里的release方法释放锁,进入tryRelease流程,如果成功,设置 exclusiveOwnerThread 为 null,state = 0
📀 JAVA多线程 - 图26
unlock方法里的release方法方法中,如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程: unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程
📀 JAVA多线程 - 图27
如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)

  1. exclusiveOwnerThread 为 Thread-1,state = 1
  2. head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  3. 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
📀 JAVA多线程 - 图28
如果不巧又被 Thread-4 占了先

  1. Thread-4 被设置为 exclusiveOwnerThread,state = 1
  2. Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞


    加锁源码

    1. // Sync 继承自 AQS
    2. static final class NonfairSync extends Sync {
    3. private static final long serialVersionUID = 7316153563782823691L;
    4. // 加锁实现
    5. final void lock() {
    6. // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
    7. if (compareAndSetState(0, 1))
    8. setExclusiveOwnerThread(Thread.currentThread());
    9. else
    10. // 如果尝试失败,进入 ㈠
    11. acquire(1);
    12. }
    13. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
    14. public final void acquire(int arg) {
    15. // ㈡ tryAcquire
    16. if (
    17. !tryAcquire(arg) &&
    18. // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
    19. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    20. ) {
    21. selfInterrupt();
    22. }
    23. }
    24. // ㈡ 进入 ㈢
    25. protected final boolean tryAcquire(int acquires) {
    26. return nonfairTryAcquire(acquires);
    27. }
    28. // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
    29. final boolean nonfairTryAcquire(int acquires) {
    30. final Thread current = Thread.currentThread();
    31. int c = getState();
    32. // 如果还没有获得锁
    33. if (c == 0) {
    34. // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
    35. if (compareAndSetState(0, acquires)) {
    36. setExclusiveOwnerThread(current);
    37. return true;
    38. }
    39. }
    40. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
    41. else if (current == getExclusiveOwnerThread()) {
    42. // state++
    43. int nextc = c + acquires;
    44. if (nextc < 0) // overflow
    45. throw new Error("Maximum lock count exceeded");
    46. setState(nextc);
    47. return true;
    48. }
    49. // 获取失败, 回到调用处
    50. return false;
    51. }
    52. // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
    53. private Node addWaiter(Node mode) {
    54. // 将当前线程关联到一个 Node 对象上, 模式为独占模式,
    55. //新建的Node的waitstatus默认为0,因为waitstatus是成员变量,默认被初始化为0
    56. Node node = new Node(Thread.currentThread(), mode);
    57. // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
    58. Node pred = tail;
    59. if (pred != null) {
    60. node.prev = pred;
    61. if (compareAndSetTail(pred, node)) {
    62. // 双向链表
    63. pred.next = node;
    64. return node;
    65. }
    66. }
    67. //如果tail为null,尝试将 Node 加入 AQS, 进入 ㈥
    68. enq(node);
    69. return node;
    70. }
    71. // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
    72. private Node enq(final Node node) {
    73. for (;;) {
    74. Node t = tail;
    75. if (t == null) {
    76. // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
    77. if (compareAndSetHead(new Node())) {
    78. tail = head;
    79. }
    80. } else {
    81. // cas 尝试将 Node 对象加入 AQS 队列尾部
    82. node.prev = t;
    83. if (compareAndSetTail(t, node)) {
    84. t.next = node;
    85. return t;
    86. }
    87. }
    88. }
    89. }
    90. // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
    91. final boolean acquireQueued(final Node node, int arg) {
    92. boolean failed = true;
    93. try {
    94. boolean interrupted = false;
    95. for (;;) {
    96. final Node p = node.predecessor();
    97. // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
    98. if (p == head && tryAcquire(arg)) {
    99. // 获取成功, 设置自己(当前线程对应的 node)为 head
    100. setHead(node);
    101. // 上一个节点 help GC
    102. p.next = null;
    103. failed = false;
    104. // 返回中断标记 false
    105. return interrupted;
    106. }
    107. if (
    108. // 判断是否应当 park, 进入 ㈦
    109. shouldParkAfterFailedAcquire(p, node) &&
    110. // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
    111. parkAndCheckInterrupt()
    112. ) {
    113. interrupted = true;
    114. }
    115. }
    116. } finally {
    117. if (failed)
    118. cancelAcquire(node);
    119. }
    120. }
    121. // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
    122. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    123. // 获取上一个节点的状态
    124. int ws = pred.waitStatus;
    125. if (ws == Node.SIGNAL) {
    126. // 上一个节点都在阻塞, 那么自己也阻塞好了
    127. return true;
    128. }
    129. // > 0 表示取消状态
    130. if (ws > 0) {
    131. // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
    132. do {
    133. node.prev = pred = pred.prev;
    134. } while (pred.waitStatus > 0);
    135. pred.next = node;
    136. } else {
    137. // 这次还没有阻塞
    138. // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
    139. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    140. }
    141. return false;
    142. }
    143. // ㈧ 阻塞当前线程
    144. private final boolean parkAndCheckInterrupt() {
    145. LockSupport.park(this);
    146. return Thread.interrupted();
    147. }
    148. }


    解锁源码

    1. // Sync 继承自 AQS
    2. static final class NonfairSync extends Sync {
    3. // 解锁实现
    4. public void unlock() {
    5. sync.release(1);
    6. }
    7. // AQS 继承过来的方法, 方便阅读, 放在此处
    8. public final boolean release(int arg) {
    9. // 尝试释放锁, 进入 ㈠
    10. if (tryRelease(arg)) {
    11. // 队列头节点 unpark
    12. Node h = head;
    13. if (
    14. // 队列不为 null
    15. h != null &&
    16. // waitStatus == Node.SIGNAL 才需要 unpark
    17. h.waitStatus != 0
    18. ) {
    19. // unpark AQS 中等待的线程, 进入 ㈡
    20. unparkSuccessor(h);
    21. }
    22. return true;
    23. }
    24. return false;
    25. }
    26. // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
    27. protected final boolean tryRelease(int releases) {
    28. // state--
    29. int c = getState() - releases;
    30. if (Thread.currentThread() != getExclusiveOwnerThread())
    31. throw new IllegalMonitorStateException();
    32. boolean free = false;
    33. // 支持锁重入, 只有 state 减为 0, 才释放成功
    34. if (c == 0) {
    35. free = true;
    36. setExclusiveOwnerThread(null);
    37. }
    38. setState(c);
    39. return free;
    40. }
    41. // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
    42. private void unparkSuccessor(Node node) {
    43. // 如果状态为 Node.SIGNAL 尝试重置状态为 0, 如果线程获取到了锁那么后来头结点会被抛弃掉
    44. // 不成功也可以
    45. int ws = node.waitStatus;
    46. if (ws < 0) {
    47. compareAndSetWaitStatus(node, ws, 0);
    48. }
    49. // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
    50. Node s = node.next;
    51. // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
    52. if (s == null || s.waitStatus > 0) {
    53. s = null;
    54. for (Node t = tail; t != null && t != node; t = t.prev)
    55. if (t.waitStatus <= 0)
    56. s = t;
    57. }
    58. if (s != null)
    59. LockSupport.unpark(s.thread);
    60. }
    61. }

可重入原理

  1. /**
  2. *1.加锁时重入,state++
  3. *2.解锁时,state--
  4. */
  5. static final class NonfairSync extends Sync {
  6. // ...
  7. // Sync 继承过来的方法, 方便阅读, 放在此处
  8. final boolean nonfairTryAcquire(int acquires) {
  9. final Thread current = Thread.currentThread();
  10. int c = getState();
  11. if (c == 0) {
  12. if (compareAndSetState(0, acquires)) {
  13. setExclusiveOwnerThread(current);
  14. return true;
  15. }
  16. }
  17. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
  18. else if (current == getExclusiveOwnerThread()) {
  19. // state++
  20. int nextc = c + acquires;
  21. if (nextc < 0) // overflow
  22. throw new Error("Maximum lock count exceeded");
  23. setState(nextc);
  24. return true;
  25. }
  26. return false;
  27. }
  28. // Sync 继承过来的方法, 方便阅读, 放在此处
  29. protected final boolean tryRelease(int releases) {
  30. // state--
  31. int c = getState() - releases;
  32. if (Thread.currentThread() != getExclusiveOwnerThread())
  33. throw new IllegalMonitorStateException();
  34. boolean free = false;
  35. // 支持锁重入, 只有 state 减为 0, 才释放成功
  36. if (c == 0) {
  37. free = true;
  38. setExclusiveOwnerThread(null);
  39. }
  40. setState(c);
  41. return free;
  42. }
  43. }

公平锁实现原理

与非公平锁主要区别在于 tryAcquire 方法的实现
先检查 AQS 队列中是否有前驱节点, 没有才去竞争

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. final void lock() {
  4. acquire(1);
  5. }
  6. // AQS 继承过来的方法, 方便阅读, 放在此处
  7. public final void acquire(int arg) {
  8. if (
  9. !tryAcquire(arg) &&
  10. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  11. ) {
  12. selfInterrupt();
  13. }
  14. }
  15. // 与非公平锁主要区别在于 tryAcquire 方法的实现
  16. protected final boolean tryAcquire(int acquires) {
  17. final Thread current = Thread.currentThread();
  18. int c = getState();
  19. if (c == 0) {
  20. // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
  21. if (!hasQueuedPredecessors() &&
  22. compareAndSetState(0, acquires)) {
  23. setExclusiveOwnerThread(current);
  24. return true;
  25. }
  26. }
  27. else if (current == getExclusiveOwnerThread()) {
  28. int nextc = c + acquires;
  29. if (nextc < 0)
  30. throw new Error("Maximum lock count exceeded");
  31. setState(nextc);
  32. return true;
  33. }
  34. return false;
  35. }
  36. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
  37. public final boolean hasQueuedPredecessors() {
  38. Node t = tail;
  39. Node h = head;
  40. Node s;
  41. // h != t 时表示队列中有 Node
  42. return h != t &&
  43. (
  44. // (s = h.next) == null 表示队列中还有没有老二
  45. (s = h.next) == null || // 或者队列中老二线程不是此线程
  46. s.thread != Thread.currentThread()
  47. );
  48. }
  49. }

条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObjectawait 流程
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
image.png
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
image.png
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
image.png
park 阻塞 Thread-0
image.png
signal 流程
假设 Thread-1 要来唤醒 Thread-0
image.png
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
image.png
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1
image.pngThread-1 释放锁,进入 unlock 流程。

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. private static final long serialVersionUID = 1173984872572414699L;
  3. // 第一个等待节点
  4. private transient Node firstWaiter;
  5. // 最后一个等待节点
  6. private transient Node lastWaiter;
  7. public ConditionObject() { }
  8. // ㈠ 添加一个 Node 至等待队列
  9. private Node addConditionWaiter() {
  10. Node t = lastWaiter;
  11. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  12. if (t != null && t.waitStatus != Node.CONDITION) {
  13. unlinkCancelledWaiters();
  14. t = lastWaiter;
  15. }
  16. // 创建一个关联当前线程的新 Node, 添加至队列尾部
  17. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  18. if (t == null)
  19. firstWaiter = node;
  20. else
  21. t.nextWaiter = node;
  22. lastWaiter = node;
  23. return node;
  24. }
  25. // 唤醒 - 将没取消的第一个节点转移至 AQS 队列
  26. private void doSignal(Node first) {
  27. do {
  28. // 已经是尾节点了
  29. if ( (firstWaiter = first.nextWaiter) == null) {
  30. lastWaiter = null;
  31. }
  32. first.nextWaiter = null;
  33. } while (
  34. // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
  35. !transferForSignal(first) &&
  36. // 队列还有节点
  37. (first = firstWaiter) != null
  38. );
  39. }
  40. // 外部类方法, 方便阅读, 放在此处
  41. // ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
  42. final boolean transferForSignal(Node node) {
  43. // 设置当前node状态为0(因为处在队列末尾),如果状态已经不是 Node.CONDITION, 说明被取消了
  44. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  45. return false;
  46. // 加入 AQS 队列尾部
  47. Node p = enq(node);
  48. int ws = p.waitStatus;
  49. if (
  50. // 插入节点的上一个节点被取消
  51. ws > 0 ||
  52. // 插入节点的上一个节点不能设置状态为 Node.SIGNAL
  53. !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
  54. ) {
  55. // unpark 取消阻塞, 让线程重新同步状态
  56. LockSupport.unpark(node.thread);
  57. }
  58. return true;
  59. }
  60. // 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
  61. private void doSignalAll(Node first) {
  62. lastWaiter = firstWaiter = null;
  63. do {
  64. Node next = first.nextWaiter;
  65. first.nextWaiter = null;
  66. transferForSignal(first);
  67. first = next;
  68. } while (first != null);
  69. }
  70. // ㈡
  71. private void unlinkCancelledWaiters() {
  72. // ...
  73. }
  74. // 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
  75. public final void signal() {
  76. // 如果没有持有锁,会抛出异常
  77. if (!isHeldExclusively())
  78. throw new IllegalMonitorStateException();
  79. Node first = firstWaiter;
  80. if (first != null)
  81. doSignal(first);
  82. }
  83. // 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
  84. public final void signalAll() {
  85. if (!isHeldExclusively())
  86. throw new IllegalMonitorStateException();
  87. Node first = firstWaiter;
  88. if (first != null)
  89. doSignalAll(first);
  90. }
  91. // 不可打断等待 - 直到被唤醒
  92. public final void awaitUninterruptibly() {
  93. // 添加一个 Node 至等待队列, 见 ㈠
  94. Node node = addConditionWaiter();
  95. // 释放节点持有的锁, 见 ㈣
  96. int savedState = fullyRelease(node);
  97. boolean interrupted = false;
  98. // 如果该节点还没有转移至 AQS 队列, 阻塞
  99. while (!isOnSyncQueue(node)) {
  100. // park 阻塞
  101. LockSupport.park(this);
  102. // 如果被打断, 仅设置打断状态
  103. if (Thread.interrupted())
  104. interrupted = true;
  105. }
  106. // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
  107. if (acquireQueued(node, savedState) || interrupted)
  108. selfInterrupt();
  109. }
  110. // 外部类方法, 方便阅读, 放在此处
  111. // ㈣ 因为某线程可能重入,需要将 state 全部释放,获取state,然后把它全部减掉,以全部释放
  112. final int fullyRelease(Node node) {
  113. boolean failed = true;
  114. try {
  115. int savedState = getState();
  116. // 唤醒等待队列队列中的下一个节点
  117. if (release(savedState)) {
  118. failed = false;
  119. return savedState;
  120. } else {
  121. throw new IllegalMonitorStateException();
  122. }
  123. } finally {
  124. if (failed)
  125. node.waitStatus = Node.CANCELLED;
  126. }
  127. }
  128. // 打断模式 - 在退出等待时重新设置打断状态
  129. private static final int REINTERRUPT = 1;
  130. // 打断模式 - 在退出等待时抛出异常
  131. private static final int THROW_IE = -1;
  132. // 判断打断模式
  133. private int checkInterruptWhileWaiting(Node node) {
  134. return Thread.interrupted() ?
  135. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  136. 0;
  137. }
  138. // ㈤ 应用打断模式
  139. private void reportInterruptAfterWait(int interruptMode)
  140. throws InterruptedException {
  141. if (interruptMode == THROW_IE)
  142. throw new InterruptedException();
  143. else if (interruptMode == REINTERRUPT)
  144. selfInterrupt();
  145. }
  146. // 等待 - 直到被唤醒或打断
  147. public final void await() throws InterruptedException {
  148. if (Thread.interrupted()) {
  149. throw new InterruptedException();
  150. }
  151. // 添加一个 Node 至等待队列, 见 ㈠
  152. Node node = addConditionWaiter();
  153. // 释放节点持有的锁
  154. int savedState = fullyRelease(node);
  155. int interruptMode = 0;
  156. // 如果该节点还没有转移至 AQS 队列, 阻塞
  157. while (!isOnSyncQueue(node)) {
  158. // park 阻塞
  159. LockSupport.park(this);
  160. // 如果被打断, 退出等待队列
  161. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  162. break;
  163. }
  164. // 退出等待队列后, 还需要获得 AQS 队列的锁
  165. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  166. interruptMode = REINTERRUPT;
  167. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  168. if (node.nextWaiter != null)
  169. unlinkCancelledWaiters();
  170. // 应用打断模式, 见 ㈤
  171. if (interruptMode != 0)
  172. reportInterruptAfterWait(interruptMode);
  173. }
  174. // 等待 - 直到被唤醒或打断或超时
  175. public final long awaitNanos(long nanosTimeout) throws InterruptedException {
  176. if (Thread.interrupted()) {
  177. throw new InterruptedException();
  178. }
  179. // 添加一个 Node 至等待队列, 见 ㈠
  180. Node node = addConditionWaiter();
  181. // 释放节点持有的锁
  182. int savedState = fullyRelease(node);
  183. // 获得最后期限
  184. final long deadline = System.nanoTime() + nanosTimeout;
  185. int interruptMode = 0;
  186. // 如果该节点还没有转移至 AQS 队列, 阻塞
  187. while (!isOnSyncQueue(node)) {
  188. // 已超时, 退出等待队列
  189. if (nanosTimeout <= 0L) {
  190. transferAfterCancelledWait(node);
  191. break;
  192. }
  193. // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
  194. if (nanosTimeout >= spinForTimeoutThreshold)
  195. LockSupport.parkNanos(this, nanosTimeout);
  196. // 如果被打断, 退出等待队列
  197. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  198. break;
  199. nanosTimeout = deadline - System.nanoTime();
  200. }
  201. // 退出等待队列后, 还需要获得 AQS 队列的锁
  202. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  203. interruptMode = REINTERRUPT;
  204. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  205. if (node.nextWaiter != null)
  206. unlinkCancelledWaiters();
  207. // 应用打断模式, 见 ㈤
  208. if (interruptMode != 0)
  209. reportInterruptAfterWait(interruptMode);
  210. return deadline - System.nanoTime();
  211. }
  212. // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
  213. public final boolean awaitUntil(Date deadline) throws InterruptedException {
  214. // ...
  215. }
  216. // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
  217. public final boolean await(long time, TimeUnit unit) throws InterruptedException {
  218. // ...
  219. }
  220. // 工具方法 省略 ...
  221. }

6.3 Semaphore

信号量,用来限制能同时访问共享资源的线程上限。

  1. public static void main(String[] args) {
  2. // 1. 创建一个对象
  3. Semaphore semaphore = new Semaphore(3);
  4. // 2. 开 10 个线程
  5. for(int i = 0; i < 10; i++) {
  6. new Thread(() -> {
  7. // 获取一个许可
  8. try {
  9. semaphore.acquire();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. try {
  14. log.info("start ...");
  15. Thread.sleep(1000);
  16. log.info("end ....");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } finally {
  20. semaphore.release();
  21. }
  22. }, "t" + (i + 1)).start();;
  23. }
  24. }

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源。
image.png
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
image.png
这时 Thread-4 释放了 permits,状态如下
image.png
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
image.png

  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = -2694183684443567898L;
  3. NonfairSync(int permits) {
  4. // permits 即 state
  5. super(permits);
  6. }
  7. // Semaphore 方法, 方便阅读, 放在此处
  8. public void acquire() throws InterruptedException {
  9. sync.acquireSharedInterruptibly(1);
  10. }
  11. // AQS 继承过来的方法, 方便阅读, 放在此处
  12. public final void acquireSharedInterruptibly(int arg)
  13. throws InterruptedException {
  14. if (Thread.interrupted())
  15. throw new InterruptedException();
  16. if (tryAcquireShared(arg) < 0)
  17. doAcquireSharedInterruptibly(arg);
  18. }
  19. // 尝试获得共享锁
  20. protected int tryAcquireShared(int acquires) {
  21. return nonfairTryAcquireShared(acquires);
  22. }
  23. // Sync 继承过来的方法, 方便阅读, 放在此处
  24. final int nonfairTryAcquireShared(int acquires) {
  25. for (;;) {
  26. int available = getState();
  27. int remaining = available - acquires;
  28. if (
  29. // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
  30. remaining < 0 ||
  31. // 如果 cas 重试成功, 返回正数, 表示获取成功
  32. compareAndSetState(available, remaining)
  33. ) {
  34. return remaining;
  35. }
  36. }
  37. }
  38. // AQS 继承过来的方法, 方便阅读, 放在此处
  39. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  40. final Node node = addWaiter(Node.SHARED);
  41. boolean failed = true;
  42. try {
  43. for (;;) {
  44. final Node p = node.predecessor();
  45. if (p == head) {
  46. // 再次尝试获取许可
  47. int r = tryAcquireShared(arg);
  48. if (r >= 0) {
  49. // 成功后本线程出队(AQS), 所在 Node设置为 head
  50. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  51. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  52. // r 表示可用资源数, 为 0 则不会继续传播
  53. setHeadAndPropagate(node, r);
  54. p.next = null; // help GC
  55. failed = false;
  56. return;
  57. }
  58. }
  59. // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
  60. if (shouldParkAfterFailedAcquire(p, node) &&
  61. parkAndCheckInterrupt())
  62. throw new InterruptedException();
  63. }
  64. } finally {
  65. if (failed)
  66. cancelAcquire(node);
  67. }
  68. }
  69. // Semaphore 方法, 方便阅读, 放在此处
  70. public void release() {
  71. sync.releaseShared(1);
  72. }
  73. // AQS 继承过来的方法, 方便阅读, 放在此处
  74. public final boolean releaseShared(int arg) {
  75. if (tryReleaseShared(arg)) {
  76. doReleaseShared();
  77. return true;
  78. }
  79. return false;
  80. }
  81. // Sync 继承过来的方法, 方便阅读, 放在此处
  82. protected final boolean tryReleaseShared(int releases) {
  83. for (;;) {
  84. int current = getState();
  85. int next = current + releases;
  86. if (next < current) // overflow
  87. throw new Error("Maximum permit count exceeded");
  88. if (compareAndSetState(current, next))
  89. return true;
  90. }
  91. }
  92. }

6.4 CountdownLatch

CountDownLatch 允许多线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,CountDownLatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown方法时,其实使用了 tryReleaseShared 方法以CAS 的操作来减少 state ,直至 state 为 0 就代表所有的线程都调用了countDown方法。当调用 await 方法的时候,如果 state 不为0,就代表仍然有线程没有调用 countDown 方法,那么就把已经调用过 countDown 的线程都放入阻塞队列 Park ,并自旋 CAS 判断 state == 0,直至最后一个线程调用了 countDown ,使得 state == 0,于是阻塞的线程便判断成功,全部往下执行。

  1. public class CountDownLatch {
  2. private static final class Sync extends AbstractQueuedSynchronizer {
  3. // 版本号
  4. private static final long serialVersionUID = 4982264981922014374L;
  5. // 构造器
  6. Sync(int count) {
  7. setState(count);
  8. }
  9. // 返回当前计数
  10. int getCount() {
  11. return getState();
  12. }
  13. // 试图在共享模式下获取对象状态
  14. protected int tryAcquireShared(int acquires) {
  15. return (getState() == 0) ? 1 : -1;
  16. }
  17. // 试图设置状态来反映共享模式下的一个释放
  18. protected boolean tryReleaseShared(int releases) {
  19. // Decrement count; signal when transition to zero
  20. // 无限循环
  21. for (;;) {
  22. // 获取状态
  23. int c = getState();
  24. if (c == 0) // 没有被线程占有
  25. return false;
  26. // 下一个状态
  27. int nextc = c-1;
  28. if (compareAndSetState(c, nextc)) // 比较并且设置成功
  29. return nextc == 0;
  30. }
  31. }
  32. }
  33. // 同步队列
  34. private final Sync sync;
  35. }

用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。

  1. @Slf4j(topic = "c.CountDownLatch")
  2. public class Code_16_CountDownLatchTest {
  3. public static void main(String[] args) throws InterruptedException {
  4. method3();
  5. }
  6. public static void method1() throws InterruptedException {
  7. CountDownLatch countDownLatch = new CountDownLatch(3);
  8. new Thread(() -> {
  9. log.info("t1 start ...");
  10. try {
  11. Thread.sleep(1000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. log.info("t1 end ...");
  16. countDownLatch.countDown();
  17. }, "t1").start();
  18. new Thread(() -> {
  19. log.info("t2 start ...");
  20. try {
  21. Thread.sleep(2000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. log.info("t2 end ...");
  26. countDownLatch.countDown();
  27. }, "t2").start();
  28. new Thread(() -> {
  29. log.info("t3 start ...");
  30. try {
  31. Thread.sleep(1500);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. log.info("t3 end ...");
  36. countDownLatch.countDown();
  37. }, "t3").start();
  38. log.info("main wait ...");
  39. countDownLatch.await();
  40. log.info("main wait end ...");
  41. }
  42. public static void method2() throws InterruptedException {
  43. CountDownLatch countDownLatch = new CountDownLatch(3);
  44. ExecutorService executorService = Executors.newFixedThreadPool(4);
  45. executorService.submit(() -> {
  46. log.info("t1 start ...");
  47. try {
  48. Thread.sleep(1000);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. countDownLatch.countDown();
  53. log.info("t1 end ...{}", countDownLatch.getCount());
  54. });
  55. executorService.submit(() -> {
  56. log.info("t2 start ...");
  57. try {
  58. Thread.sleep(2000);
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. log.info("t2 end ...{}", countDownLatch.getCount());
  63. countDownLatch.countDown();
  64. });
  65. executorService.submit(() -> {
  66. log.info("t3 start ...");
  67. try {
  68. Thread.sleep(1500);
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. log.info("t3 end ...{}", countDownLatch.getCount());
  73. countDownLatch.countDown();
  74. });
  75. executorService.submit(() -> {
  76. log.info("main wait ...");
  77. try {
  78. countDownLatch.await();
  79. } catch (InterruptedException e) {
  80. e.printStackTrace();
  81. }
  82. log.info("main wait end ...");
  83. executorService.shutdown();
  84. });
  85. }
  86. public static void method3() throws InterruptedException {
  87. CountDownLatch countDownLatch = new CountDownLatch(10);
  88. ExecutorService executorService = Executors.newFixedThreadPool(10);
  89. String[] all = new String[10];
  90. Random random = new Random();
  91. for(int i = 0; i < 10; i++) {
  92. int id = i;
  93. executorService.submit(() -> {
  94. for (int j = 0; j <= 100; j++) {
  95. try {
  96. Thread.sleep(random.nextInt(100));
  97. } catch (InterruptedException e) {
  98. e.printStackTrace();
  99. }
  100. all[id] = j + "%";
  101. System.out.print("\r" + Arrays.toString(all));
  102. }
  103. countDownLatch.countDown();
  104. });
  105. }
  106. countDownLatch.await();
  107. System.out.println();
  108. System.out.println("游戏开始");
  109. executorService.shutdown();
  110. }
  111. }

微服务中调用各个服务可以用CountDownLatch,每个服务调用完加一行latch.countDown(),主线程latch.await()等待

6.5 cyclicbarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟 CountdownLatch 一样,但这个可以重用。

7.线程安全集合类

📀 JAVA多线程 - 图40


7.1 重点

重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent

  1. Blocking :大部分实现基于锁,并提供用来阻塞的方法
  2. CopyOnWrite 之类容器:修改开销相对较重
  3. Concurrent类型的容器
    1. 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
    2. 弱一致性
    • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的(fast-safe机制)
    • 求大小弱一致性,size 操作未必是 100% 准确
    • 读取弱一致性

7.2 CopyOnWriteArrayList

大多数业务场景都是一种“读多写少”的情形,CopyOnWriteArrayList就是为适应这种场景而诞生的。
CopyOnWriteArrayList,运用了一种“写时复制”的思想。通俗的理解就是当我们需要修改(增/删/改)列表中的元素时,不直接进行修改,而是先将列表Copy,然后在新的副本上进行修改,修改完成之后,再将引用从原列表指向新列表。
这样做的好处是读/写是不会冲突的,可以并发进行,读操作还是在原列表,写操作在新列表。仅仅当有多个线程同时进行写操作时,才会进行同步。

内部结构

  1. public class CopyOnWriteArrayList<E>
  2. implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
  3. /**
  4. * 排它锁, 用于同步修改操作
  5. */
  6. final transient ReentrantLock lock = new ReentrantLock();
  7. /**
  8. * 内部数组
  9. */
  10. private transient volatile Object[] array;
  11. }

查询——get方法

  1. public E get(int index) {
  2. return get(getArray(), index);
  3. }
  4. private E get(Object[] a, int index) {
  5. return (E) a[index];
  6. }

添加——add方法

  1. public boolean add(E e) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. Object[] elements = getArray(); // 旧数组
  6. int len = elements.length;
  7. Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制并创建新数组
  8. newElements[len] = e; // 将元素插入到新数组末尾
  9. setArray(newElements); // 内部array引用指向新数组
  10. return true;
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

add方法首先会进行加锁,保证只有一个线程能进行修改;然后会创建一个新数组(大小为n+1),并将原数组的值复制到新数组,新元素插入到新数组的最后;最后,将字段array指向新数组。

迭代
CopyOnWriteArrayList对元素进行迭代时,仅仅返回一个当前内部数组的快照,也就是说,如果此时有其它线程正在修改元素,并不会在迭代中反映出来,因为修改都是在新数组中进行的。

  1. public Iterator<E> iterator() {
  2. return new COWIterator<E>(getArray(), 0);
  3. }
  4. static final class COWIterator<E> implements ListIterator<E> {
  5. /**
  6. * Snapshot of the array
  7. */
  8. private final Object[] snapshot;
  9. /**
  10. * Index of element to be returned by subsequent call to next.
  11. */
  12. private int cursor;
  13. private COWIterator(Object[] elements, int initialCursor) {
  14. cursor = initialCursor;
  15. snapshot = elements;
  16. }
  17. public boolean hasNext() {
  18. return cursor < snapshot.length;
  19. }
  20. public E next() {
  21. if (!hasNext())
  22. throw new NoSuchElementException();
  23. return (E) snapshot[cursor++];
  24. }
  25. // ...
  26. }

1. 内存的使用
由于CopyOnWriteArrayList使用了“写时复制”,所以在进行写操作的时候,内存里会同时存在两个array数组,如果数组内存占用的太大,那么可能会造成频繁GC,所以CopyOnWriteArrayList并不适合大数据量的场景。
2. 数据一致性
CopyOnWriteArrayList只能保证数据的最终一致性,不能保证数据的实时一致性——读操作读到的数据只是一份快照。所以如果希望写入的数据可以立刻被读到,那CopyOnWriteArrayList并不适合。

8.Threadlocal

ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
在spring MVC中,常用ThreadLocal保存当前登陆用户信息,这样线程在任意地方都可以取到用户信息了。
Threadlocal 主要用来做线程变量的隔离

  1. //存放用户信息的ThreadLocal
  2. private static final ThreadLocal<UserInfo> userInfoThreadLocal = new ThreadLocal<>();
  3. public Response handleRequest(UserInfo userInfo) {
  4. Response response = new Response();
  5. try {
  6. // 1.用户信息set到线程局部变量中
  7. userInfoThreadLocal.set(userInfo);
  8. doHandle();
  9. } finally {
  10. // 3.使用完移除掉
  11. userInfoThreadLocal.remove();
  12. }
  13. return response;
  14. }
  15. //业务逻辑处理
  16. private void doHandle () {
  17. // 2.实际用的时候取出来
  18. UserInfo userInfo = userInfoThreadLocal.get();
  19. //查询用户资产
  20. queryUserAsset(userInfo);
  21. }

📀 JAVA多线程 - 图41
  • 首先我们通过ThreadLocal<UserInfo> userInfoThreadLocal = new ThreadLocal() 初始化了一个Threadlocal 对象,就是上图中说的Threadlocal 引用,这个引用指向堆中的ThreadLocal 对象。
  • **我们知道 Thread 类有个 ThreadLocalMap 成员变量,这个Map key是Threadlocal 对象,value是你要存放的线程局部变量(线程隔离的变量)。**
  • 然后我们调用userInfoThreadLocal.set(userInfo); 这里做了什么事呢?我们把源代码拿出来,看一看就清晰了。我们知道 Thread 类有个 ThreadLocalMap 成员变量,这个Map key是Threadlocal 对象,value是你要存放的线程局部变量。

📀 JAVA多线程 - 图42
这里是在当前线程对象的ThreadlocalMap中put了一个元素(Entry),key是Threadlocal对象,value是userInfo。ThreadLocalMap 类的定义在 Threadlocal中。

Thread 类有个 ThreadlocalMap 属性的成员变量,但是ThreadlocalMap 的定义却在Threadlocal 中,为什么这么做?

  • 就是让使用者知道ThreadLocalMap就只做保存线程局部变量这一件事的。

既然是线程局部变量,那为什么不用线程对象(Thread对象)作为key,这样不是更清晰,直接用线程作为key获取线程变量?

  • 这样设计会有个问题,比如: 我已经把用户信息存在线程变量里了,这个时候需要新增加一个线程变量,比方说新增用户地理位置信息,我们ThreadlocalMap 的key用的是线程,再存一个地理位置信息,key都是同一个线程(key一样),不就把原来的用户信息覆盖了嘛。

那新增地理位置信息应该怎么做?

  • 新创建一个Threadlocal对象就好了,因为ThreadLocalMap的key是Threadlocal 对象,比如新增地理位置,我就再 Threadlocal < Geo> geo = new Threadlocal(), 存放地理位置信息,这样线程的ThreadlocalMap里面会有二个元素,一个是用户信息,一个是地理位置。

ThreadlocalMap 中key是 WeakReference类型,能讲讲Java中有几种类似的引用,什么区别吗?

  • **强引用**是使用最普遍的引用。如果一个对象具有强引用,那垃圾回收器绝不会回收它,当内存空间不足时,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足的问题
  • 如果一个对象只具有**软引用**,则内存空间充足时,垃圾回收器就不会回收它;如果内存空间不足了,就会回收这些对象的内存。
  • **弱引用**与**软引用**的区别在于:只具有**弱引用**的对象拥有**更短暂**的**生命周期**。在垃圾回收器线程扫描内存区域时,一旦发现了只具有**弱引用**的对象,不管当前**内存空间足够与否**,都会**回收**它的内存。
  • **虚引用**顾名思义,就是形同虚设。与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。

为什么ThreadlocalMap 中key 设计成 WeakReference(弱引用)类型吗?

  • 避免内存泄漏

📀 JAVA多线程 - 图43

  1. private static final ThreadLocal<UserInfo> userInfoThreadLocal = new ThreadLocal<>();
  2. userInfoThreadLocal.set(userInfo);
  • 这里的引用关系是userInfoThreadLocal 引用了ThreadLocal对象,这是个强引用,ThreadLocal对象同时也被ThreadlocalMap的key引用,这是个WeakReference引用,我们前面说GC要回收ThreadLocal对象的前提是它只被WeakReference引用,没有任何强引用
  • 可以看到一旦一个对象只被弱引用引用,GC的时候就会回收这个对象。所以只要ThreadLocal对象如果还被 userInfoThreadLocal(强引用) 引用着,GC是不会回收被WeakReference引用的对象的

那既然ThreadLocal对象有强引用,回收不掉,干嘛还要设计成WeakReference类型呢?

  • ThreadLocal的设计者考虑到线程往往生命周期很长,比如经常会用到线程池,线程一直存活着,根据JVM 根搜索算法,一直存在 Thread -> ThreadLocalMap -> Entry(元素)这样一条引用链路,如果key不设计成WeakReference类型,是强引用的话,就一直不会被GC回收,key就一直不会是null,不为null Entry元素就不会被清理(ThreadLocalMap是根据key是否为null来判断是否清理Entry)
  • 所以ThreadLocal的设计者认为只要ThreadLocal 所在的作用域结束了工作被清理了,GC回收的时候就会把key引用对象回收,key置为null,ThreadLocal会尽力保证Entry清理掉来最大可能避免内存泄漏。

那如果Threadlocal 对象一直有强引用,那怎么办?岂不是有内存泄漏风险。

  • 最佳实践是用完手动调用remove函数。

循环打印ABC

  1. public class multiThread {
  2. public static void main(String[] args) throws InterruptedException {
  3. Semaphore a = new Semaphore(1);
  4. Semaphore b = new Semaphore(0);
  5. Semaphore c = new Semaphore(0);
  6. ExecutorService poolService = Executors.newFixedThreadPool(3);
  7. Integer count = 10;
  8. poolService.execute(new Worker(a, b, "A", count));//一开始是信号量a执行acquire(),因为会使信号量减1,所以要初始化为1
  9. poolService.execute(new Worker(b, c, "B", count));
  10. poolService.execute(new Worker(c, a, "C", count));
  11. Thread.sleep(1000);
  12. poolService.shutdownNow();
  13. }
  14. public static class Worker implements Runnable {
  15. private String key;
  16. private Semaphore current;
  17. private Semaphore next;
  18. private Integer count;
  19. public Worker(Semaphore current, Semaphore next, String key, Integer count) {
  20. this.current = current;
  21. this.next = next;
  22. this.key = key;
  23. this.count = count;
  24. }
  25. public void run() {
  26. for(int i = 0; i < count; i++) {
  27. try {
  28. //获取当前的锁
  29. current.acquire(); //current - 1
  30. System.out.println(i+","+key);
  31. //释放next的锁
  32. next.release(); //next + 1
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }
  39. }

9.Fork/Join

思想:分而治之。将一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
举例说明
1、我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
2、还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
3、如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
这就是Fork/Join任务的原理:判断一个任务是否足够小。如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
编码实现
整个任务流程如下所示

  • 首先继承任务,覆写任务的执行方法
  • 通过判断阈值,判断该线程是否可以执行
  • 如果不能执行,则将任务继续递归分配,利用fork方法,并行执行
  • 如果是有返回值的,才需要调用join方法,汇集数据。

主要的两个类:

  • RecursiveAction一个递归无结果的ForkJoinTask(没有返回值)
  • RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)

整个流程需要三个类完成
1、ForkJoinPool

  • 既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool。
  • 它与其它的ExecutorService区别主要在于它使用工作窃取,那什么是工作窃取呢?
  • 工作窃取:一个大任务会被划分成无数个小任务,这些任务被分配到不同的队列,这些队列有些干活干的块,有些干得慢。于是干得快的,一看自己没任务需要执行了,就去隔壁的队列里面拿去任务执行。

2、ForkJoinTask
ForkJoinTask就是ForkJoinPool里面的每一个任务。他主要有两个子类:RecursiveAction和RecursiveTask。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果。

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction。
使用Fork/Join模式可以进行并行计算以提高效率。