AQS即AbstractQueuedSynchronizer,队列同步器,我们所熟悉的ReentrantLock等并发框架中都有它的身影。如其类注释所言,This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic value to represent state,该类被设计为大多数同步类的基础。了解了它的实现,Java并发包下大部分的组件的理解过程将事半功倍。
类的注释
首先我们可以从类的注释中,了解很多AQS的实现思想。了解这些思想可以帮助我们在阅读源码时更好的理解类设计的用意以及其子类的正确使用方式。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues。
AQS是实现阻塞队列,同步器,信号量等组件的基础框架,基于一个FIFO队列实现。
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class。 子类应该被定义为非公有的内部类,用来帮助其封闭类实现同步属性。这句话翻译着有些拗口,看过实际使用的应该可以理解。例如同步锁ReentrantLock的同步属性就是依靠其内部类Sync实现的,而Sync正是AQS的子类。
To use this class as the basis of a synchronizer, redefine the
- following methods, as applicable, by inspecting and/or modifying
- the synchronization state using {@link #getState}, {@link
setState} and/or {@link #compareAndSetState}:
*AQS提供了以下几个模板方法供使用者自定义同步器,通过对state状态的检查与修改(来调用这几个方法) 这段话给了我们一个鲜明的提示,提供给子类自定义的几个方法,访问的入口是getState和setState等几个方法
- {@link #tryAcquire}
- {@link #tryRelease}
- {@link #tryAcquireShared}
- {@link #tryReleaseShared}
- {@link #isHeldExclusively}
内部类 Node
AQS内部定义了一个Node类来保存阻塞线程的信息,并且Node类也是队列的元素组成。在队列中,一个Node实例表示一个等待的线程。其具体实现如下
static final class Node {
/** 标记节点正在共享模式中等待 */
static final Node SHARED = new Node();
/**标记节点正在独占模式中等待 */
static final Node EXCLUSIVE = null;
/** 表示线程已取消等待 */
static final int CANCELLED = 1;
/** 标记后继者线程正在等待当前线程取消或释放锁 */
static final int SIGNAL = -1;
/*
* 标记节点正在等待Condition,其它线程唤醒condition后,
* 该节点进入同步状态等待
*/
static final int CONDITION = -2;
/**
* 下一个同步状态将会无条件传播
*/
static final int PROPAGATE = -3;
/**
* 当前节点封装的线程的等待状态,取值就是上边的4个int常量,初始为0 * 大于0表示取消状态,小于0表示有效状态 */
volatile int waitStatus;
/**
* 前驱节点,当前节点用来检查waitStatus状态的节点。入队列时分配 */
volatile Node prev;
/**
* 当前节点状态释放后的后继节点 */
volatile Node next;
/**
* 当前节点的排队线程,节点构造时初始化 */
volatile Thread thread;
/**
* 下一个等待的节点 */
Node nextWaiter;
/**
* 是否共享 */
final boolean isShared()
{
return nextWaiter == SHARED;
}
/** * 获取前驱节点 */
final Node predecessor() throws NullPointerException
{
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node之外,AQS类还定义了以下关键属性:
/**
* 等待队列的头节点,除了初始化之外只在调用setHead方法时被修改 */
private transient volatile Node head;
/** * 队列的尾节点,懒加载。只在调用enq方法时被修改 */
private transient volatile Node tail;
/**
* The synchronization state. */
private volatile int state;
这几个属性加上Node节点就组成了AQS很多方法的基本实现
protected final boolean compareAndSetState(int expect, int update)
{
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
compareAndSetState 方法是一个典型的CAS乐观锁做原子更新的实现,调用的是Unsafe类的方法。Unsafe类—Java做内存和线程等底层操作的后门,方法都是native的本地方法,在此不做过多介绍。
入列出列
addWaiter方法
addWaiter先尝试直接设置尾节点,失败后再调用enq方法
/**
* Creates and enqueues node for current thread and given mode.
* * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node */
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//通过cas设置当前的node节点为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//cas设置尾节点失败则调用enq方法
enq(node);
return node;
}
enq方法
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts. */
static final long spinForTimeoutThreshold = 1000L;
/** * 队列尾部插入节点的方法
* 取出尾部节点,如果为null则将节点设置为尾部节点,如果不为null,
* 则当前节点的前驱节点为尾节点,当前节点成为新的尾节点 */
private Node enq(final Node node) {
//通过一个无限循环的cas来设置node为尾节点
for (;;) {
Node t = tail;
//尾节点不存在,设置一个新节点为头节点和尾节点,然后下一个循环再设置尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//CAS设置尾节点直到成功
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如上代码所示,队列在插入新节点时,也使用了CAS的方式,以compareAndSetTail方法为例
/**
* CAS tail field. Used only by enq. */
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
tailOffset为AQS类初始化时就定义好的地址偏移量,简单的说就是tail节点在整个类内存空间中相对于起始内存地址的偏移量,也就是这个保存tail这个变量的内存地址的起点,和tail对象具体是什么没有多大的联系。
unparkSuccessor方法
unparkSuccessor方法用于调用来唤醒入参node的下一个等待线程
首先获取当前节点node的waitStatus,并将其状态释放(值设置为0)
如果node的next是null,则从尾部节点tail从后向前遍历,找到一个状态小于0的正在等待的节点,作为下一个唤醒的节点,唤醒是由LockSupport.unpark方法实现的。LockSupport也是Unsafe的一个简单的封装,unpark调用的也是Unsafe的native方法unpark()
/**
* 唤醒下一个等待线程 */
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread. */
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor. */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
资源获取
acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法是独占模式下获取资源的入口
首先调用了tryAcquire方法,尝试直接获取资源。此方法在AQS中没有给出实现,而是把实现交给了子类(模板方法模式)
addWaiter方法则是将线程以独占模式加入到等待队列的最末尾
acquireQueued方法,在节点被加入到等待队列的最末尾之后,使其一直等待
acquireQueued方法
/**
* 对于已经在队列中的线程以独占模式不间断的获取资源
*
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//等待过程中是否中断过
for (;;) {//自旋获取
final Node p = node.predecessor();//获取前驱节点
//如果前驱节点是队列首节点,表示当前节点在队列的最前面
//那就可以获取资源了
if (p == head && tryAcquire(arg)) {
setHead(node);//获取后将节点设为首节点
// 原首节点的后置节点为null,则表示该节点
//已经从队列中脱离。可以被回收了
p.next = null;
failed = false;
return interrupted;//成功获取资源,结束自旋。
}
//shouldParkAfterFailedAcquire判断线程是否需要挂起
//shouldParkAfterFailedAcquire源码解读见下一段
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取锁的流程图总结如下(图片来自网络)
**
资源释放
cancelAcquire方法
取消正在进行的获取资源的尝试
/**
* Cancels an ongoing attempt to acquire.
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
//如果前驱节点不是等待状态,则将node的前驱节点设置成前驱节点的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//node的状态也设置成取消
node.waitStatus = Node.CANCELLED;
// 如果是尾节点,把node从队列里移出去
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
**
release方法
release方法释放当前占用的锁(如果是已重入的,需要多次释放)
public final boolean release(int arg) {
if (tryRelease(arg)) {
//调用tryRelease方法成功后,寻找头节点
Node h = head;
//如果头节点存在且无等待状态
if (h != null && h.waitStatus != 0)
//唤醒h后面的节点
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//获取当前状态与入参的差值
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//差值为0则表明锁被完全释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//将差值设置为最新的status值
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
//将当前节点的状态修改为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//从队列里找出下一个需要唤醒的节点
//首先是直接后继
Node s = node.next;
//如果直接后继为空或者它的waitStatus大于0(已经放弃获取锁了),我们就遍历整个队列,
//获取第一个需要唤醒的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//将节点唤醒
LockSupport.unpark(s.thread);
}