这篇文章来看下AQS的源码,是前面很多同步工具类的一个核心,都是基于AQS实现的。 本文基于 JDK 1.8 (JDK 1.9 中会有个 addWaiter 里的 VarHandle 改进)
简介
AQS,全名 AbstractQueuedSynchronizer ,AQS队列又称CLH队列,其核心在于一个属性 state 和 一个双向队列。Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于 AbstractQueuedSynchronizer 实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

从ReentrantLock看AQS
还记得我们提到过的新同步工具类 ReentrantLock 吗,它以其可重入锁以及其它附带的强大的功能,如 tryLock() 和 lockInterruptibly(),公平锁等出现在我们眼前。我可能也提到过 ReentrantLock 的加锁实现原理是 CAS,那你知道它具体是怎么实现的吗?
public int put(){try {lock.lock();//判断如果满了就不再生产while (count == length) {producer.await();}count++;//唤醒消费者consumer.signalAll();}catch (InterruptedException e){e.printStackTrace();}finally {lock.unlock();}return count;}
先上一段代码,上面这个是生产者消费者例子模拟的put方法,这里使用了 ReentrantLock 实现同步。
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;public void lock() {sync.lock();}//......}
当程序执行到 lock.lock() 时会跑入 ReentrantLock 的 lock() 方法,这个方法调用了 sync.lock()。Reentrant 内有一个 Sync 类常量,使用的就是这个类的 lock() 方法。
public class ReentrantLock implements Lock, java.io.Serializable {abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/abstract void lock();//......}/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {//使用CAS操作对state进行赋值为1,如果成功,则获得该锁,当前线程获得执行权if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else//CAS操作失败,则调用acquire()方法acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {//直接调用acquire()方法acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}//......}
这个 Sync 类是一个内部类,lock() 方法是个抽象方法,同时 ReentrantLock 提供了两个该类的子类 NonfairSync 和 FairSync 实现了该方法,即不公平锁和公平锁。
我们前面也提到过,ReentrantLock 的默认实现是不公平锁,我们可以从 ReentrantLock 的构造器看出来当 fair 为 true 时会构造 FairSync 给到我们那个 Sync 属性,否则就是 NonfairSync 。
先看默认实现的不公平锁 NonfairSync,可以看到,其实现的 lock() 方法里加锁的操作是 compareAndSetState(0, 1),即我们熟悉的 CAS 操作。这里是对 AQS 内部的一个属性 state 进行赋值1操作,如果成功了,即 state 从0变成1,即当前线程获得锁成功,然后执行 setExclusiveOwnerThread(Thread.``_currentThread()_``) 方法占据执行权。如果 CAS 操作失败,则该锁已经被其他线程获取了,就执行 acquire(1) 方法。
对于公平锁来说,则会直接执行 acquire(1) 方法
//AQSpublic final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//NonfairSyncprotected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}//NonfairSyncfinal boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//判断 state 是否为0,即是否没被获得锁if (c == 0) {//进行CAS操作if (compareAndSetState(0, acquires)) {//成功,设置锁的OwnersetExclusiveOwnerThread(current);return true;}}//state 不为0,已被占用,判断是不是自己的锁else if (current == getExclusiveOwnerThread()) {//如果是,重入int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
当 CAS 操作失败时,即调用父父类 AQS 的 acquire() 方法,该方法里又执行了 tryAcquire() 方法,该方法被 NonfairSync 实现了,其调用了该类的 nonfairTryAcquire() 。该方法里会再次进行一次 CAS 操作,如果 state 以被占用了,则判断是不是自己的,如果是就重入,不是就获取失败。
实际上,如果第一次获取锁失败,会直接跳到 acquireQueued() 方法,因为 tryAcquire() 方法实际上是对拿到锁之后的操作。
/*** acquireQueued(addWaiter(Node.EXCLUSIVE), arg)* static final Node EXCLUSIVE = null;* AQS*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure//这里实际上是把当前线程保存到Node中,然后把Node加入到AQS的队尾Node pred = tail;if (pred != null) {node.prev = pred;//仍然使用的CAS操作添加if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** 该方法也是将结点插入到队尾中*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}//AQSfinal boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//获取该结点前一个结点final Node p = node.predecessor();//如果是头结点,进行获得锁的尝试if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//如果不是头结点,检查和更新结点的状态if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
上面代码的流程是,调用 addWaiter() 将当前结点以排他锁的形式添加进队列中,然后循环执行检查队列中的结点的状态,并且给对头尝试 CAS 获得锁。这是一个死循环,只有当当前结点获得锁了才会退出。
到了这里,我们大概知道了 ReentrantLock 的一些实现原理了,也看到了我们的主角 AQS 的身影。最主要的就是 Sync 类继承自 AbstractQueuedSynchronizer 类,而所有主要的线程竞争等核心方法都在该类里面。还有就是前面提到的一个 state 属性,在各种子类中,这个 state 被用于各种不同的用途,而之前说的各种同步工具也都是自己实现了一套关于 state 的操作。
AQS
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
重要属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizerimplements java.io.Serializable {static final class Node {//共享锁static final Node SHARED = new Node();//排他锁static final Node EXCLUSIVE = null;//表示线程获取锁的请求已经取消了static final int CANCELLED = 1;//表示线程已经准备好了,就等资源释放了static final int SIGNAL = -1;//表示节点在等待队列中,节点线程等待唤醒static final int CONDITION = -2;//当前线程处在SHARED情况下,该字段才会使用static final int PROPAGATE = -3;//当前线程的状态,取上面的那些值volatile int waitStatus;//前一个结点的引用volatile Node prev;//后一个结点的引用volatile Node next;//线程volatile Thread thread;//指向下一个处于CONDITION状态的节点Node nextWaiter;//......}//对头private transient volatile Node head;//队尾private transient volatile Node tail;private volatile int state;//......}

这个图也可以看出 AQS 的一个数据结构,其实现的是一个双向队列,Node 就是上面的每个结点,结点内保存的是线程 thread。
此外,AQS 维护了一个 volatile 修饰的 int 类型成员变量 state,用于表示同步状态,内部的对线程的加锁等判断都是通过 CAS 操作完成对 state 值的修改。
State
AQS 中的 state 代表的是同步状态,但具体的含义在不同的子类中按照不同的实现赋予了不同的用途,譬如 ReentrantLock 中,state 为0时表示锁空闲,为1时表示锁占用,2、3、4等值时表示锁重入的次数。
protected final int getState() {return state;}protected final void setState(int newState) {state = newState;}protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
关于 state 的一些方法都很简单,主要的就是这个 compareAndSetState() 方法,使用了 CAS 的操作来给 state 赋值。
此外这些方法都是final的,无法重写它们。
获取资源和释放资源
/*** 排他锁模式尝试获得锁方法 arg为操作次数*/protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}/*** 排他锁模式尝试释放锁方法 arg为操作次数*/protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/*** 共享锁模式尝试获得锁方法 arg为操作次数*/protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}/*** 共享锁模式尝试释放锁方法 arg为操作次数*/protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}/*** 该线程是否正在独占资源。只有用到Condition才需要去实现它。*/protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
上面这些方法都是钩子方法,提供对自定义同步器实现的方法。自定义同步器的方法可以通过实现相关的方法来修改 state 字段来实现自己的多线程模式。
/*** 排他锁获得锁,忽略中断*/public final void acquire(int arg) {}/*** 排他锁获得锁,中断即中止*/public final void acquireInterruptibly(int arg) {}/*** 排他锁尝试获得锁,中断即中止,如果给定超时超时即失败*/public final boolean tryAcquireNanos(int arg, long nanosTimeout) {}/*** 排他锁释放锁*/public final boolean release(int arg) {}/*** 共享锁获得锁,忽略中断*/public final void acquireShared(int arg) {}/*** 共享锁获得锁,中断即中止*/public final void acquireSharedInterruptibly(int arg) {}/*** 共享锁尝试获得锁,中断即中止,如果给定超时超时即失败*/public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) {}/*** 共享释放锁*/public final boolean releaseShared(int arg) {}
上面是一些和获取锁资源以及释放锁资源相关的主要API,也是对外开放操作的主要API。这部分方法也是不可重写的,保证了 AQS 的正常运作。
AQS里的设计模式
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
我们回头来看看这两个方法,上面 ReentrantLock 里我们在不公平锁和公平锁的 lock() 方法里都有发现这个 acquirFe() 方法,而这个方法来自父类 AQS,但是其调用的 tryAcquire() 方法却只是抛出了个 UnsupportedOperationException异常,是不是很奇怪?以及上面提到的多个需要实现的钩子方法都是抛出异常,钩子方法又是什么呢?
但其实你可以发现在我们的子类 NonfairSync和 FairSync里其实重写了这个 tryAcquire() 方法。在 AQS 中还有很多譬如 tryRelease() 、 isHeldExclusively() 等方法都是直接抛出个异常的,这种抛出个异常其实是说明不支持直接使用 AQS 的这些方法,需要重写。而这种做法就是一种设计模式,叫做模板方法模式。在抽象父类中定义了一个模板方法,譬如这里的 acquire() ,然后定义一些钩子方法,这些方法可以是抽象方法,也可以是有实现的方法。子类想要使用这个模板方法的时候,必须重写那些钩子方法,来实现自己的个性化的功能逻辑。关于模板方法模式在这里不多说。
AQS性能
AQS 在上面 ReentrantLock 里也分析了加锁的一整个流程是怎么样的,不难发现,AQS 加锁全都是围绕双向队列和state属性来操作的,而且效率很高。为什么呢?你可以从上面的代码中看到,AQS 其实使用了大量的 CAS 操作,其核心都是围绕 CAS 操作来运作的,主要在于两个方面:
- 对 state 的属性进行操作时,使用了 CAS 的方法进行判断锁的状态以及尝试获得锁,比起去操作系统申请锁效率更高;
- 对内部的双向队列进行维护的时候,不是监控整个队列,而是直接通过对比队尾这一个结点,仍然采用 CAS 操作,对队尾进行设置,省去了监控整个队列线程安全时需要消耗的加锁性能。
小结
Q:某个线程获取锁失败的后续流程是什么呢?
A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
A:是CLH变体的FIFO双端队列。
Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
A:当该结点是头结点且未被阻塞的时候
Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?
A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放
Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?
A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。
