作者:美团技术团队 https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html 推荐和另外一篇博客一起看,会更加容易理解:https://javadoop.com/post/AbstractQueuedSynchronizer

前言

Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本文会从应用层逐渐深入到原理层,并通过ReentrantLock的基本特性和ReentrantLock与AQS的关联,来深入解读AQS相关独占锁的知识点,同时采取问答的模式来帮助大家理解AQS。由于篇幅原因,本篇文章主要阐述AQS中独占锁的逻辑和Sync Queue,不讲述包含共享锁和Condition Queue的部分(本篇文章核心为AQS原理剖析,只是简单介绍了ReentrantLock,感兴趣同学可以阅读一下ReentrantLock的源码)。
下面列出本篇文章的大纲和思路,以便于大家更好地理解:
image.png

1 ReentrantLock

1.1 ReentrantLock特性概览

ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解ReentrantLock的特性,我们先将ReentrantLock跟常用的Synchronized进行比较,其特性如下(蓝色部分为本篇文章主要剖析的点):
image.png
下面通过伪代码,进行更加直观的比较:

  1. // **************************Synchronized的使用方式**************************
  2. // 1.用于代码块
  3. synchronized (this) {}
  4. // 2.用于对象
  5. synchronized (object) {}
  6. // 3.用于方法
  7. public synchronized void test () {}
  8. // 4.可重入
  9. for (int i = 0; i < 100; i++) {
  10. synchronized (this) {}
  11. }
  12. // **************************ReentrantLock的使用方式**************************
  13. public void test () throw Exception {
  14. // 1.初始化选择公平锁、非公平锁
  15. ReentrantLock lock = new ReentrantLock(true);
  16. // 2.可用于代码块
  17. lock.lock();
  18. try {
  19. try {
  20. // 3.支持多种加锁方式,比较灵活; 具有可重入特性
  21. if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
  22. } finally {
  23. // 4.手动释放锁
  24. lock.unlock()
  25. }
  26. } finally {
  27. lock.unlock();
  28. }
  29. }

1.2 ReentrantLock与AQS的关联

通过上文我们已经了解,ReentrantLock支持公平锁和非公平锁(关于公平锁和非公平锁的原理分析,可参考《不可不说的Java“锁”事》),并且ReentrantLock的底层就是由AQS来实现的。那么ReentrantLock是如何通过公平锁和非公平锁与AQS关联起来呢? 我们着重从这两者的加锁过程来理解一下它们与AQS之间的关系(加锁过程中与AQS的关联比较明显,解锁流程后续会介绍)。

ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。

非公平锁源码中的加锁流程如下:

  1. // java.util.concurrent.locks.ReentrantLock#NonfairSync
  2. // 非公平锁
  3. static final class NonfairSync extends Sync {
  4. ...
  5. final void lock() {
  6. if (compareAndSetState(0, 1))
  7. setExclusiveOwnerThread(Thread.currentThread());
  8. else
  9. acquire(1);
  10. }
  11. ...
  12. }

这块代码的含义为:

  • 若通过CAS设置变量State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。
  • 若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入Acquire方法进行后续处理。

第一步很好理解,但第二步获取锁失败后,后续的处理策略是怎么样的呢?这块可能会有以下思考:

  • 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:

(1) 将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就需要下面这种流程,也就是AQS框架的处理流程。
(2) 存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

  • 对于问题1的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
  • 处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
  • 如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?

带着非公平锁的这些问题,再看下公平锁源码中获锁的方式:

  1. // java.util.concurrent.locks.ReentrantLock#FairSync
  2. static final class FairSync extends Sync {
  3. ...
  4. final void lock() {
  5. acquire(1);
  6. }
  7. ...
  8. }

看到这块代码,我们可能会存在这种疑问:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?
结合公平锁和非公平锁的加锁流程,虽然流程上有一定的不同,但是都调用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父类AQS中的核心方法。
对于上边提到的问题,其实在ReentrantLock类源码中都无法解答,而这些问题的答案,都是位于Acquire方法所在的类AbstractQueuedSynchronizer中,也就是本文的核心——AQS。下面我们会对AQS以及ReentrantLock和AQS的关联做详细介绍(相关问题答案会在2.3.5小节中解答)。

2 AQS

首先,我们通过下面的架构图来整体了解一下AQS框架:
image.png

  • 上图中有颜色的为Method,无颜色的为Attribution。
  • 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

下面我们会从整体到细节,从流程到方法逐一剖析AQS框架,主要分析过程如下:
image.png

2.1 原理概览

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
主要原理图如下:
image.png
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了。

  1. // 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
  2. private transient volatile Node head;
  3. // 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
  4. private transient volatile Node tail;
  5. // 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
  6. // 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
  7. private volatile int state;
  8. // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
  9. // reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
  10. // if (currentThread == getExclusiveOwnerThread()) {state++}
  11. private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

2.1.1 AQS数据结构

先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。
image.png
Node代码:

  1. static final class Node {
  2. // 标识节点当前在共享模式下
  3. static final Node SHARED = new Node();
  4. // 标识节点当前在独占模式下
  5. static final Node EXCLUSIVE = null;
  6. // ======== 下面的几个int常量是给waitStatus用的 ===========
  7. /** waitStatus value to indicate thread has cancelled */
  8. // 代码此线程取消了争抢这个锁
  9. static final int CANCELLED = 1;
  10. /** waitStatus value to indicate successor's thread needs unparking */
  11. // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
  12. static final int SIGNAL = -1;
  13. /** waitStatus value to indicate thread is waiting on condition */
  14. // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
  15. static final int CONDITION = -2;
  16. /**
  17. * waitStatus value to indicate the next acquireShared should
  18. * unconditionally propagate
  19. */
  20. // 同样的不分析,略过吧
  21. static final int PROPAGATE = -3;
  22. // =====================================================
  23. // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
  24. // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
  25. // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
  26. volatile int waitStatus;
  27. // 前驱节点的引用
  28. volatile Node prev;
  29. // 后继节点的引用
  30. volatile Node next;
  31. // 这个就是线程本尊
  32. volatile Thread thread;
  33. // 指向下一个处于CONDITION状态的节点
  34. Node nextWaiter;
  35. // 返回前驱节点,没有的话抛出npe
  36. final Node predecessor() throws NullPointerException {
  37. Node p = prev;
  38. if (p == null)
  39. throw new NullPointerException();
  40. else
  41. return p;
  42. }
  43. }

解释一下几个方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出npe
nextWaiter 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

waitStatus有下面几个枚举值:

枚举 含义
0 当一个Node被初始化的时候的默认值
CANCELLED 为1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

2.1.2 同步状态State

在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private volatile int state;

下面提供了几个访问这个字段的方法:

方法名 描述
protected final int getState() 获取State的值
protected final void setState(int newState) 设置State的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新State

这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
image.pngimage.png
对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。

2.2 AQS重要方法与ReentrantLock的关联

从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到Condition才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。
以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。
image.png

为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。
image.png
加锁:

  • 通过ReentrantLock的加锁方法Lock进行加锁操作。
  • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
  • AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
  • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。

解锁:

  • 通过ReentrantLock的解锁方法Unlock进行解锁。
  • Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
  • Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
  • 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。

通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。
image.png

2.3 通过ReentrantLock理解AQS

ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。
在非公平锁中,有一段这样的代码:

  1. // java.util.concurrent.locks.ReentrantLock
  2. static final class NonfairSync extends Sync {
  3. ...
  4. final void lock() {
  5. // 先使用CAS尝试获取一下锁,这个非公平锁的第一个体现
  6. if (compareAndSetState(0, 1))
  7. setExclusiveOwnerThread(Thread.currentThread());
  8. else
  9. // 如果没有抢到锁,就调用AQS的acquire方法
  10. acquire(1);
  11. }
  12. ...
  13. }

看一下这个Acquire是怎么写的:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. public final void acquire(int arg) {
  3. // 需要先尝试一下获取锁,
  4. // 这次尝试不是第一次那种暴力尝试获取,而且有点类似偏向锁的思想,多个判断是否是当前线程获取到了锁,就没必要进队列等待(又是挂起,又是等待被唤醒的)
  5. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }

再看一下tryAcquire方法:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. protected boolean tryAcquire(int arg) {
  3. throw new UnsupportedOperationException();
  4. }

可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。下面会详细解释线程是何时以及怎样被加入进等待队列中的。

非公平锁的tryAcquire方法:

  1. // class NonfairSync;
  2. protected final boolean tryAcquire(int acquires) {
  3. return nonfairTryAcquire(acquires);
  4. }
  5. // class Sync
  6. final boolean nonfairTryAcquire(int acquires) {
  7. // 获取当前线程
  8. final Thread current = Thread.currentThread();
  9. // 获取State状态
  10. int c = getState();
  11. // 如果没有线程使用共享资源
  12. if (c == 0) {
  13. // 尝试CAS获取共享资源,这里非公平还是公平锁都会CAS获取一下,
  14. if (compareAndSetState(0, acquires)) {
  15. // 设置独占锁
  16. setExclusiveOwnerThread(current);
  17. return true;
  18. }
  19. }
  20. // 进入这个else if分支,说明是重入了,需要操作:state=state+acquires(一般是1),这里不存在并发问题
  21. else if (current == getExclusiveOwnerThread()) {
  22. // 累加加锁次数,实现可重入锁
  23. int nextc = c + acquires;
  24. if (nextc < 0) // overflow
  25. throw new Error("Maximum lock count exceeded");
  26. // 更新State状态,目前记录的是重入锁的次数
  27. setState(nextc);
  28. return true;
  29. }
  30. // 尝试获取锁失败,返回false,进入AQS框架的acquire方法,将当前线程加入CLH队列,实现阻塞
  31. return false;
  32. }

公平锁的tryAcquire方法:

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. // 多了一个hasQueuedPredecessors()方法,查询当前线程是不是队列第一个,实现公平锁
  6. if (!hasQueuedPredecessors() &&
  7. compareAndSetState(0, acquires)) { // 公平锁也CAS一下是防止一个极端情况:瞬间有其他线程抢先获取到了锁
  8. setExclusiveOwnerThread(current);
  9. return true;
  10. }
  11. }
  12. // 重入锁实现原理
  13. else if (current == getExclusiveOwnerThread()) {
  14. int nextc = c + acquires;
  15. if (nextc < 0)
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc);
  18. return true;
  19. }
  20. return false;
  21. }

2.3.1 线程加入等待队列

2.3.1.1 加入队列的时机

当执行Acquire(1)时,会通过tryAcquire获取锁。在这种情况下,如果获取锁失败,就会调用addWaiter加入到等待队列中去。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. public final void acquire(int arg) {
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果获取锁失败,就会调用addWaiter加入到等待队列中去
  5. selfInterrupt();
  6. }

2.3.1.2 如何加入队列

获取锁失败后,会执行addWaiter(Node.EXCLUSIVE)加入等待队列,具体实现方法如下:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. // 此方法的作用是把线程包装成node,同时进入到队列中
  3. // 参数mode此时是Node.EXCLUSIVE,代表独占模式
  4. private Node addWaiter(Node mode) {
  5. // 通过当前的线程和锁模式新建一个节点
  6. Node node = new Node(Thread.currentThread(), mode);
  7. // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
  8. // Pred指针指向尾节点Tail
  9. Node pred = tail;
  10. // 意思是队列不为空
  11. if (pred != null) {
  12. // 将当前的队尾节点,设置为新节点的前驱
  13. node.prev = pred;
  14. // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
  15. if (compareAndSetTail(pred, node)) {
  16. // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
  17. // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了
  18. pred.next = node;
  19. return node;
  20. }
  21. }
  22. // 代码运行到这里说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队),需要初始化CLH队列或者采用自旋转入队。
  23. enq(node);
  24. return node;
  25. }
  26. // 这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值
  27. private final boolean compareAndSetTail(Node expect, Node update) {
  28. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  29. }

主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点。
  • Pred指针指向尾节点Tail。
  • 将New中Node的Prev指针指向Pred。
  • 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。 ```java // java.util.concurrent.locks.AbstractQueuedSynchronizer

static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(“state”)); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(“head”)); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(“tail”)); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField(“waitStatus”)); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField(“next”)); } catch (Exception ex) { throw new Error(ex); } }

  1. AQS的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性。tailOffset指的是tail对应的偏移量,所以这个时候会将new出来的Node置为当前队列的尾节点。同时,由于是双向链表,也需要将前一个节点指向尾节点。
  2. - 如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。
  3. ```java
  4. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  5. // 采用自旋的方式入队
  6. // 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
  7. // 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
  8. private Node enq(final Node node) {
  9. for (;;) {
  10. // 获取CLH队列的尾节点
  11. Node t = tail;
  12. // 如果是CLH队列为空进来的,必须先初始化
  13. if (t == null) { // Must initialize
  14. // 创建一个哨兵节点作为头节点,由于可能是多线程进来的,需要使用CAS创建头节点
  15. if (compareAndSetHead(new Node()))
  16. // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
  17. // 这个时候有了head,但是tail还是null,设置一下,
  18. // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
  19. // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
  20. // 所以,设置完了以后,继续for循环,下次就到下面的else分支了
  21. tail = head;
  22. }
  23. // 如果是CAS失败进来的,继续CAS
  24. else {
  25. // 下面几行,和上一个方法 addWaiter 是一样的,
  26. // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
  27. node.prev = t;
  28. if (compareAndSetTail(t, node)) {
  29. t.next = node;
  30. return t;
  31. }
  32. }
  33. }
  34. }

如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
总结一下,线程获取锁的时候,过程大体如下:

  1. 当没有线程获取到锁时,线程1获取锁成功。
  2. 线程2申请锁,但是锁被线程1占有。

image.png

  1. 如果再有线程要获取锁,依次在队列中往后排队即可。

回到上边的代码,hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

  1. // java.util.concurrent.locks.ReentrantLock
  2. public final boolean hasQueuedPredecessors() {
  3. // The correctness of this depends on head being initialized
  4. // before tail and on head.next being accurate if the current
  5. // thread is first in queue.
  6. Node t = tail; // Read fields in reverse initialization order
  7. Node h = head;
  8. Node s;
  9. return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
  10. }

看到这里,我们理解一下 h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。

  • 当h != t时: 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,并且就是当前线程,需要返回True(这块具体见下边代码分析)。
  • 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。 ```java // java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } }

  1. 节点入队不是原子操作,所以会出现短暂的head != tail,此时Tail指向最后一个节点,而且Tail指向Head。如果Head没有指向Tail(可见567行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。
  2. <a name="d0WIS"></a>
  3. #### 2.3.1.3 等待队列中线程出队列时机
  4. 回到最初的源码:
  5. ```java
  6. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  7. public final void acquire(int arg) {
  8. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  9. selfInterrupt();
  10. }

上文解释了addWaiter方法,这个方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排队中的线程进行“获锁”操作。

总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. // 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
  3. // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
  4. // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
  5. // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
  6. final boolean acquireQueued(final Node node, int arg) {
  7. // 标记是否没拿到资源
  8. boolean failed = true;
  9. try {
  10. // 标记等待过程中是否中断过
  11. boolean interrupted = false;
  12. // 开始自旋,要么获取锁,要么中断
  13. for (;;) {
  14. // 获取当前节点的前驱节点
  15. final Node p = node.predecessor();
  16. // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
  17. // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列,所以当前节点可以去尝试抢一下锁
  18. // 这里我们说一下,为什么可以去试试:
  19. // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
  20. // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
  21. // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
  22. // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
  23. if (p == head && tryAcquire(arg)) {
  24. // 获取锁成功,头指针移动到当前node
  25. setHead(node);
  26. p.next = null; // help GC
  27. failed = false;
  28. return interrupted;
  29. }
  30. // p不为头结点,或者p为头节点且当前没有获取到锁(可能是非公平锁被抢占了),
  31. // 这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
  32. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  33. interrupted = true;
  34. }
  35. } finally {
  36. // 什么时候 failed 会为 true???
  37. // tryAcquire() 方法抛异常的情况
  38. if (failed)
  39. cancelAcquire(node);
  40. }
  41. }

注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private void setHead(Node node) {
  3. head = node;
  4. node.thread = null;
  5. node.prev = null;
  6. }

shouldParkAfterFailedAcquire主要用与判断是否需要挂起当前线程。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. // 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
  3. // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
  4. // 靠前驱节点判断当前线程是否应该被阻塞
  5. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  6. // 获取前驱节点的状态
  7. int ws = pred.waitStatus;
  8. // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要被挂起等待唤醒,直接可以返回true
  9. if (ws == Node.SIGNAL)
  10. return true;
  11. // 通过枚举值我们知道waitStatus>0是取消状态,说明前驱节点取消了排队。
  12. // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
  13. // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
  14. // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
  15. // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
  16. if (ws > 0) {
  17. do {
  18. // 循环向前查找取消节点,把取消节点从队列中剔除
  19. node.prev = pred = pred.prev;
  20. } while (pred.waitStatus > 0);
  21. pred.next = node;
  22. } else {
  23. // 仔细想想,如果进入到这个分支意味着什么
  24. // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
  25. // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatus都是0
  26. // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
  27. // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
  28. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  29. }
  30. // 这个方法返回 false,那么会再走一次 for 循序,
  31. // 然后再次进来此方法,此时会从第一个分支返回 true
  32. return false;
  33. }

parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

  1. // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
  2. // 这个方法结束根据返回值我们简单分析下:
  3. // 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
  4. // 我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
  5. // 如果返回false, 说明当前不需要被挂起,为什么呢?往后看
  6. // 跳回到前面是这个方法
  7. // if (shouldParkAfterFailedAcquire(p, node) &&
  8. // parkAndCheckInterrupt())
  9. // interrupted = true;
  10. // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
  11. // 那么需要执行parkAndCheckInterrupt():
  12. // 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
  13. // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
  14. private final boolean parkAndCheckInterrupt() {
  15. LockSupport.park(this);
  16. return Thread.interrupted();
  17. }
  18. // 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况
  19. // 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,
  20. // 原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,
  21. // 但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。
  22. // 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:
  23. // 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。

上述方法的流程图如下:
image.png
从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):
image.png
从队列中释放节点的疑虑打消了,那么又有新问题了:

  • shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?
  • 是在什么时间释放节点通知到被挂起的线程呢?

这些下一节再详细解释。

最后再用公平锁理一遍源码,坚持就是胜利!

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. // 争锁
  4. final void lock() {
  5. acquire(1);
  6. }
  7. // 来自父类AQS,我直接贴过来这边,下面分析的时候同样会这样做,不会给读者带来阅读压力
  8. // 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
  9. // 否则,acquireQueued方法会将线程压到队列中
  10. public final void acquire(int arg) { // 此时 arg == 1
  11. // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
  12. // 因为有可能直接就成功了呢,也就不需要进队列排队了,
  13. // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
  14. if (!tryAcquire(arg) &&
  15. // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
  16. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
  17. selfInterrupt();
  18. }
  19. }
  20. /**
  21. * Fair version of tryAcquire. Don't grant access unless
  22. * recursive call or no waiters or is first.
  23. */
  24. // 尝试直接获取锁,返回值是boolean,代表是否获取到锁
  25. // 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
  26. protected final boolean tryAcquire(int acquires) {
  27. final Thread current = Thread.currentThread();
  28. int c = getState();
  29. // state == 0 此时此刻没有线程持有锁
  30. if (c == 0) {
  31. // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
  32. // 看看有没有别人在队列中等了半天了
  33. if (!hasQueuedPredecessors() &&
  34. // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
  35. // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
  36. // 因为刚刚还没人的,我判断过了
  37. compareAndSetState(0, acquires)) {
  38. // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
  39. setExclusiveOwnerThread(current);
  40. return true;
  41. }
  42. }
  43. // 会进入这个else if分支,说明是重入了,需要操作:state=state+1
  44. // 这里不存在并发问题
  45. else if (current == getExclusiveOwnerThread()) {
  46. int nextc = c + acquires;
  47. if (nextc < 0)
  48. throw new Error("Maximum lock count exceeded");
  49. setState(nextc);
  50. return true;
  51. }
  52. // 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
  53. // 回到上面一个外层调用方法继续看:
  54. // if (!tryAcquire(arg)
  55. // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  56. // selfInterrupt();
  57. return false;
  58. }
  59. // 假设tryAcquire(arg) 返回false,那么代码将执行:
  60. // acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
  61. // 这个方法,首先需要执行:addWaiter(Node.EXCLUSIVE)
  62. /**
  63. * Creates and enqueues node for current thread and given mode.
  64. *
  65. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  66. * @return the new node
  67. */
  68. // 此方法的作用是把线程包装成node,同时进入到队列中
  69. // 参数mode此时是Node.EXCLUSIVE,代表独占模式
  70. private Node addWaiter(Node mode) {
  71. Node node = new Node(Thread.currentThread(), mode);
  72. // Try the fast path of enq; backup to full enq on failure
  73. // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
  74. Node pred = tail;
  75. // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
  76. if (pred != null) {
  77. // 将当前的队尾节点,设置为自己的前驱
  78. node.prev = pred;
  79. // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
  80. if (compareAndSetTail(pred, node)) {
  81. // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
  82. // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了
  83. pred.next = node;
  84. // 线程入队了,可以返回了
  85. return node;
  86. }
  87. }
  88. // 仔细看看上面的代码,如果会到这里,
  89. // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
  90. // 读者一定要跟上思路,如果没有跟上,建议先不要往下读了,往回仔细看,否则会浪费时间的
  91. enq(node);
  92. return node;
  93. }
  94. /**
  95. * Inserts node into queue, initializing if necessary. See picture above.
  96. * @param node the node to insert
  97. * @return node's predecessor
  98. */
  99. // 采用自旋的方式入队
  100. // 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
  101. // 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
  102. private Node enq(final Node node) {
  103. for (;;) {
  104. Node t = tail;
  105. // 之前说过,队列为空也会进来这里
  106. if (t == null) { // Must initialize
  107. // 初始化head节点
  108. // 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
  109. // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
  110. if (compareAndSetHead(new Node()))
  111. // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
  112. // 这个时候有了head,但是tail还是null,设置一下,
  113. // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
  114. // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
  115. // 所以,设置完了以后,继续for循环,下次就到下面的else分支了
  116. tail = head;
  117. } else {
  118. // 下面几行,和上一个方法 addWaiter 是一样的,
  119. // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
  120. node.prev = t;
  121. if (compareAndSetTail(t, node)) {
  122. t.next = node;
  123. return t;
  124. }
  125. }
  126. }
  127. }
  128. // 现在,又回到这段代码了
  129. // if (!tryAcquire(arg)
  130. // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  131. // selfInterrupt();
  132. // 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
  133. // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
  134. // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
  135. // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
  136. final boolean acquireQueued(final Node node, int arg) {
  137. boolean failed = true;
  138. try {
  139. boolean interrupted = false;
  140. for (;;) {
  141. final Node p = node.predecessor();
  142. // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
  143. // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
  144. // 所以当前节点可以去试抢一下锁
  145. // 这里我们说一下,为什么可以去试试:
  146. // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
  147. // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
  148. // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
  149. // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
  150. if (p == head && tryAcquire(arg)) {
  151. setHead(node);
  152. p.next = null; // help GC
  153. failed = false;
  154. return interrupted;
  155. }
  156. // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
  157. // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
  158. if (shouldParkAfterFailedAcquire(p, node) &&
  159. parkAndCheckInterrupt())
  160. interrupted = true;
  161. }
  162. } finally {
  163. // 什么时候 failed 会为 true???
  164. // tryAcquire() 方法抛异常的情况
  165. if (failed)
  166. cancelAcquire(node);
  167. }
  168. }
  169. /**
  170. * Checks and updates status for a node that failed to acquire.
  171. * Returns true if thread should block. This is the main signal
  172. * control in all acquire loops. Requires that pred == node.prev
  173. *
  174. * @param pred node's predecessor holding status
  175. * @param node the node
  176. * @return {@code true} if thread should block
  177. */
  178. // 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
  179. // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
  180. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  181. int ws = pred.waitStatus;
  182. // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
  183. if (ws == Node.SIGNAL)
  184. /*
  185. * This node has already set status asking a release
  186. * to signal it, so it can safely park.
  187. */
  188. return true;
  189. // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
  190. // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
  191. // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
  192. // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
  193. // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
  194. if (ws > 0) {
  195. /*
  196. * Predecessor was cancelled. Skip over predecessors and
  197. * indicate retry.
  198. */
  199. do {
  200. node.prev = pred = pred.prev;
  201. } while (pred.waitStatus > 0);
  202. pred.next = node;
  203. } else {
  204. /*
  205. * waitStatus must be 0 or PROPAGATE. Indicate that we
  206. * need a signal, but don't park yet. Caller will need to
  207. * retry to make sure it cannot acquire before parking.
  208. */
  209. // 仔细想想,如果进入到这个分支意味着什么
  210. // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
  211. // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
  212. // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
  213. // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
  214. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  215. }
  216. // 这个方法返回 false,那么会再走一次 for 循序,
  217. // 然后再次进来此方法,此时会从第一个分支返回 true
  218. return false;
  219. }
  220. // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
  221. // 这个方法结束根据返回值我们简单分析下:
  222. // 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
  223. // 我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
  224. // 如果返回false, 说明当前不需要被挂起,为什么呢?往后看
  225. // 跳回到前面是这个方法
  226. // if (shouldParkAfterFailedAcquire(p, node) &&
  227. // parkAndCheckInterrupt())
  228. // interrupted = true;
  229. // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
  230. // 那么需要执行parkAndCheckInterrupt():
  231. // 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
  232. // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
  233. private final boolean parkAndCheckInterrupt() {
  234. LockSupport.park(this);
  235. return Thread.interrupted();
  236. }
  237. // 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况
  238. // 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。
  239. // 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:
  240. // => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。
  241. }

2.3.2 CANCELLED状态节点生成

acquireQueued方法中的Finally代码:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. ...
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. ...
  10. failed = false;
  11. ...
  12. }
  13. ...
  14. } finally {
  15. if (failed)
  16. cancelAcquire(node);
  17. }
  18. }

通过cancelAcquire方法,将Node的状态标记为CANCELLED。接下来,我们逐行来分析这个方法的原理:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private void cancelAcquire(Node node) {
  3. // 将无效节点过滤
  4. if (node == null)
  5. return;
  6. // 设置该节点不关联任何线程,也就是虚节点
  7. node.thread = null;
  8. Node pred = node.prev;
  9. // 通过前驱节点,跳过取消状态的node
  10. while (pred.waitStatus > 0)
  11. node.prev = pred = pred.prev;
  12. // 获取过滤后的前驱节点的后继节点
  13. Node predNext = pred.next;
  14. // 把当前node的状态设置为CANCELLED
  15. node.waitStatus = Node.CANCELLED;
  16. // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
  17. // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
  18. if (node == tail && compareAndSetTail(node, pred)) {
  19. compareAndSetNext(pred, predNext, null);
  20. } else {
  21. int ws;
  22. // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
  23. // 如果1和2中有一个为true,再判断当前节点的线程是否为null
  24. // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
  25. if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
  26. Node next = node.next;
  27. if (next != null && next.waitStatus <= 0)
  28. compareAndSetNext(pred, predNext, next);
  29. } else {
  30. // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
  31. unparkSuccessor(node);
  32. }
  33. node.next = node; // help GC
  34. }
  35. }

当前的流程:

  • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
  • 根据当前节点的位置,考虑以下三种情况:

(1) 当前节点是尾节点。
(2) 当前节点是Head的后继节点。
(3) 当前节点不是Head的后继节点,也不是尾节点。
根据上述第二条,我们来分析每一种情况的流程。
当前节点是尾节点。
image.png
当前节点是Head的后继节点。
image.png
当前节点不是Head的后继节点,也不是尾节点。
image.png
通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。 shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。

  1. do {
  2. node.prev = pred = pred.prev;
  3. } while (pred.waitStatus > 0);

2.3.3 如何解锁

我们已经剖析了加锁过程中的基本流程,接下来再对解锁的基本流程进行分析。由于ReentrantLock在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:

  1. // java.util.concurrent.locks.ReentrantLock
  2. public void unlock() {
  3. sync.release(1);
  4. }

可以看到,本质释放锁的地方,是通过框架来完成的。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. public final boolean release(int arg) {
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. unparkSuccessor(h);
  7. return true;
  8. }
  9. return false;
  10. }

在ReentrantLock里面的公平锁和非公平锁的父类Sync定义了可重入锁的释放锁机制。

  1. // java.util.concurrent.locks.ReentrantLock.Sync
  2. // 方法返回当前锁是不是没有被线程持有
  3. protected final boolean tryRelease(int releases) {
  4. // 减少可重入次数
  5. int c = getState() - releases;
  6. // 当前线程不是持有锁的线程,抛出异常
  7. if (Thread.currentThread() != getExclusiveOwnerThread())
  8. throw new IllegalMonitorStateException();
  9. // 是否完全释放锁
  10. boolean free = false;
  11. // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
  12. if (c == 0) {
  13. free = true;
  14. setExclusiveOwnerThread(null);
  15. }
  16. setState(c);
  17. return free;
  18. }

我们来解释下述源码:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. public final boolean release(int arg) {
  3. // 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
  4. if (tryRelease(arg)) {
  5. // 获取头结点
  6. Node h = head;
  7. // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
  8. if (h != null && h.waitStatus != 0)
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. return false;
  13. }

这里的判断条件为什么是h != null && h.waitStatus != 0?

  • h == null ,Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
  • h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
  • h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

再看一下unparkSuccessor方法,用于唤醒后继节点:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. // 唤醒后继节点
  3. // 从上面调用处知道,参数node是head头结点
  4. private void unparkSuccessor(Node node) {
  5. // 获取头结点waitStatus
  6. int ws = node.waitStatus;
  7. // 如果head节点当前waitStatus<0, 将其修改为0
  8. if (ws < 0)
  9. compareAndSetWaitStatus(node, ws, 0);
  10. // 获取当前节点的下一个节点
  11. Node s = node.next;
  12. // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
  13. // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
  14. if (s == null || s.waitStatus > 0) {
  15. s = null;
  16. // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
  17. for (Node t = tail; t != null && t != node; t = t.prev)
  18. if (t.waitStatus <= 0)
  19. s = t;
  20. }
  21. // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark,唤醒
  22. if (s != null)
  23. LockSupport.unpark(s.thread);
  24. }

为什么要从后往前找第一个非Cancelled的节点呢?原因如下。
之前的addWaiter方法:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private Node addWaiter(Node mode) {
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. Node pred = tail;
  6. if (pred != null) {
  7. node.prev = pred;
  8. if (compareAndSetTail(pred, node)) {
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. enq(node);
  14. return node;
  15. }

我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。继续执行acquireQueued方法以后,中断如何处理?

2.3.4 中断恢复后的执行流程

唤醒后,会执行return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private final boolean parkAndCheckInterrupt() {
  3. LockSupport.park(this);
  4. return Thread.interrupted();
  5. }

再回到acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. setHead(node);
  10. p.next = null; // help GC
  11. failed = false;
  12. return interrupted;
  13. }
  14. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

如果acquireQueued为True,就会执行selfInterrupt方法。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  3. selfInterrupt();
  4. }
  5. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  6. static void selfInterrupt() {
  7. Thread.currentThread().interrupt();
  8. }

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:

  1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
  2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下ThreadPoolExecutor源码。

2.3.5 小结

我们在1.3小节中提出了一些问题,现在来回答一下。
Q:某个线程获取锁失败的后续流程是什么呢?
A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
A:是CLH变体的FIFO双端队列。
Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
A:可以详细看下2.3.1.3小节。
Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?
A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放,具体可见2.3.2小节。
Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?
A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。

在并发环境下,加锁和解锁需要以下三个部件的协调:

  1. 锁状态。我们要知道锁是不是被别的线程占有了,这个就是 state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。
  2. 线程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程。
  3. 阻塞队列。因为争抢锁的线程可能很多,但是只能有一个线程拿到锁,其他的线程都必须等待,这个时候就需要一个 queue 来管理这些线程,AQS 用的是一个 FIFO 的队列,就是一个链表,每个 node 都持有后继节点的引用。AQS 采用了 CLH 锁的变体来实现,感兴趣的读者可以参考这篇文章关于CLH的介绍,写得简单明了。

    示例图解析

    下面属于回顾环节,用简单的示例来说一遍,如果上面的有些东西没看懂,这里还有一次帮助你理解的机会。
    首先,第一个线程调用 reentrantLock.lock(),翻到最前面可以发现,tryAcquire(1) 直接就返回 true 了,结束。只是设置了 state=1,连 head 都没有初始化,更谈不上什么阻塞队列了。要是线程 1 调用 unlock() 了,才有线程 2 来,那世界就太太太平了,完全没有交集嘛,那我还要 AQS 干嘛。
    如果线程 1 没有调用 unlock() 之前,线程 2 调用了 lock(), 想想会发生什么?
    线程 2 会初始化 head【new Node()】,同时线程 2 也会插入到阻塞队列并挂起 (注意看这里是一个 for 循环,而且设置 head 和 tail 的部分是不 return 的,只有入队成功才会跳出循环)
    1. private Node enq(final Node node) {
    2. for (;;) {
    3. Node t = tail;
    4. if (t == null) { // Must initialize
    5. if (compareAndSetHead(new Node()))
    6. tail = head;
    7. } else {
    8. node.prev = t;
    9. if (compareAndSetTail(t, node)) {
    10. t.next = node;
    11. return t;
    12. }
    13. }
    14. }
    15. }
    首先,是线程 2 初始化 head 节点,此时 head==tail, waitStatus==0
    image.png
    然后线程 2 入队:
    image.png
    同时我们也要看此时节点的 waitStatus,我们知道 head 节点是线程 2 初始化的,此时的 waitStatus 没有设置, java 默认会设置为 0,但是到 shouldParkAfterFailedAcquire 这个方法的时候,线程 2 会把前驱节点,也就是 head 的waitStatus设置为 -1。
    那线程 2 节点此时的 waitStatus 是多少呢,由于没有设置,所以是 0;
    如果线程 3 此时再进来,直接插到线程 2 的后面就可以了,此时线程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的时候把前驱节点线程 2 的 waitStatus 设置为 -1。
    image.png
    这里可以简单说下 waitStatus 中 SIGNAL(-1) 状态的意思,Doug Lea 注释的是:代表后继节点需要被唤醒。也就是说这个 waitStatus 其实代表的不是自己的状态,而是后继节点的状态,我们知道,每个 node 在入队的时候,都会把前驱节点的状态改为 SIGNAL,然后阻塞,等待被前驱唤醒。这里涉及的是两个问题:有线程取消了排队、唤醒操作。其实本质是一样的,读者也可以顺着 “waitStatus代表后继节点的状态” 这种思路去看一遍源码。

    3 AQS应用

    3.1 ReentrantLock的可重入应用

    ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
    公平锁: ```java // java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error(“Maximum lock count exceeded”); setState(nextc); return true; }

  1. 非公平锁:
  2. ```java
  3. // java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)){
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. else if (current == getExclusiveOwnerThread()) {
  11. int nextc = c + acquires;
  12. if (nextc < 0) // overflow
  13. throw new Error("Maximum lock count exceeded");
  14. setState(nextc);
  15. return true;
  16. }

从上面这两段都可以看到,有一个同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private volatile int state;

接下来看State这个字段主要的过程:

  1. State初始化的时候为0,表示没有任何线程持有锁。
  2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
  3. 解锁也是对这个字段-1,一直到0,此线程对锁释放。

    3.2 JUC中的应用场景

    除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

3.3 自定义同步工具

了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。

  1. public class LeeLock {
  2. private static class Sync extends AbstractQueuedSynchronizer {
  3. @Override
  4. protected boolean tryAcquire (int arg) {
  5. return compareAndSetState(0, 1);
  6. }
  7. @Override
  8. protected boolean tryRelease (int arg) {
  9. setState(0);
  10. return true;
  11. }
  12. @Override
  13. protected boolean isHeldExclusively () {
  14. return getState() == 1;
  15. }
  16. }
  17. private Sync sync = new Sync();
  18. public void lock () {
  19. sync.acquire(1);
  20. }
  21. public void unlock () {
  22. sync.release(1);
  23. }
  24. }

通过我们自己定义的Lock完成一定的同步功能。

  1. public class LeeMain {
  2. static int count = 0;
  3. static LeeLock leeLock = new LeeLock();
  4. public static void main (String[] args) throws InterruptedException {
  5. Runnable runnable = new Runnable() {
  6. @Override
  7. public void run () {
  8. try {
  9. leeLock.lock();
  10. for (int i = 0; i < 10000; i++) {
  11. count++;
  12. }
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. } finally {
  16. leeLock.unlock();
  17. }
  18. }
  19. };
  20. Thread thread1 = new Thread(runnable);
  21. Thread thread2 = new Thread(runnable);
  22. thread1.start();
  23. thread2.start();
  24. thread1.join();
  25. thread2.join();
  26. System.out.println(count);
  27. }
  28. }

上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。

总结

我们日常开发中使用并发的场景太多,但是对并发内部的基本框架原理了解的人却不多。由于篇幅原因,本文仅介绍了可重入锁ReentrantLock的原理和AQS原理,希望能够成为大家了解AQS和ReentrantLock等同步器的“敲门砖”。