一,Lock

Lock在juc中是最核心的组件

1.Lock的实现

Lock 本质上是一个接口,它定义了释放锁和获得锁的抽象方法,定义成接口就意味着它定义了锁的一个标准规范,也同时意味着锁的不同实现。实现 Lock 接口的类有很多,以下为几个常见的锁实现。

ReentrantLock:表示重入锁,它是唯一一个实现了 Lock 接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数。

ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。

StampedLock: stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程。

2.Lock的类关系图

Lock有很多的实现,但是直观的实现是ReentrantLock重入锁。

Lock类关系图.jpg

  1. void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
  2. void lockInterruptibly() // 和lock()方法相似, 但阻塞的线程 可 中 断 , 抛 出java.lang.InterruptedException 异常
  3. boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回 true
  4. boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法
  5. void unlock() // 释放锁

二,ReentrantLock

重入锁,表示支持重新进入的锁,也就是说,如果当前线程 t1 通过调用 lock 方法获取了锁之后,再次调用 lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized 和 ReentrantLock 都是可重入锁。

1.重入锁的设计目的

比如调用 demo 方法获得了当前的对象锁,然后在这个方法中再去调用demo2,demo2 中的存在同一个实例锁,这个时候当前线程会因为无法获得demo2 的对象锁而阻塞,就会产生死锁。重入锁的设计目的是避免线程的死锁。

  1. public class ReentrantDemo {
  2. public synchronized void demo() {
  3. System.out.println("begin:demo");
  4. demo2();
  5. }
  6. public void demo2() {
  7. System.out.println("begin:demo1");
  8. synchronized (this) {
  9. }
  10. }
  11. public static void main(String[] args) {
  12. ReentrantDemo rd = new ReentrantDemo();
  13. new Thread(rd::demo).start();
  14. }
  15. }

2.ReentrantLock使用

  1. public class AtomicDemo {
  2. private static int count = 0;
  3. static Lock lock = new ReentrantLock();
  4. public static void inc() {
  5. lock.lock();
  6. try {
  7. Thread.sleep(1);
  8. count++;
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }finally {
  12. lock.unlock();
  13. }
  14. }
  15. public static void main(String[] args) throws InterruptedException {
  16. for (int i = 0; i < 1000; i++) {
  17. new Thread(() -> AtomicDemo.inc()).start();
  18. }
  19. Thread.sleep(3000);
  20. System.out.println("result:" + count);
  21. }
  22. }

3.ReentrantReadWriteLock

以前理解的锁,基本都是排他锁,也就是这些锁在同一时刻只允许一个线程进行访问,而读写所在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。读写锁维护了一对锁,一个读锁、一个写锁;一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

  1. public class LockDemo {
  2. static Map<String, Object> cacheMap = new HashMap<>();
  3. static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  4. static Lock read = rwl.readLock();
  5. static Lock write = rwl.writeLock();
  6. public static final Object get(String key) {
  7. System.out.println(" 开始读取数据");
  8. read.lock(); // 读锁
  9. try {
  10. return cacheMap.get(key);
  11. } finally {
  12. read.unlock();
  13. }
  14. }
  15. public static final Object put(String key, Object value) {
  16. System.out.println(" 开始写数据");
  17. write.lock();
  18. try {
  19. return cacheMap.put(key, value);
  20. } finally {
  21. write.unlock();
  22. }
  23. }
  24. }


在这个案例中,通过 hashmap 来模拟了一个内存缓存,然后使用读写锁来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被阻塞,因为读操作不会影响执行结果。

在执行写操作时,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性。

读锁与读锁可以共享

读锁与写锁不可以共享(排他)

写锁与写锁不可以共享(排他)

三,ReentrantLock 的实现原理

锁的基本原理是,基于将多线程并行任务通过某一种机制实现线程的串行执行,从而达到线程安全性的目的。在 ReentrantLock 中,在多线程
竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

1.AQS是什么?

在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。

2.AQS的两种功能

使用层面上来说,AQS的功能分为两种:独占和共享。

独占锁:每次只能有一个线程持有锁。

共享锁:允许多线程同时获取锁,并发访问共享资源。

3.AQS的内部实现

AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接的前驱结点。所以双向链表可以从任意一个结点开始很方便的访问前驱和后继。每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到AQS队列中去,当获取锁的线程释放锁之后,会从队列中唤醒一个阻塞的节点(线程)。

FIFO队列.png

4.Node的组成

  1. static final class Node {
  2. /** Marker to indicate a node is waiting in shared mode */
  3. static final Node SHARED = new Node();
  4. /** Marker to indicate a node is waiting in exclusive mode */
  5. static final Node EXCLUSIVE = null;
  6. static final int CANCELLED = 1;
  7. static final int SIGNAL = -1;
  8. static final int CONDITION = -2;
  9. static final int PROPAGATE = -3;
  10. volatile int waitStatus;
  11. volatile Node prev; //前驱结点
  12. volatile Node next; //后继节点
  13. volatile Thread thread; //当前线程
  14. Node nextWaiter; //存储在condition队列中的后继节点
  15. //是否为共享锁
  16. final boolean isShared() {
  17. return nextWaiter == SHARED;
  18. }
  19. final Node predecessor() throws NullPointerException {
  20. Node p = prev;
  21. if (p == null)
  22. throw new NullPointerException();
  23. else
  24. return p;
  25. }
  26. Node() { // Used to establish initial head or SHARED marker
  27. }
  28. Node(Thread thread, Node mode) { // Used by addWaiter
  29. this.nextWaiter = mode;
  30. this.thread = thread;
  31. }
  32. Node(Thread thread, int waitStatus) { // Used by Condition
  33. this.waitStatus = waitStatus;
  34. this.thread = thread;
  35. }
  36. }

5.释放锁以及添加线程对于队列的变化

当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化,首先看一下添加节点的场景。

image.png

这里会涉及到两个变化:

  1. 新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前驱节点的next节点指向自己。
  2. 通过CAS的方式将tail重新指向新的尾部节点。

head节点表示获取锁成功的节点,当头结点在释放同步状态的时候,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头节点,节点的变化过程如下:

image.png

这个过程也是涉及到两个变化:

  1. 修改head节点指向下一个获得锁的节点
  2. 新的获得锁的节点,将prev的指针指向null

设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程来获得,所以不需要CAS保证,只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可。
AQS.png

四,ReentrantLock 的源码分析

1.加锁逻辑

ReentrantLock.lock

  1. public void lock() {
  2. sync.lock();
  3. }

sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑,前面说过 AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备业务功能,所以在不同的同步场景中,会继承 AQS 来实现对应场景的功能。

Sync 有两个具体的实现类,分别是:

NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁。

FailSync: 表示所有线程严格按照 FIFO 来获取锁。

NonfairSync.lock

  1. 非公平锁和公平锁最大的区别在于:非公平锁中抢占锁的逻辑是,不管有没有线程排队,先上来cas去抢占一下
  2. cas成功,就表示成功获得了锁
  3. cas失败,调用acquire(1)走锁竞争逻辑

state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入锁的实现来说,表示一个同步状态。它有两个含义的表示:

  1. 当 state=0 时,表示无锁状态。

  2. 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入 5 次,那么 state=5。 而在释放锁的时候,同样需要释放 5 次直到 state=0其他线程才有资格获得锁。

  1. final void lock() {
  2. //如果cas的方式修改state状态成功,说明获取到了锁,直接设置当前持有锁的线程是当前线程
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else //cas失败,走抢占锁的逻辑
  6. acquire(1);
  7. }

AQS.acquire

acquire是AQS中的方法,如果cas操作未能成功,说明state已经不为0,此时继续acquire(1)操作。

这个方法的主要逻辑:

  1. 通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false.
  2. 如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加到 AQS 队列尾部.
  3. acquireQueued,将 Node 作为参数,通过自旋去尝试获取锁。
    1. public final void acquire(int arg) {
    2. //如果尝试去抢占锁失败 就讲当前线尾插进入阻塞队列,让当前现成的node自旋的方式去尝试获取锁
    3. if (!tryAcquire(arg) &&
    4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    5. //响应中断逻辑,表示如果当前线程在 acquireQueued 中被中断过,则需要产生一个中断请求,
    6. //原因是线程在调用 acquireQueued 方法的时候是不会响应中断请求的。
    7. selfInterrupt();
    8. }

    AQS.selfInterrupt

表示如果当前线程在 acquireQueued 中被中断过,则需要产生一个中断请求,原因是线程在调用 acquireQueued 方法的时候是不会响应中断请求的。

  1. //执行中断逻辑
  2. static void selfInterrupt() {
  3. Thread.currentThread().interrupt();
  4. }

AQS.tryAcquire

这里采用模板模式,具体的逻辑交给继承该类的子类实现,直接看非公平锁的逻辑。

  1. protected boolean tryAcquire(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

NonfairSync.tryAcquire

这个方法的作用是尝试获取锁,如果成功返回true,不成功返回false。

它是重写AQS类中的tryAcquire方法,AQS中的tryAcquire()的定义,并没有实现,而是抛出异常。

  1. protected final boolean tryAcquire(int acquires) {
  2. return nonfairTryAcquire(acquires);
  3. }

ReentrantLock.nofairTryAcquire

  1. 获取当前线程,判断当前的锁的状态
  2. 如果state=0 表示当前是无锁状态,通过cas更新state状态的值
  3. 当前线程是属于重入,则增加重入次数
    1. final boolean nonfairTryAcquire(int acquires) {
    2. //获取当前线程
    3. final Thread current = Thread.currentThread();
    4. //获取当前的state值
    5. int c = getState();
    6. //如果此时没有线程持有锁
    7. if (c == 0) {
    8. //cas去获取一次锁
    9. if (compareAndSetState(0, acquires)) {
    10. //走到这里说明cas的方式获取到了锁,直接设置当前锁的线程是当前线程,并返回true
    11. setExclusiveOwnerThread(current);
    12. return true;
    13. }
    14. }
    15. //走到这里说明 当前锁被其他线程持有 或者 当前cas失败,存在竞争
    16. else if (current == getExclusiveOwnerThread()) { //如果持有锁的线程是当前线程自己
    17. //执行到这里说明是一次重入锁的逻辑,直接在原有状态上+就可以
    18. int nextc = c + acquires;
    19. if (nextc < 0) // overflow
    20. throw new Error("Maximum lock count exceeded");
    21. setState(nextc);
    22. return true;
    23. }
    24. //什么情况下会走到这里?
    25. //cas失败,锁被其他线程持有,
    26. //state》0
    27. return false;
    28. }

    AQS.addWaiter

当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成Node.

入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状态。意味着重入锁用到了 AQS 的独占锁功能。

  1. 将当前线程封装成 Node

  2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的node 添加到 AQS 队列

  3. 如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列

    1. private Node addWaiter(Node mode) {
    2. //构建一个新的节点
    3. Node node = new Node(Thread.currentThread(), mode);
    4. // Try the fast path of enq; backup to full enq on failure
    5. Node pred = tail;
    6. //尾结点不为空,说明队列已经初始化过了
    7. if (pred != null) {
    8. //将当前节点的前驱节点指向为尾结点
    9. node.prev = pred;
    10. //cas的方式设置尾结点
    11. if (compareAndSetTail(pred, node)) {
    12. //cas成功,让尾结点的后继指针指向node
    13. pred.next = node;
    14. return node;
    15. }
    16. }
    17. //执行到这里 说明队列尚未初始化, 或者cas的方式设置尾结点失败,说明有其他线程先一步入队了。
    18. enq(node);
    19. return node;
    20. }

    AQS.enq

    enq 就是通过自旋操作把当前节点加入到队列中。
    AQS-独占模式-情景分析.png

    1. private Node enq(final Node node) {
    2. //自旋:
    3. for (;;) {
    4. Node t = tail;
    5. if (t == null) { // 尾结点为空,说明需要初始化
    6. if (compareAndSetHead(new Node()))
    7. tail = head;
    8. //注意,这里初始化完之后并没有直接退出,而是继续往下走,为啥呢?
    9. //因为当前执行的是初始化操作,那就说明,当前这个抢占锁失败的线程需要给当前持有锁的线程补充一个头节点
    10. } else {//尾结点不为空
    11. //让当前节点的前驱指针指向尾结点
    12. node.prev = t;
    13. //cas的方式设置尾结点
    14. if (compareAndSetTail(t, node)) {
    15. //让尾结点的后继指针指向当前节点
    16. t.next = node;
    17. return t;
    18. }
    19. }
    20. }
    21. }

    enq.jpg

    AQS .acquireQueued

通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给acquireQueued 方法,去竞争锁

  1. 获取当前节点的 prev 节点

  2. 如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁

  3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head节点

  4. 如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程

  5. 最后,通过 cancelAcquire 取消获得锁的操作

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. //获取当前节点的前驱节点
    7. final Node p = node.predecessor();
    8. //如果当前节点的前驱节点是头节点,说明当前节点拥有了竞争锁的权利,尝试cas的方式去获取一次锁
    9. if (p == head && tryAcquire(arg)) {
    10. //执行到这里说名cas获取到了锁
    11. //设置头节点为当前节点,这里为啥不用加锁,因为设置头节点的线程是持有锁的线程,独占模式下,只有一个线程持有锁,所以设置头节点不存在线程安全问题
    12. setHead(node);
    13. //让p的后继指针指向null 帮助垃圾回收
    14. p.next = null; // help GC
    15. //声明获取锁成功
    16. failed = false;
    17. //当前线程不需要被中断
    18. return interrupted;
    19. }
    20. //当前持有锁的线程可能还没有释放锁,所以当前线程尝试获取锁会失败,这个时候当前线程应该park
    21. if (shouldParkAfterFailedAcquire(p, node) &&
    22. parkAndCheckInterrupt())
    23. //这里比较友好,仅仅是把中断标识位改成true,如果是响应中断逻辑,直接这里抛异常了,然后走cancelAcquire(node)的逻辑。
    24. interrupted = true;
    25. }
    26. } finally {
    27. if (failed)
    28. cancelAcquire(node);
    29. }
    30. }

    acquireQueue.jpg

    AQS.shouldParkAfterFailedAcquire

如果 ThreadA 的锁还没有释放的情况下,ThreadB 和 ThreadC 来争抢锁肯定是会失败,那么失败以后会调用 shouldParkAfterFailedAcquire 方法

Node 有 5 种状态,分别是:CANCELLED(1),SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、默认状态(0)

  • CANCELLED: 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点, 其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状态后的结点将不会再变化

  • SIGNAL: 只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程

  • CONDITION: 和 Condition 有关系

  • PROPAGATE:共享模式下,PROPAGATE 状态的线程处于可运行状态

  • 0:初始状态

这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是否应该被挂起。

  1. 如果 ThreadA 的 pred 节点状态为 SIGNAL,那就表示可以放心挂起当前线程

  2. 通过循环扫描链表把 CANCELLED 状态的节点移除

  3. 修改 pred 节点的状态为 SIGNAL,返回 false.

返回 false 时,也就是不需要挂起,返回 true,则需要调用 parkAndCheckInterrupt挂起当前线程

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. //前驱节点的等待状态
  3. int ws = pred.waitStatus;
  4. //如果前置节点为 SIGNAL,意味着只需要等待其他前置节点的线程被释放
  5. if (ws == Node.SIGNAL)
  6. return true;
  7. //ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行
  8. if (ws > 0) {
  9. do {//相当于: pred=pred.prev; node.prev=pred;
  10. node.prev = pred = pred.prev;
  11. } while (pred.waitStatus > 0);//这里采用循环,从双向列表中移除 CANCELLED 的节点
  12. pred.next = node;
  13. } else {
  14. //利用 cas 设置 prev 节点的状态为 SIGNAL(-1)
  15. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  16. }
  17. return false;
  18. }

AQS.parkAndCheckInterrupt

使用 LockSupport.park 挂起当前线程变成 WATING 状态

Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味着在 acquire 方法中会执行 selfInterrupt()。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

AQS.cancelAcquire

  1. private void cancelAcquire(Node node) {
  2. //如果节点不存在,直接返回
  3. if (node == null)
  4. return;
  5. //将当前节点的线程设置为空
  6. node.thread = null;
  7. //获取当前节点的前驱节点
  8. Node pred = node.prev;
  9. //循环判断:如果前驱节点是一个取消状态,那就继续往前找
  10. while (pred.waitStatus > 0)
  11. node.prev = pred = pred.prev;
  12. //可能是当前节点,可能是ws>0的节点
  13. Node predNext = pred.next;
  14. //设置当前节点的waitStatus是取消状态
  15. node.waitStatus = Node.CANCELLED;
  16. /**
  17. * 当前取消排队的node所在 队列的位置不同,执行的出队策略是不一样的,一共分为三种情况:
  18. * 1.当前node是队尾 tail -> node
  19. * 2.当前node 不是 head.next 节点,也不是 tail
  20. * 3.当前node 是 head.next节点。
  21. */
  22. //尾结点且cas tail成功
  23. if (node == tail && compareAndSetTail(node, pred)) {
  24. //pred.next = null node出队
  25. compareAndSetNext(pred, predNext, null);
  26. } else {
  27. int ws;
  28. /*
  29. node不是head.next node 不是tail
  30. node的前驱节点是取消状态 或 假设前驱状态是 <= 0 则设置前驱状态为 Signal状态..表示要唤醒后继节点。
  31. if里面要做的就是让node 的prev 跨过node 直接指向 node.next
  32. */
  33. if (pred != head &&
  34. ((ws = pred.waitStatus) == Node.SIGNAL ||
  35. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  36. pred.thread != null) {
  37. Node next = node.next;
  38. if (next != null && next.waitStatus <= 0)
  39. compareAndSetNext(pred, predNext, next);
  40. } else {
  41. //node是head.next && node !=tail
  42. //那就跨过node head.next = node.next node.next.prev=head
  43. unparkSuccessor(node);
  44. }
  45. node.next = node; // help GC
  46. }
  47. }

2.释放锁的逻辑

如果这个时候 ThreadA 释放锁了,那么来看锁被释放后会产生什么效果?

ReentrantLock.unlock

  1. public void unlock() {
  2. //执行sync的release方法,默认是非公平锁,所以看非公平锁的逻辑
  3. sync.release(1);
  4. }

AQS.release

  1. public final boolean release(int arg) {
  2. //如果尝试释放锁成功
  3. if (tryRelease(arg)) {
  4. //如果头结点不为空,且头节点状态不等于0
  5. Node h = head;
  6. if (h != null && h.waitStatus != 0)
  7. //唤醒后继线程
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }

ReentrantLock.tryRelease

这个方法可以认为是一个设置锁状态的操作,通过将 state 状态减掉传入的参数值(参数是 1),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。

在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true。

  1. protected final boolean tryRelease(int releases) {
  2. //将当前值与传入的值进行相减
  3. int c = getState() - releases;
  4. //如果当前线程不是线程拥有者,抛异常
  5. if (Thread.currentThread() != getExclusiveOwnerThread())
  6. throw new IllegalMonitorStateException();
  7. boolean free = false;
  8. if (c == 0) {//重入锁已经完全释放
  9. free = true;
  10. setExclusiveOwnerThread(null);//设置当前线程的拥有者为null
  11. }
  12. setState(c);//修改state状态
  13. return free;
  14. }

AQS.unparkSuccessor

  1. private void unparkSuccessor(Node node) {
  2. //获取头节点的状态
  3. int ws = node.waitStatus;
  4. if (ws < 0)//如果头结点的状态小于0
  5. //修改头节点的状态为0
  6. compareAndSetWaitStatus(node, ws, 0);
  7. //获取头节点的下一个节点
  8. Node s = node.next;
  9. /如果下一个节点为 null 或者 status>0 表示 cancelled 状态
  10. if (s == null || s.waitStatus > 0) {
  11. s = null;
  12. //通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
  13. for (Node t = tail; t != null && t != node; t = t.prev)
  14. if (t.waitStatus <= 0)
  15. s = t;
  16. }
  17. if (s != null)//唤醒这个节点
  18. LockSupport.unpark(s.thread);
  19. }

通过锁的释放,原本的结构就发生了一些变化。head 节点的 waitStatus 变成了 0,ThreadB 被唤醒。

unparkSuccessor.jpg

为什么释放锁的时候是从尾结点开始扫描?

再回到 enq那个方法。看一个新的节点是如何加入到链表中的

  1. 将新的节点的 prev 指向 tail。

  2. 通过 cas 将 tail 设置为新的节点,因为 cas 是原子操作所以能够保证线程安全性。

  3. t.next=node;设置原 tail 的 next 节点指向新的节点。

唤醒.jpg

在 cas 操作之后,t.next=node 操作之前。 存在其他线程调用 unlock 方法从 head开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题。

原本被挂起的线程在被唤醒后继续执行

通过 ReentrantLock.unlock,原本挂起的线程被唤醒以后继续执行,原来被挂起的线程是在 acquireQueued 方法中,所以被唤醒以后继续从这个方法开始执行。

由于 ThreadB 的 prev 节点指向的是 head,并且 ThreadA 已经释放了锁。所以这个时候调用 tryAcquire 方法时,可以顺利获取到锁。

  1. 把 ThreadB 节点当成 head。

  2. 把原 head 节点的 next 节点指向为 null。

tryAcquire.jpg

3.公平锁和非公平锁的区别

锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是 FIFO。 在上面分析的例子来说,只要CAS 设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个:

FairSync.tryAcquire

  1. final void lock() {
  2. acquire(1);
  3. }


非公平锁在获取锁的时候,会先通过 CAS 进行抢占,而公平锁则不会。

FairSync .tryAcquire

  1. protected final boolean tryAcquire(int acquires) {
  2. //获取当前线程
  3. final Thread current = Thread.currentThread();
  4. //获取当前state状态
  5. int c = getState();
  6. if (c == 0) {//状态为0,说明此时没有现成持有锁
  7. //如果队列里面没有节点 且cas的方式获取锁成功
  8. if (!hasQueuedPredecessors() &&
  9. compareAndSetState(0, acquires)) {
  10. //设置持有锁的线程我当前线程
  11. setExclusiveOwnerThread(current);
  12. return true;
  13. }
  14. }
  15. //如果当前线程是获取锁的线程
  16. else if (current == getExclusiveOwnerThread()) {
  17. //锁重入逻辑
  18. int nextc = c + acquires;
  19. if (nextc < 0)
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. /*
  25. 返回false的情况:
  26. 1. 当前锁被其他线程持有
  27. 2. 队列里面有节点或cas争抢锁失败
  28. */
  29. return false;
  30. }

这个方法与 nonfairTryAcquire(int acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了[同步队列中当前节点是否有前驱节点]的判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

hasQueuedPredecessors

  1. //首先什么时候返回false?返回false代表当前队列没有线程或者当前队列头节点的下一个线程就是当前线程
  2. //1.队列为空
  3. //2.头节点的下一个节点的线程是当前线程
  4. public final boolean hasQueuedPredecessors() {
  5. Node t = tail;
  6. Node h = head;
  7. Node s;
  8. //h!=t:头节点不是尾结点,说明队列里面还有其他的节点
  9. //(头节点的下一个节点为空||头节点的后继节点对应的线程不是当前线程)
  10. //头节点的下一个节点的线程正在拿到锁 或者 头节点的下一个节点的线程不是当前线程
  11. return h != t &&
  12. ((s = h.next) == null || s.thread != Thread.currentThread());
  13. }

五,Condition 条件队列

对比synchronized管程模型:
管程模型(monitor).png
Condition原理图:

AQS-条件队列.png

1.AQS.ConditionObject

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. private static final long serialVersionUID = 1173984872572414699L;
  3. //第一个等待的节点
  4. private transient Node firstWaiter;
  5. //最后一个等待的节点
  6. private transient Node lastWaiter;
  7. public ConditionObject() { }

2.Condition接口

  1. public interface Condition {
  2. void await() throws InterruptedException;
  3. void awaitUninterruptibly();
  4. long awaitNanos(long nanosTimeout) throws InterruptedException;
  5. boolean await(long time, TimeUnit unit) throws InterruptedException;
  6. boolean awaitUntil(Date deadline) throws InterruptedException;
  7. void signal();
  8. void signalAll();
  9. }

3.newCondition()

创建一个等待条件

  1. public Condition newCondition() {
  2. return sync.newCondition();
  3. }
  1. final ConditionObject newCondition() {
  2. return new ConditionObject();
  3. }

实际上这里new了一个AQS的ConditionObject。

4.AQS.await

这个方法的逻辑就是先进行边界判断,如果当前线程已经被中断,就抛出中断异常。否则将当前线程封装成节点加入到条件队列,并释放全部锁资源。
接下来判断是否当前持有锁的线程执行了当前线程的signal方法:

  • 如果没有执行:当前线程就还是在条件队列,此时应当将当前线程挂起
  • 如果执行了:当前线程就迁移到了阻塞队列,就走竞争锁的逻辑….

    1. public final void await() throws InterruptedException {
    2. //如果当前线程已经被挂起,直接抛出中断异常
    3. if (Thread.interrupted())
    4. throw new InterruptedException();
    5. //将当前线程封装成节点加入到条件队列
    6. Node node = addConditionWaiter();
    7. //释放线程的全部锁资源
    8. //为什么这里要释放掉全部的锁?
    9. //如果此时线程不释放锁资源,其他线程没办法拿到锁触发当前线程的唤醒机制
    10. int savedState = fullyRelease(node);
    11. //中断状态:
    12. //-1:当前线程在条件队列接收到了中断信号
    13. //0:当前线程在条件队列没有接收到中断信号
    14. //1:当前线程在条件队列没有接受到中断信号,但是在阻塞队列接收到了中断信号
    15. int interruptMode = 0;
    16. //如果当前线程不在阻塞队列
    17. while (!isOnSyncQueue(node)) {
    18. //挂起当前线程
    19. LockSupport.park(this);
    20. //判断当前线程在等待期间是否被中断过,无论是否发生中断,最终都会进入到阻塞队列
    21. //什么时候会被唤醒?都有几种情况?
    22. //1.常规:外部线程获取到lock之后,调用了signal方法,转移条件队列的头节点到阻塞队列,当这个节点获取到锁之后,会唤醒
    23. //2.转移至阻塞队列后,发现阻塞队列的前驱节点状态是取消状态,此时会唤醒当前节点
    24. //3.当前节点挂起期间,被外部线程使用中断唤醒...
    25. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    26. break;
    27. }
    28. //来到这里说明当前线程已经被其他线程调用了signal方法,加入到了阻塞队列
    29. //如果当前线程在竞争锁的过程中被中断过 && 中断标识位不是 throw_ie
    30. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    31. //将当前线程的中断标识位设置为重新中断
    32. interruptMode = REINTERRUPT;
    33. if (node.nextWaiter != null)
    34. unlinkCancelledWaiters();
    35. if (interruptMode != 0)
    36. reportInterruptAfterWait(interruptMode);
    37. }

    5.Condition.addConditionWaiter

    线程进入条件队列挂起逻辑:

  • 如果队列有元素,但是元素的状态不对,做一次全队的清理,将所有取消状态的节点干掉,并重新获取尾结点

  • 将线程封装成node节点,判断当前条件队列是否有元素
  • 如果没有元素,就将当前node设置为头节点
  • 如果有元素,就将当前尾结点的next指针指向node
  • 最终设置当前节点为尾节点,并返回当前线程的node。

    1. private Node addConditionWaiter() {
    2. //获取尾结点的引用
    3. Node t = lastWaiter;
    4. //条件一:尾结点!=空
    5. //条件二:尾结点的状态!=-2,说明被中断了
    6. if (t != null && t.waitStatus != Node.CONDITION) {
    7. //做一次全队列的垃圾节点清理
    8. unlinkCancelledWaiters();
    9. //重新获取尾结点的引用,因为在刚才的清理中,尾结点可能换成新的了
    10. t = lastWaiter;
    11. }
    12. //封装成node
    13. Node node = new Node(Thread.currentThread(), Node.CONDITION);
    14. //如果尾结点为空,说明队列为空,当前节点是进入队列的第一个元素
    15. if (t == null)
    16. firstWaiter = node;
    17. //队列不为空
    18. else
    19. t.nextWaiter = node;
    20. lastWaiter = node;
    21. //返回当前线程封装成的节点
    22. return node;
    23. }

    6.AQS.fullyRelease

    释放当前节点线程的全部锁资源

  • 正常情况下是会释放成功的,但是可能存在未持有锁的线程调用await方法,错误写法

  • 这个时候,就会将当前线程对应的node状态修改为取消状态,后继线程在假如队列的时候,就会把取消状态的线程清理出去
  • 如果成功释放了锁,会返回当前线程释放的state值,为什么要返回?
  • 因为当前节点迁移到阻塞队列以后,再次被唤醒,并且当前节点在队列中是头节点的下一个节点而且state状态=0表示无锁,这个时候说明当前节点要去竞争锁,此时要将node的state设置为savedStated

    1. final int fullyRelease(Node node) {
    2. /**
    3. * 完全释放锁是否成功
    4. * 当failed失败的时候,说明当前线程是未持有锁调用 await 方法的线程... 错误写法
    5. * 假设失败,在finally代码块中 会将刚刚加入到条件队列的 当前线程对应的node状态 修改为 取消状态
    6. * 后继线程就会将取消状态的节点给清理出去...
    7. */
    8. boolean failed = true;
    9. try {
    10. //获取当前线程持有的state值总数
    11. int savedState = getState();
    12. //绝大部分情况下:release这里会返回true
    13. if (release(savedState)) {
    14. //失败标记设置为false
    15. failed = false;
    16. /**
    17. * 返回当前线程释放的state值
    18. * 为什么要返回?
    19. * 因为当节点被迁移到阻塞队列以后,再次被唤醒,且当前node在阻塞队列中是head.next 而且
    20. * 当前lock状态是state=0 的情况下 当前node 可以获取到锁 此时需要将state 设置为 savedState
    21. */
    22. return savedState;
    23. } else {
    24. throw new IllegalMonitorStateException();
    25. }
    26. } finally {
    27. //如果失败了,就把当前节点状态改成取消
    28. if (failed)
    29. node.waitStatus = Node.CANCELLED;
    30. }
    31. }

    此时,同步队列会触发锁的释放和重新竞争。ThreadB 获得了锁。
    1.jpg

    7.isOnSyncQueue

    判断节点是否在阻塞队列

  • 第一个if判断:

  • 节点状态==-2说明当前节点一定在条件队列
  • 当前节点的前驱节点为空:首先分析前置条件 node 的状态肯定不是 -2 ,可能=0或者1,等于1就说明当前节点是未持有锁的await,最终状态会被设置为取消,等于0就说明当前节点已经被持有锁的线程唤醒,但是唤醒之后是先修改状态在入队,所以前驱节点为空,说明此时是当前节点已经被唤醒,但是还未进入到阻塞队列。
  • 执行到第二个if,当前节点的下一个节点不为空,说明一定在同步队列,为啥那?
  • 执行到这里,可以排除掉取消状态的节点,也就是说node的状态一定不等于1,为啥?取消状态的节点不会被加入到阻塞队列
  • 入队的两个条件:当前节点的前驱节点设置为tail,然后cas设置当前节点为tail节点,两个条件都成功了,才是真正进入到了阻塞队列
  • 所以说,就算前驱节点不等于空,也不是一定就是进入了阻塞队列。可能在过程中。
  • 剩下最后一种情况就是当前这个node节点可能进入到阻塞队列了,但是还不是尾结点,这个时候就需要从后往前便利找到他才行,找到了就返回true,找不到说明当前这个节点正在迁移中,返回false。

    1. //判断节点是否在阻塞队列
    2. final boolean isOnSyncQueue(Node node) {
    3. /**
    4. * 条件1:node.waitStatus==node.condition
    5. * 条件成立说明当前node一定是在条件队列,因为signal方法迁移节点到阻塞队列前,会将node的状态设置为0
    6. * 条件2:前置条件:node.waitStatus != Node.CONDITION ====>
    7. * node.waitStatus ==0 : 表示当前节点已经被signal
    8. * node.waitStatus ==1 : 当前线程是未持有锁调用await 最终这个节点将被取消
    9. * node.waitStatus ==0 为什么还要判断 node.prev == null ?
    10. * 因为signal方法是先修改状态在迁移
    11. */
    12. if (node.waitStatus == Node.CONDITION || node.prev == null)
    13. return false;
    14. /**
    15. * 执行到这里,
    16. * node.waitStatus !=Condition 且 node.prev!=null ====> 可以排除掉 node.waitStatus ==1 取消状态
    17. * 为什么可以排除掉取消状态?因为signal方法是不会吧取消状态的节点迁移走的
    18. * 设置prev引用的逻辑是迁移阻塞队列 逻辑的设置的 eng方法
    19. * 入队的逻辑:1.设置node.prev =tail
    20. * 2.cas当前node为 阻塞队列的tail节点成功才算是真正的进入到阻塞队列
    21. * 可以推算出,就算是prev不是null,也不能说明当前node已经成功入队到阻塞队列了。
    22. */
    23. if (node.next != null)
    24. return true;
    25. //执行到这里从阻塞队列的尾巴开始向前遍历查找node,找到了就返回true,找不到返回false,
    26. //当前node有可能正在迁移中,还未完成。
    27. return findNodeFromTail(node);
    28. }

    8.signal

  • 首先当前线程持有的必须是独占锁,否则没有必要进入到阻塞队列,所以如果不是独占锁,会抛出异常。

  • 获取到条件队列的头节点,如果头节点不为空,就说明条件队列里面有节点,去唤醒条件队列的第一个节点。

    1. public final void signal() {
    2. //判断当前线程是不是持有的独占锁,如果不是,直接抛出异常
    3. if (!isHeldExclusively())
    4. throw new IllegalMonitorStateException();
    5. //获取条件队列的首节点
    6. Node first = firstWaiter;
    7. //如果首节点不为空
    8. if (first != null)
    9. doSignal(first);//唤醒条件队列的首节点线程
    10. }

    9.doSignal

  • 如果头节点的下一个节点为空,说明头节点出队之后,头节点,尾结点都会是空,也就是条件队列没有元素了。

  • 断开当前头节点的next
  • 调用transferForSignal方法去迁移头节点到阻塞队列,如果成功返回true,失败返回false。
  • 循环的条件是:如果当前头节点迁移失败了,那就将头节点设置为当前头节点的下一个节点继续尝试,直到迁移成功或者头节点为空。

    1. private void doSignal(Node first) {
    2. do {
    3. //当前first马上要出队了,所以更新firstWaiter为当前节点的下一个节点
    4. //如果当前节点的下一个节点是null,说明条件队列只有当前一个节点了...当前出队后,整个队列就空了
    5. //所以需要更新lastWaiter==null
    6. if((firstWaiter=first.nextWaiter)==null) lastWaiter=null;
    7. //当前节点即将出队列,断开和下一个节点的关系
    8. first.nextWaiter = null;
    9. /**
    10. * transferForSignal(first) 返回true 当前first节点迁移到阻塞队列成功 false 迁移失败
    11. * while循环:(first =firstWaiter)!=null 当前first 迁移失败 ,则将first更新为first.next 继续尝试迁移
    12. * 直至迁移某个节点成功,或者条件队列为 null 为止。
    13. */
    14. } while (!transferForSignal(first)&&(first =firstWaiter)!=null);
    15. }

    10.transferForSignal

    接收到signal信号以后,把节点转入等待队列

  • 首先上来先cas将node的状态从-2设置为0,设置成功才继续往下,那么什么时候会设置失败呢?

  • 当前节点是取消状态,或者当前节点的线程在挂起期间被其他线程使用中断信号唤醒过,这个时候节点会进入到阻塞队列,这个时候节点的状态也会修改成0
  • 使用enq方法添加当前节点到阻塞队列,并返回当前节点的前驱节点
  • 如果前驱节点的状态大于0就说明前驱结点的状态是取消状态,
  • 第二个条件的前置条件就是前驱节点状态小于0,cas设置前驱节点为signal,
  • 如果条件一或者条件二成立一个,那就唤醒当前节点的线程
    1. //接收到signal信号后,把节点转入等待队列
    2. final boolean transferForSignal(Node node) {
    3. /**
    4. * cas修改当前节点的状态,修改为0,因为当前节点马上要迁移到阻塞队列了
    5. * 成功:当前节点在条件队列中状态正常
    6. * 失败:
    7. * 1.取消状态 (线程await时,未持有锁,最终线程对应的node将会被设置为取消状态)
    8. * 2.node对应的线程 挂起期间,被其他线程使用中断信号唤醒过...(就会主动进入到阻塞队列,这时
    9. * 也会修改状态为0)
    10. */
    11. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    12. return false;
    13. //添加节点到等待队列,并返回节点的前继节点(prev)
    14. Node p = enq(node);
    15. //ws:前驱节点的状态
    16. int ws = p.waitStatus;
    17. /**
    18. * 条件1:ws>0成立:说明前驱节点的状态在阻塞队列中是取消状态,唤醒当前节点
    19. * 条件2:前置条件 ws<=0
    20. * cas 返回true表示设置前驱节点状态为signal状态成功
    21. * cas返回false,什么时候返回false?
    22. * 当前驱node对应的线程是lockInterrupt入队的node时,是会响应中断的,外部线程给前驱线程中断信号之后
    23. * 前驱node会将状态修改为 取消状态,并且执行出队逻辑...
    24. * 前驱节点状态只要不是0 或者 -1 ,那么就唤醒当前线程
    25. */
    26. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    27. //如果前节点被取消,说明当前为最后一个等待线程,unpark唤醒当前线程
    28. LockSupport.unpark(node.thread);
    29. //如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
    30. return true;
    31. }
    执行完 doSignal 以后,会把 condition 队列中的节点转移到 aqs 队列上,逻辑结构图如下,这个时候会判断 ThreadA 的 prev 节点也就是 head 节点的 waitStatus,如果大于 0 或者设置 SIGNAL 失败,表示节点被设置成了 CANCELLED 状态。这个时候会唤醒ThreadA 这个线程。否则就基于 AQS 队列的机制来唤醒,也就是等到 ThreadB 释放锁之后来唤醒 ThreadA。
    1.jpg
    再往下就是被阻塞的线程唤醒之后的逻辑。

11.AQS.await

前面在分析 await 方法时,线程会被阻塞。而通过 signal被唤醒之后又继续回到上次执行的逻辑中。

checkInterruptWhileWaiting 这个方法是干嘛呢?其实从名字就可以看出来,就是 ThreadA 在 condition 队列被阻塞的过程中,有没有被其他线程触发过中断请求。

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())//表示 await 允许被中断
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();//创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
  5. int savedState = fullyRelease(node); //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
  6. int interruptMode = 0;
  7. //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
  8. while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
  9. LockSupport.park(this);//通过 park 挂起当前线程
  10. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  11. break;
  12. }
  13. // 当这个线程醒来,会尝试拿锁, 当 acquireQueued返回 false 就是拿到锁了.
  14. // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
  15. // 将这个变量设置成 REINTERRUPT
  16. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  17. interruptMode = REINTERRUPT;
  18. // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
  19. // 如果是 null ,就没有什么好清理的了.
  20. if (node.nextWaiter != null) // clean up if cancelled
  21. unlinkCancelledWaiters();
  22. // 如果线程被中断了,需要抛出异常.或者什么都不做
  23. if (interruptMode != 0)
  24. reportInterruptAfterWait(interruptMode);
  25. }

12.checkInterruptWhileWaiting

如果当前线程被中断,则调用transferAfterCancelledWait 方法判断后续的处理应该是抛出 InterruptedException 还是重新中断。

这里需要注意的地方是,如果第一次 CAS 失败了,则不能判断当前线程是先进行了中断还是先进行了 signal 方法的调用,可能是先执行了 signal 然后中断,也可能是先执行了中断,后执行了 signal,当然,这两个操作肯定是发生在 CAS 之前。这时需要做的就是等待当前线程的 node被添加到 AQS 队列后,也就是 enq 方法返回后,返回false 告诉 checkInterruptWhileWaiting 方法返回REINTERRUPT(1),后续进行重新中断。

简单来说,该方法的返回值代表当前线程是否在 park 的时候被中断唤醒,如果为 true 表示中断在 signal 调用之前,signal 还未执行,那么这个时候会根据 await 的语义,在 await 时遇到中断需要抛出interruptedException,返回 true 就是告诉checkInterruptWhileWaiting 返回 THROW_IE(-1)。如果返回 false,否则表示 signal 已经执行过了,只需要重新响应中断即可。

  1. private int checkInterruptWhileWaiting(Node node) {
  2. return Thread.interrupted() ?
  3. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  4. 0;
  5. }
  1. final boolean transferAfterCancelledWait(Node node) {
  2. //使用 cas 修改节点状态,如果还能修改成功,说明线程被中断时,signal 还没有被调用。
  3. // 这里有一个知识点,就是线程被唤醒,并不一定是在 java 层面执行了locksupport.unpark,也可能是调用了线程的 interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。
  4. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  5. enq(node);//如果 cas 成功,则把node 添加到 AQS 队列
  6. return true;
  7. }
  8. // 如果 cas 失败,则判断当前 node 是否已经在 AQS 队列上,如果不在,则让给其他线程执行
  9. // 当 node 被触发了 signal 方法时, node 就会被加到 aqs 队列上
  10. while (!isOnSyncQueue(node))//循环检测 node 是否已经成功添加到 AQS 队列中。如果没有,则通过 yield
  11. Thread.yield();//使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
  12. return false;
  13. }

12.AQS.acquireQueued

这个方法是当前被唤醒的节点ThreadA 去抢占同步锁。并且要恢复到原本的重入次数状态。调用完这个方法之后,AQS 队列的状态如下:

将 head 节点的 waitStatus 设置为-1,Signal 状态。

1.jpg

12.reportInterruptAfterWait

根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报。

如果是 THROW_IE,则抛出中断异常。

如果是 REINTERRUPT,则重新响应中断。

  1. private void reportInterruptAfterWait(int interruptMode)
  2. throws InterruptedException {
  3. if (interruptMode == THROW_IE)
  4. throw new InterruptedException();
  5. else if (interruptMode == REINTERRUPT)
  6. selfInterrupt();
  7. }

13.总结与梳理

线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。

image.png

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁。

释放:signal()后,节点会从 condition 队列移动到 AQS等待队列,则进入正常锁的获取流程。