前言
AbstractQueuedSynchronizer,基于队列实现的抽象同步器,一般被我们称之为 AQS。
AbstractQueuedSynchronizer 提供了一个 FIFO 队列,可以看做是一个可以用来实现锁以及其它需要同步功能的框架。AQS 的使用依靠继承来完成,子类通过继承自 AQS 并实现所需的方法来管理同步状态。例如 ReentrantLock, CountDownLatch 等。
同步器的设计是基于模板模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
Doug Lea 编写 AQS 是有严谨的理论基础的,他的个人博客上有他的论文《The java.util.concurrent Synchronizer Framework》 。
如果想要深入研究 AQS ,可以先了解一下该论文的内容。
AQS 的主要功能
AQS 是 JUC 包中用于构建锁或者其它同步组件(信号量、事件等)的基础框架类。
AQS 主要提供了下面的功能:
- 同步状态的原子性管理
 - 线程的阻塞和解除阻塞
 - 提供阻塞线程的存储队列
 
基于这三大功能,衍生出下面的附加功能:
- 通过中断实现的任务取消,基于线程中断实现
 - 可选的超时设置,也就是调用者可以选择放弃等待
 定义了 Condition 接口,用于支持管程形式的 await/signal/signalAll 操作
AQS 的使用功能
从使用上来说,AQS 的功能可以分为两种:独占和共享。对于这两种功能,有一个很常用的类:ReentrantReadWriteLock,其就是通过两个内部类来分别实现了这两种功能,提供了读锁和写锁的功能。
但子类实现时,只能实现其中的一种功能,要么是独占功能,要么是共享功能。
对于独占功能,例如如下代码: ```java ReentrantLock lock = new ReentrantLock();… public void function(){lock.lock(); try {
// do something…
} finally {lock.unlock();}
}
这个很好理解,通过 ReentrantLock 来保证在 `lock.lock()` 之后的代码在同一时刻只能有一个线程来执行,其余的线程将会被阻塞,直到该线程执行了 `lock.unlock()` 。这就是一个独占锁功能。<br />对于共享功能,例如如下代码:```javaReentrantReadWriteLock lock = new ReentrantReadWriteLock();...public void function(){lock.readLock().lock();try {// do something...} finally {lock.readLock().unlock();}}
代码中的 lock 是 ReentrantReadWriteLock 类的实例,而 lock.readLock() 为获取其中的读锁,即共享锁,使用方式并无差别,但和独占锁是有区别的:
- 读锁与读锁可以共享
 - 读锁与写锁不能共享(排他)
 - 写锁与写锁不能共享(排他)
类图
AQS 的主要数据结构
由于使用 AQS 可以实现锁的功能,那么下面就要分析一下究竟是如何实现的。
AQS 内部维护着一个 FIFO 的队列,该队列就是用来实现线程的并发访问控制。队列中的元素是一个 Node 类型的节点,该 Node 的主要属性如下:static final class Node {int waitStatus;// 前驱节点,当节点加入同步队列时被设置(尾部添加)Node prev;// 后继节点Node next;// 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段Node nextWaiter;// 获取同步状态的线程Thread thread;}
waitStatus:表示节点的状态,其中包含的状态如下表所示: 
| 状态 | 值 | 描述 | 
|---|---|---|
| CANCELLED | 1 | 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化 | 
| SIGNAL | -1 | 当前节点的后继节点将要或已经被阻塞,在当前节点释放时需要 unpark 后继节点 | 
| CONDITION | -2 | 当前节点在等待 condition,即在 Condition Queue 中 | 
| PROPAGATE | -3 | 表示下一次共享式同步状态获取将会无条件地被传播下去 | 
| INITIAL | 0 | 无状态,表示当前节点在队列中等待获取锁 | 
prev:前继节点next:后继节点nextWaiter:存储 Condition 队列中的后继节点thread:当前线程
其中,队列中还有一个 head 节点和一个 tail 节点,分别表示头结点和尾节点。
Node 节点示意图如下:
AQS 中有一个 state 变量,该变量对不同的子类实现具有不同的意义,对 ReentrantLock 来说,它表示加锁的状态:
- 无锁时 state=0,有锁时 state>0;
 - 第一次加锁时,将 state 设置为 1;
 - 由于 ReentrantLock 是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(
exclusiveOwnerThread==Thread.currentThread()),即可加锁,每次加锁都会将 state 的值+1,state 等于几,就表明当前持有锁的线程加了几次锁; - 每解锁一次,state 的值 
-1,到 0 的时候代表锁已全部释放,其他线程可以争夺这把锁了。 - 当持有锁的线程释放锁后,如果是等待队列获取到了加锁权限,则会在等待队列头部取出第一个线程去获取锁,获取锁的线程会被移出队列;
 
state 变量定义如下:
private volatile int state;
ReentrantLock 类的结构
下面通过 ReentrantLock 的实现进一步分析重入锁的实现。
首先看一下 lock 方法:
public void lock() {sync.lock();}
该方法调用了 sync 实例的 lock 方法,这里要说明一下 ReentrantLock 中的几个内部类:
- Sync
 - FairSync
 - NonfairSync
 
对于 ReentrantLock,有两种获取锁的模式:公平锁和非公平锁。所以对应有两个内部类,都继承自 Sync。而 Sync 继承自 AQS。
本文主要通过公平锁来介绍,看一下 FairSync 的定义:
/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();// 获取stateint c = getState();// state=0表示当前队列中没有线程被加锁if (c == 0) {/** 首先判断是否有前继结点,如果没有则当前队列中还没有其他线程;* 设置状态为acquires,即lock方法中写死的1(这里为什么不直接setState?因为可能同时有多个线程同时在执行到此处,所以用CAS来执行);* 设置当前线程独占锁。*/if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}/** 如果state不为0,表示已经有线程独占锁了,这时还需要判断独占锁的线程是否是当前的线程,原因是由于ReentrantLock为可重入锁;* 如果独占锁的线程是当前线程,则将状态加1,并setState;* 这里为什么不用compareAndSetState?因为独占锁的线程已经是当前线程,不需要通过CAS来设置。*/else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
AQS 获取独占锁的实现
acquire 方法
acquire 方法流程图:
acquire 是 AQS 中的方法,代码如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
该方法主要工作如下:
- 尝试获取独占锁
 - 获取成功则返回,否则执行步骤 
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) - addWaiter 方法将当前线程封装成 Node 对象,并添加到队列尾部
 - 自旋获取锁,并判断中断标志位。如果中断标志位为 true,执行步骤 
selfInterrupt(),否则返回 - 设置线程中断
tryAcquire 方法
tryAcquire 方法在 FairSync 中已经说明,它重写了 AQS 中的方法,在 AQS 中它的定义如下:protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
既然方法需要子类实现,为什么不用 abstract 修饰呢?上文提到 AQS 有独占和共享两种功能,如果用 abstract 来修饰,子类必定会实现一个没有用的空方法,这对子类来说不太友好,所以没有使用 abstract 来修饰。
 
该方法是在 ReentrantLock 中的 FairSync 和 NonfairSync 的两个内部类来实现的,这里以 FairSync 公平锁来说明:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 首先检查队列中的元素是否可以获得锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 当前线程是获得锁的线程,表示重入,state+1else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
上面这段代码是队列中没有 Node 获取锁或锁已被当前线程获取到的代码。返回 false 说明需要把线程封装成 Node 元素,然后排队获取锁。
addWaiter 方法
看一下 addWaiter 方法的定义:
private Node addWaiter(Node mode) {// 根据当前线程创建一个Node对象Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 判断tail是否为空,如果为空表示队列是空的,直接enqif (pred != null) {node.prev = pred;// 这里尝试CAS来设置队尾,如果成功则将当前节点设置为tail,否则enqif (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
该方法就是根据当前线程创建一个 Node,然后添加到队列尾部。
enq 方法
private Node enq(final Node node) {// 重复直到成功for (;;) {Node t = tail;// 如果tail为null,则必须创建一个Node节点并进行初始化if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 尝试CAS来设置队尾if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
acquireQueued 方法
竞争方向示意图:
该方法将未获取锁的节点(线程封装在 Node 内)放入同步队列中,其主要功能是循环的尝试获取锁,直到成功为止,最后返回中断标志位。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 中断标志位boolean interrupted = false;for (;;) {// 获取前继节点final Node p = node.predecessor();// 如果前继节点是head,则尝试获取if (p == head && tryAcquire(arg)) {// 设置head为当前节点(head中不包含thread)setHead(node);// 清除之前的headp.next = null; // help GCfailed = false;return interrupted;}// 如果p不是head或者获取锁失败,判断是否需要进行parkif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
这里有几个问题很重要:
- 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态后,将会唤醒后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
 - 维护同步队列 FIFO 原则。
什么条件下需要 park?
看下 shouldParkAfterFailedAcquire 方法的代码:private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
 
- 如果前一个节点的状态是 SIGNAL,需要 park;
 - 如果 
ws>0,表示已被取消,删除状态是已取消的节点; - 其它情况,设置前继节点的状态为 SIGNAL。
 
如果状态是 0 或 PROPAGATE,将当前节点设置为 SIGNAL,下一次进入循环时,需要 park。
可见,只有在前继节点的状态是 SIGNAL 时,需要 park。第二种情况稍后会详细介绍。
为什么要判断中断状态?
首先要知道,acquireQueued 方法中获取锁的方式是死循环,判断是否中断是在 parkAndCheckInterrupt 方法中实现的,看方法的代码:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
非常简单,阻塞当前线程,然后返回线程的中断状态并复位中断状态。
注意
Thread.interrupted()方法的作用,该方法是获取线程的中断状态,并复位。也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是 true,第二次则是 false。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。
为什么要多做这一步呢?先判断中断状态,然后复位,如果之前线程是中断状态,再进行中断?
这里就要介绍一下 park 方法了。park 方法是 Unsafe 类中的方法,与之对应的是 unpark 方法。简单来说,当前线程如果执行了 park 方法,也就是阻塞了当前线程,反之,unpark 方法就是唤醒一个线程。
具体说明请参考:Java的LockSupport.park()实现分析
park 与 wait 的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断状态,park 与 wait 的效果是一样的。
如果一个线程是中断状态,这时执行 wait 方法会报 java.lang.IllegalMonitorStateException ,而执行 park 时并不会报错,而是直接返回。
所以,知道了这一点,就可以知道为什么要进行中断状态的复位了:
- 如果当前状态是非中断状态,则在执行 park 时阻塞,这时返回的中断状态是 false;
 - 如果当前线程的中断状态,则 park 方法不起作用,会立即返回,然后 parkAndCheckInterrupt 方法会获取中断状态,也就是 true,并复位。
 再次执行循环的时候,由于在前一步已经把该线程的中断状态进行了复位,则再次调用 park 方法时会阻塞。
死循环不会引起 CPU 使用率飙升?
上面的 park 方法会把线程阻塞,不会引起 CPU 使用率飙升。
cancelAcquire 方法
在 acquireQueued 方法的 finally 语句块中,如果在循环的过程中出现了异常,则执行 cancelAcquire 方法,由于将该节点标记为取消状态。该方法代码如下:
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;// 设置该节点不再关联任何线程node.thread = null;// Skip cancelled predecessors// 通过前继节点跳过取消状态的nodeNode pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.// 获取过滤后的前继节点的后继节点Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.// 设置状态为取消状态node.waitStatus = Node.CANCELLED;/** If we are the tail, remove ourselves.* 1.如果当前节点是tail:* 尝试更新tail节点,设置tail为pred;* 更新失败则返回,成功则设置tail的后继节点为null*/if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;/** 2.如果当前节点不是head的后继节点:* 判断当前节点的前继节点的状态是否是SIGNAL,如果不是则尝试设置前继节点的状态为SIGNAL;* 上面两个条件如果有一个返回true,则再判断前继节点的thread是否不为空;* 若满足以上条件,则尝试设置当前节点的前继节点的后继节点为当前节点的后继节点,也就是相当于将当前节点从队列中删除*/if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 3.如果是head的后继节点或者状态判断或设置失败,则唤醒当前节点的后继节点unparkSuccessor(node);}node.next = node; // help GC}}
该方法中的执行过程有些复杂,首先是要获取当前节点的前继节点,如果前继节点的状态不是取消状态(即
pred.waitStatus >0),则向前遍历队列,直到遇到第一个waitStatus <=0的节点,并把当前节点的前继节点设置为该节点,然后设置当前节点的状态为取消状态。
接下来的工作可以分为 3 种情况:当前节点是 tail;
- 当前节点不是 head 的后继节点(即队列的第一个节点,不包括 head),也不是 tail;
 - 当前节点是 head 的后继节点。
当前节点是 tail
这种情况很简单,因为 tail 是队列的最后一个节点,如果该节点需要取消,则直接把该节点的前继节点的 next 指向 null,也就是把当前节点移除队列。出队的过程如下:
当前节点不是 head 的后继节点,也不是 tail

这里将 node 的前继节点的 next 指向了 node 的后继节点,真正执行的代码就是下面这一行:compareAndSetNext(pred, predNext, next);
当前节点是 head 的后继节点

这里直接 unpark 后继节点的线程,然后将 next 指向了自己。
这里可能有疑问,既然要删除节点,为什么都没有对 prev 进行操作,而仅仅是修改了 next?
要明确的一点是,这里修改指针的操作都是 CAS ,在 AQS 中使用以compareAndSet开头的方法都是尝试更新,并不保证成功,图中所示的都是执行成功的情况。
那么在执行cancleAcquire方法时,当前节点的前继节点有可能已经执行完并移除队列了(参见 setHead方法),所以在这里只能用 CAS 来尝试更新,而就算是尝试更新,也只能更新 next,不能更新 prev。因为 prev 是不确定的,否则有可能会导致整个队列的不完整,例如把 prev 指向一个已经移除队列的 node。
什么时候更新 prev 呢?其实 prev 是由其它线程来修改的。回去看下shouldParkAfterFailedAcquire方法,该方法由这样一段代码:
这段代码的作用就是通过 prev 遍历到第一个不是取消状态的 node,并修改 prev。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;
这里为什么能更新 prev?因为shouldParkAfterFailedAcquire方法是在获取锁失败的情况下才能执行,因此进入该方法时,说明已经有线程获得锁了,并且在执行该方法时,当前节点之前的节点不会变化(因为只有但下一个节点获得锁的时候才会设置 head),所以这里可以更新 prev,而且不用 CAS 来更新。AQS 释放独占锁的实现
通过 unlock 方法来实现:
该方法调用了 release 方法,release 是在 AQS 中定义的:public void unlock() {sync.release(1);}
这里首先尝试释放锁,成功了之后要去唤醒后继节点的线程,这里其它的线程才有机会去执行。public final boolean release(int arg) {// 尝试释放锁if (tryRelease(arg)) {// 释放成功后unpark后继节点的线程Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
tryRelease 代码如下:
是不是和 tryAcquire 方法类似?该方法也需要被重写,在 Sync 类中的代码如下:protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
当前线程被释放后,需要唤醒下一个节点的线程,通过 unparkSuccessor 方法实现:protected final boolean tryRelease(int releases) {// 这里是将锁的数量减1int c = getState() - releases;// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 由于重入的关系,不是每次释放锁c都等于0,// 直到最后一次释放锁时,才会把当前线程释放if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 记录锁的数量setState(c);return free;}
主要功能就是要唤醒下一个线程,这里private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
s == null || s.waitStatus > 0判断后继节点是否为空或者是否是取消状态,然后从队列尾部向前遍历找到最前面的一个 waitStatus 小于 0 的节点,至于为什么从尾部开始向前遍历,回想一下 cancelAcquire 方法的处理流程,cancelAcquire 只是设置了 next 的变化,没有设置 prev 的变化,在最后有这样一行代码:node.next = node,如果这时执行了 unparkSuccessor 方法,并且向后遍历的话,就成死循环了,所以这时只有 prev 是最稳定的。
到这里,通过 ReentrantLock 的 lock 和 unlock 来分析 AQS 独占锁的实现已基本完成了,但 ReentrantLock 还有一个非公平锁 NonfairSync。
其实 NonfairSync 和 FairSync 主要区别在于获取锁的方式,公平锁是按顺序获取,而非公平锁是抢占式获取,lock 的时候先去尝试修改 state 变量,如果抢占成功,则获取到锁:
非公平锁的 tryAcuire 方法调用了 nonfairTryAcquire 方法:final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
该方法比公平锁的 tryAcquire 方法在第二个 if 判断中少了一个是否存在前继节点判断,FaireSync 中的 tryAcquire 代码中的这个 if 语句块如下:final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}
总结
本文从 ReentrantLock 出发,比较完整地分析了 AQS 内部独占锁的实现,总体来说实现的思路十分清晰,使用了标志位+队列的方式来处理锁的状态,包括锁的获取,锁的竞争以及锁的释放。
在 AQS 中,state 可以表示锁的数量,也可以表示其它状态,state 的含义由子类去定义,自己只是提供了对 state 的维护。AQS 通过 state 来实现线程对资源的访问控制,而 state 具体的含义要在子类中定义。
AQS 在队列的维护上的实现比较复杂,尤其是节点取消时队列的维护,这里并不是通过一个线程来完成的。同时,AQS 中大量地使用 CAS 来实现更新,这种更新能保证状态和队列的完整性。 
