前言
AbstractQueuedSynchronizer,基于队列实现的抽象同步器,一般被我们称之为 AQS。
AbstractQueuedSynchronizer 提供了一个 FIFO 队列,可以看做是一个可以用来实现锁以及其它需要同步功能的框架。AQS 的使用依靠继承来完成,子类通过继承自 AQS 并实现所需的方法来管理同步状态。例如 ReentrantLock, CountDownLatch 等。
同步器的设计是基于模板模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
Doug Lea 编写 AQS 是有严谨的理论基础的,他的个人博客上有他的论文《The java.util.concurrent Synchronizer Framework》 。
如果想要深入研究 AQS ,可以先了解一下该论文的内容。
AQS 的主要功能
AQS 是 JUC 包中用于构建锁或者其它同步组件(信号量、事件等)的基础框架类。
AQS 主要提供了下面的功能:
- 同步状态的原子性管理
- 线程的阻塞和解除阻塞
- 提供阻塞线程的存储队列
基于这三大功能,衍生出下面的附加功能:
- 通过中断实现的任务取消,基于线程中断实现
- 可选的超时设置,也就是调用者可以选择放弃等待
定义了 Condition 接口,用于支持管程形式的 await/signal/signalAll 操作
AQS 的使用功能
从使用上来说,AQS 的功能可以分为两种:独占和共享。对于这两种功能,有一个很常用的类:ReentrantReadWriteLock,其就是通过两个内部类来分别实现了这两种功能,提供了读锁和写锁的功能。
但子类实现时,只能实现其中的一种功能,要么是独占功能,要么是共享功能。
对于独占功能,例如如下代码: ```java ReentrantLock lock = new ReentrantLock();… public void function(){lock.lock(); try {
// do something…
} finally {
lock.unlock();
}
}
这个很好理解,通过 ReentrantLock 来保证在 `lock.lock()` 之后的代码在同一时刻只能有一个线程来执行,其余的线程将会被阻塞,直到该线程执行了 `lock.unlock()` 。这就是一个独占锁功能。<br />对于共享功能,例如如下代码:
```java
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();...
public void function(){
lock.readLock().lock();
try {
// do something...
} finally {
lock.readLock().unlock();
}
}
代码中的 lock
是 ReentrantReadWriteLock
类的实例,而 lock.readLock()
为获取其中的读锁,即共享锁,使用方式并无差别,但和独占锁是有区别的:
- 读锁与读锁可以共享
- 读锁与写锁不能共享(排他)
- 写锁与写锁不能共享(排他)
类图
AQS 的主要数据结构
由于使用 AQS 可以实现锁的功能,那么下面就要分析一下究竟是如何实现的。
AQS 内部维护着一个 FIFO 的队列,该队列就是用来实现线程的并发访问控制。队列中的元素是一个 Node 类型的节点,该 Node 的主要属性如下:static final class Node {
int waitStatus;
// 前驱节点,当节点加入同步队列时被设置(尾部添加)
Node prev;
// 后继节点
Node next;
// 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段
Node nextWaiter;
// 获取同步状态的线程
Thread thread;
}
waitStatus
:表示节点的状态,其中包含的状态如下表所示:
状态 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化 |
SIGNAL | -1 | 当前节点的后继节点将要或已经被阻塞,在当前节点释放时需要 unpark 后继节点 |
CONDITION | -2 | 当前节点在等待 condition,即在 Condition Queue 中 |
PROPAGATE | -3 | 表示下一次共享式同步状态获取将会无条件地被传播下去 |
INITIAL | 0 | 无状态,表示当前节点在队列中等待获取锁 |
prev
:前继节点next
:后继节点nextWaiter
:存储 Condition 队列中的后继节点thread
:当前线程
其中,队列中还有一个 head 节点和一个 tail 节点,分别表示头结点和尾节点。
Node 节点示意图如下:
AQS 中有一个 state 变量,该变量对不同的子类实现具有不同的意义,对 ReentrantLock 来说,它表示加锁的状态:
- 无锁时 state=0,有锁时 state>0;
- 第一次加锁时,将 state 设置为 1;
- 由于 ReentrantLock 是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(
exclusiveOwnerThread==Thread.currentThread()
),即可加锁,每次加锁都会将 state 的值+1
,state 等于几,就表明当前持有锁的线程加了几次锁; - 每解锁一次,state 的值
-1
,到 0 的时候代表锁已全部释放,其他线程可以争夺这把锁了。 - 当持有锁的线程释放锁后,如果是等待队列获取到了加锁权限,则会在等待队列头部取出第一个线程去获取锁,获取锁的线程会被移出队列;
state 变量定义如下:
private volatile int state;
ReentrantLock 类的结构
下面通过 ReentrantLock 的实现进一步分析重入锁的实现。
首先看一下 lock 方法:
public void lock() {
sync.lock();}
该方法调用了 sync 实例的 lock 方法,这里要说明一下 ReentrantLock 中的几个内部类:
- Sync
- FairSync
- NonfairSync
对于 ReentrantLock,有两种获取锁的模式:公平锁和非公平锁。所以对应有两个内部类,都继承自 Sync。而 Sync 继承自 AQS。
本文主要通过公平锁来介绍,看一下 FairSync 的定义:
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取state
int c = getState();
// state=0表示当前队列中没有线程被加锁
if (c == 0) {
/*
* 首先判断是否有前继结点,如果没有则当前队列中还没有其他线程;
* 设置状态为acquires,即lock方法中写死的1(这里为什么不直接setState?因为可能同时有多个线程同时在执行到此处,所以用CAS来执行);
* 设置当前线程独占锁。
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/*
* 如果state不为0,表示已经有线程独占锁了,这时还需要判断独占锁的线程是否是当前的线程,原因是由于ReentrantLock为可重入锁;
* 如果独占锁的线程是当前线程,则将状态加1,并setState;
* 这里为什么不用compareAndSetState?因为独占锁的线程已经是当前线程,不需要通过CAS来设置。
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AQS 获取独占锁的实现
acquire 方法
acquire 方法流程图:
acquire 是 AQS 中的方法,代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法主要工作如下:
- 尝试获取独占锁
- 获取成功则返回,否则执行步骤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
- addWaiter 方法将当前线程封装成 Node 对象,并添加到队列尾部
- 自旋获取锁,并判断中断标志位。如果中断标志位为 true,执行步骤
selfInterrupt()
,否则返回 - 设置线程中断
tryAcquire 方法
tryAcquire 方法在 FairSync 中已经说明,它重写了 AQS 中的方法,在 AQS 中它的定义如下:protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();}
既然方法需要子类实现,为什么不用 abstract 修饰呢?上文提到 AQS 有独占和共享两种功能,如果用 abstract 来修饰,子类必定会实现一个没有用的空方法,这对子类来说不太友好,所以没有使用 abstract 来修饰。
该方法是在 ReentrantLock 中的 FairSync 和 NonfairSync 的两个内部类来实现的,这里以 FairSync 公平锁来说明:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 首先检查队列中的元素是否可以获得锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是获得锁的线程,表示重入,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上面这段代码是队列中没有 Node 获取锁或锁已被当前线程获取到的代码。返回 false 说明需要把线程封装成 Node 元素,然后排队获取锁。
addWaiter 方法
看一下 addWaiter 方法的定义:
private Node addWaiter(Node mode) {
// 根据当前线程创建一个Node对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 判断tail是否为空,如果为空表示队列是空的,直接enq
if (pred != null) {
node.prev = pred;
// 这里尝试CAS来设置队尾,如果成功则将当前节点设置为tail,否则enq
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
该方法就是根据当前线程创建一个 Node,然后添加到队列尾部。
enq 方法
private Node enq(final Node node) {
// 重复直到成功
for (;;) {
Node t = tail;
// 如果tail为null,则必须创建一个Node节点并进行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 尝试CAS来设置队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}}
acquireQueued 方法
竞争方向示意图:
该方法将未获取锁的节点(线程封装在 Node 内)放入同步队列中,其主要功能是循环的尝试获取锁,直到成功为止,最后返回中断标志位。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标志位
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// 如果前继节点是head,则尝试获取
if (p == head && tryAcquire(arg)) {
// 设置head为当前节点(head中不包含thread)
setHead(node);
// 清除之前的head
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果p不是head或者获取锁失败,判断是否需要进行park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里有几个问题很重要:
- 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态后,将会唤醒后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
- 维护同步队列 FIFO 原则。
什么条件下需要 park?
看下 shouldParkAfterFailedAcquire 方法的代码:private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 如果前一个节点的状态是 SIGNAL,需要 park;
- 如果
ws>0
,表示已被取消,删除状态是已取消的节点; - 其它情况,设置前继节点的状态为 SIGNAL。
如果状态是 0 或 PROPAGATE,将当前节点设置为 SIGNAL,下一次进入循环时,需要 park。
可见,只有在前继节点的状态是 SIGNAL 时,需要 park。第二种情况稍后会详细介绍。
为什么要判断中断状态?
首先要知道,acquireQueued 方法中获取锁的方式是死循环,判断是否中断是在 parkAndCheckInterrupt 方法中实现的,看方法的代码:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
非常简单,阻塞当前线程,然后返回线程的中断状态并复位中断状态。
注意
Thread.interrupted()
方法的作用,该方法是获取线程的中断状态,并复位。也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是 true,第二次则是 false。而isInterrupted()
方法则只是返回线程的中断状态,不执行复位操作。
为什么要多做这一步呢?先判断中断状态,然后复位,如果之前线程是中断状态,再进行中断?
这里就要介绍一下 park 方法了。park 方法是 Unsafe 类中的方法,与之对应的是 unpark 方法。简单来说,当前线程如果执行了 park 方法,也就是阻塞了当前线程,反之,unpark 方法就是唤醒一个线程。
具体说明请参考:Java的LockSupport.park()实现分析
park 与 wait 的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断状态,park 与 wait 的效果是一样的。
如果一个线程是中断状态,这时执行 wait 方法会报 java.lang.IllegalMonitorStateException
,而执行 park 时并不会报错,而是直接返回。
所以,知道了这一点,就可以知道为什么要进行中断状态的复位了:
- 如果当前状态是非中断状态,则在执行 park 时阻塞,这时返回的中断状态是 false;
- 如果当前线程的中断状态,则 park 方法不起作用,会立即返回,然后 parkAndCheckInterrupt 方法会获取中断状态,也就是 true,并复位。
再次执行循环的时候,由于在前一步已经把该线程的中断状态进行了复位,则再次调用 park 方法时会阻塞。
死循环不会引起 CPU 使用率飙升?
上面的 park 方法会把线程阻塞,不会引起 CPU 使用率飙升。
cancelAcquire 方法
在 acquireQueued 方法的 finally 语句块中,如果在循环的过程中出现了异常,则执行 cancelAcquire 方法,由于将该节点标记为取消状态。该方法代码如下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 设置该节点不再关联任何线程
node.thread = null;
// Skip cancelled predecessors
// 通过前继节点跳过取消状态的node
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取过滤后的前继节点的后继节点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置状态为取消状态
node.waitStatus = Node.CANCELLED;
/*
* If we are the tail, remove ourselves.
* 1.如果当前节点是tail:
* 尝试更新tail节点,设置tail为pred;
* 更新失败则返回,成功则设置tail的后继节点为null
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
/*
* 2.如果当前节点不是head的后继节点:
* 判断当前节点的前继节点的状态是否是SIGNAL,如果不是则尝试设置前继节点的状态为SIGNAL;
* 上面两个条件如果有一个返回true,则再判断前继节点的thread是否不为空;
* 若满足以上条件,则尝试设置当前节点的前继节点的后继节点为当前节点的后继节点,也就是相当于将当前节点从队列中删除
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 3.如果是head的后继节点或者状态判断或设置失败,则唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}}
该方法中的执行过程有些复杂,首先是要获取当前节点的前继节点,如果前继节点的状态不是取消状态(即
pred.waitStatus >0
),则向前遍历队列,直到遇到第一个waitStatus <=0
的节点,并把当前节点的前继节点设置为该节点,然后设置当前节点的状态为取消状态。
接下来的工作可以分为 3 种情况:当前节点是 tail;
- 当前节点不是 head 的后继节点(即队列的第一个节点,不包括 head),也不是 tail;
- 当前节点是 head 的后继节点。
当前节点是 tail
这种情况很简单,因为 tail 是队列的最后一个节点,如果该节点需要取消,则直接把该节点的前继节点的 next 指向 null,也就是把当前节点移除队列。出队的过程如下:
当前节点不是 head 的后继节点,也不是 tail
这里将 node 的前继节点的 next 指向了 node 的后继节点,真正执行的代码就是下面这一行:compareAndSetNext(pred, predNext, next);
当前节点是 head 的后继节点
这里直接 unpark 后继节点的线程,然后将 next 指向了自己。
这里可能有疑问,既然要删除节点,为什么都没有对 prev 进行操作,而仅仅是修改了 next?
要明确的一点是,这里修改指针的操作都是 CAS ,在 AQS 中使用以compareAndSet
开头的方法都是尝试更新,并不保证成功,图中所示的都是执行成功的情况。
那么在执行cancleAcquire
方法时,当前节点的前继节点有可能已经执行完并移除队列了(参见 setHead方法),所以在这里只能用 CAS 来尝试更新,而就算是尝试更新,也只能更新 next,不能更新 prev。因为 prev 是不确定的,否则有可能会导致整个队列的不完整,例如把 prev 指向一个已经移除队列的 node。
什么时候更新 prev 呢?其实 prev 是由其它线程来修改的。回去看下shouldParkAfterFailedAcquire
方法,该方法由这样一段代码:
这段代码的作用就是通过 prev 遍历到第一个不是取消状态的 node,并修改 prev。do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
这里为什么能更新 prev?因为shouldParkAfterFailedAcquire
方法是在获取锁失败的情况下才能执行,因此进入该方法时,说明已经有线程获得锁了,并且在执行该方法时,当前节点之前的节点不会变化(因为只有但下一个节点获得锁的时候才会设置 head),所以这里可以更新 prev,而且不用 CAS 来更新。AQS 释放独占锁的实现
通过 unlock 方法来实现:
该方法调用了 release 方法,release 是在 AQS 中定义的:public void unlock() {
sync.release(1);}
这里首先尝试释放锁,成功了之后要去唤醒后继节点的线程,这里其它的线程才有机会去执行。public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 释放成功后unpark后继节点的线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;}
tryRelease 代码如下:
是不是和 tryAcquire 方法类似?该方法也需要被重写,在 Sync 类中的代码如下:protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();}
当前线程被释放后,需要唤醒下一个节点的线程,通过 unparkSuccessor 方法实现:protected final boolean tryRelease(int releases) {
// 这里是将锁的数量减1
int c = getState() - releases;
// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 由于重入的关系,不是每次释放锁c都等于0,
// 直到最后一次释放锁时,才会把当前线程释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 记录锁的数量
setState(c);
return free;}
主要功能就是要唤醒下一个线程,这里private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);}
s == null || s.waitStatus > 0
判断后继节点是否为空或者是否是取消状态,然后从队列尾部向前遍历找到最前面的一个 waitStatus 小于 0 的节点,至于为什么从尾部开始向前遍历,回想一下 cancelAcquire 方法的处理流程,cancelAcquire 只是设置了 next 的变化,没有设置 prev 的变化,在最后有这样一行代码:node.next = node
,如果这时执行了 unparkSuccessor 方法,并且向后遍历的话,就成死循环了,所以这时只有 prev 是最稳定的。
到这里,通过 ReentrantLock 的 lock 和 unlock 来分析 AQS 独占锁的实现已基本完成了,但 ReentrantLock 还有一个非公平锁 NonfairSync。
其实 NonfairSync 和 FairSync 主要区别在于获取锁的方式,公平锁是按顺序获取,而非公平锁是抢占式获取,lock 的时候先去尝试修改 state 变量,如果抢占成功,则获取到锁:
非公平锁的 tryAcuire 方法调用了 nonfairTryAcquire 方法:final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);}
该方法比公平锁的 tryAcquire 方法在第二个 if 判断中少了一个是否存在前继节点判断,FaireSync 中的 tryAcquire 代码中的这个 if 语句块如下:final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;}
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;}
总结
本文从 ReentrantLock 出发,比较完整地分析了 AQS 内部独占锁的实现,总体来说实现的思路十分清晰,使用了标志位+队列的方式来处理锁的状态,包括锁的获取,锁的竞争以及锁的释放。
在 AQS 中,state 可以表示锁的数量,也可以表示其它状态,state 的含义由子类去定义,自己只是提供了对 state 的维护。AQS 通过 state 来实现线程对资源的访问控制,而 state 具体的含义要在子类中定义。
AQS 在队列的维护上的实现比较复杂,尤其是节点取消时队列的维护,这里并不是通过一个线程来完成的。同时,AQS 中大量地使用 CAS 来实现更新,这种更新能保证状态和队列的完整性。