1、AQS基本概念
1.1 什么是AQS
AQS全称是AbstractQueuedSynchronizer,翻译过来就是抽象队列同步器,JUC下的一些类,比如Lock接口的实现类ReentrantLock、ReadWriteLock接口的实现类ReentrantReadWriteLock类都是基于AQS实现的,具体就是这些类的源码中加锁或者解锁的方法都是调用的AQS相关的方法实现的。
AbstractQueuedSynchronizer是JUC包下的抽象类,以ReentrantLock为例说明AQS和ReentrantLock之间的关系。ReentrantLock类中有一个抽象类Sync,该抽象类继承了AbstractQueuedSynchronizer,ReentrantLock类中还有两个实现类:NonfairSync和FairSync,这两个类都是继承了抽象类Sync,ReentrantLock类对外提供的lock和unlock等方法,底层都是调用抽象类Sync的两个实现类的实例方法完成的。如下图所示:
1.2 AQS类中的重要属性
1.2.1 AQS类重要方法、内部类、属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractQueuedSynchronizer() {}
// 在队列中阻塞状态的线程会被包装成一个Node实例
static final class Node {...}
// 被抢占锁的状态,线程刚获取锁时,state从0变成1;线程释放锁时,state从1变成0;线程重入锁时,state值+1
private volatile int state;
// 将Node实例加入队列尾部
private Node enq(final Node node) {...}
// 唤醒队列中可以尝试获取锁的线程
private void unparkSuccessor(Node node) {...}
// 尝试释放独占锁,同上
protected boolean tryRelease(int arg) {...}
// 此方法是独占模式下线程获取共享资源的顶层入口,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止
public final void acquire(int arg) {...}
// 尝试获取独占锁,由继承了AQS的实现类去定义具体的获取锁的过程
protected boolean tryAcquire(int arg) {...}
// 节点入队进入等待阻塞状态,过程中自旋尝试获取锁
final boolean acquireQueued(final Node node, int arg) {...}
// 将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
private Node addWaiter(Node mode) {...}
}
此外,由于AQS类继承了AbstractOwnableSynchronizer抽象类,该类中有一个属性exclusiveOwnerThread,保存当前获得锁的线程。
1.2.2 Node类
Node类是AQS类中的一个内部类,尝试获取锁失败的线程会被放入到一个双向链表中,这些线程会被包装成Node类的实例,Node类源码简要如下:
static final class Node {
// 代表对锁的共享模式
static final Node SHARED = new Node();
// 代表对锁的独占模式
static final Node EXCLUSIVE = null;
// 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
static final int CANCELLED = 1;
// 表示后继结点在等待前置节点唤醒,只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
static final int SIGNAL = -1;
// 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
static final int CONDITION = -2;
// 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
static final int PROPAGATE = -3;
// 当前节点的等待状态,初始值为0
volatile int waitStatus;
// 双向链表的上一个节点
volatile Node prev;
// 双向链表的下一个节点
volatile Node next;
// 被包装的线程
volatile Thread thread;
// 表示当前节点对锁的模式
Node nextWaiter;
...
}
1.3 AQS整体过程(互斥锁)
以线程1先获取到锁,线程2后尝试获取锁失败为例:
- 未有线程获取锁时,state值为0;
- 线程1调用lock方法尝试获取锁,这个加锁的过程,是用CAS操作将state值从0变为1;
- 线程1加锁成功,将当前获取锁的线程(exclusiveOwnerThread)设置成自己;
- 此时线程2尝试获取锁,查看state是1,说明已经有线程获取到了锁,此时线程2会先检查是不是自己之前加的锁,如果是则直接重入锁;如果不是,则线程2获取锁失败,此时线程2会被包装成一个Node实例放入等待队列(双向链表实现);
- 线程1执行完同步代码后释放锁,将state值减1,如果state值为0,则代表线程1彻底释放掉锁(会有重入锁的情况);
- 从等待队列中唤醒线程2;
- 线程2重复2、3步骤获取锁。
2、AQS源码浅析
AQS定义两种资源共享方式:
- Exclusive:独占模式,同一时刻只能有一个线程获取到锁,例如ReentrantLock实现的就是独占式的锁资源;
- Share:共享模式,允许同一时刻有多个线程获取到锁,比如ReentrantWriteLock和CountDownLatch等就是实现的这种模式。
实现自定义同步器在继承AQS抽象类时,不需要实现线程如何在等待队列中的唤醒和等待操作,仅需实现共享资源的获取和释放接口即可,具体实现AQS抽象类中以下几个被protected访问符修饰的方法:
// 该线程是否正在独占资源
isHeldExclusively()
// 独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryAcquire(int)
// 独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryRelease(int)
// 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryAcquireShared(int)
// 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
tryReleaseShared(int)
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一组即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
在AQS类中维护线程阻塞和线程唤醒的逻辑或者方法中,会调用上面这些在AQS类中声明成protected的方法,这些protected方法需要在实现的同步器(同步器继承自AQS类)中进行实现,这样这些继承了AQS类的同步器里,线程阻塞和线程唤醒的方法中调用的protected的方法,就是同步器自身实现的逻辑。这样就把线程阻塞线程唤醒这些公共的逻辑抽成一个模板方法放在AQS抽象类里,具体如何定义线程获取锁成功与否,由具体继承了AQS的实现类区定义,模板方法中调用同步器自定义的逻辑,设计很巧妙。
2.1 独占模式加锁
2.1.1 acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法是独占模式下,如果线程通过CAS操作没有获取到锁对象时,会调用此方法,在ReentrantLock类中的NonfairSync内部类的lock方法的实现中可以找到此方法的调用,可以看到acquire方法中依次调用了以下4个方法,分别如下:
- tryAcquire:尝试直接去获取资源,如果成功则acquire方法直接返回,不走if语句;
- addWaiter:将当前线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued:使线程阻塞在等待队列中尝试获取资源,直到获取到资源才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;
selfInterrupt:底层实现是Thread类的Interrupt方法,使当前线程中断。
2.1.2 tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
该方法就是在第2节开头说过的,需要继承AQS抽象类的实现类去实现的方法,比如在ReentrantLock类中的NonfairSync内部类就实现了该方法。
2.1.3 addWaiter
private Node addWaiter(Node mode) {
// 通过入参的锁模式来构造一个Node实例,用来加入到双向链表的尾部,可以看到将当前线程也作为了构造函数的入参
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// compareAndSetTail方法是通过CAS机制将当前线程的Node实例放入到链表尾部,
// 底层是调用unSafe类的实例方法compareAndSwapObject,该方法是一个native方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果当前链表没有尾结点,会调用enq方法将node节点入队
enq(node);
return node;
}
2.1.4 enq
private Node enq(final Node node) {
// for循环是一个死循环,其实就是自旋操作的实现
for (;;) {
Node t = tail;
if (t == null) {
// 如果t为空,创建一个头结点,且头结点没有对应具体线程
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果t非空,将node放入队列尾部,node节点的前置节点是t节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回node节点的前置节点
return t;
}
}
}
}
此方法是当链表的尾结点tail为空时,通过此方法将当前阻塞线程入队(双向链表)。
2.1.5 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
// 是否获取资源的标记位,为false代表获取到共享资源(锁)
boolean failed = true;
try {
// 标记等待过程中是否被中断过,如果被中断过则为true
boolean interrupted = false;
// for循环是一个死循环,模拟自旋操作尝试获取锁
for (;;) {
// 获取node节点的前驱节点(队列是一个双向链表)
final Node p = node.predecessor();
// 如果前驱节点是头结点(头结点不对应线程,此时意味着node节点就是第一个应被唤醒的线程节点)并且当前线程尝试获取锁成功
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点,即将node节点的pre指针和next指针均置空,意味着node节点对应的线程获取到锁后要出队了
setHead(node);
// 方便GC回收以前的head结点
p.next = null;
// 表示成功获取到资源,对应的标志位置为false
failed = false;
return interrupted;
}
// 如果当前线程在自旋过程中获取锁失败,就进入等待阻塞状态,如果线程被中断,则将标记位interrupted置为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
if (failed)
// 更新node节点的waitStatus为CANCELLED
cancelAcquire(node);
}
}
addWaiter方法是将node节点插入到双向链表尾部,而acquireQueued方法则维护了node节点在队列中的操作,比如不断地自旋尝试获取锁,何时唤醒队列中阻塞线程继续尝试获取锁。
2.1.6 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取当前node节点的前驱节点的waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驱节点的waitStatus为-1,代表当前节点就等待前驱节点的线程释放掉锁后,就会唤醒当前node节点对应的线程,直接返回true等就可以了
return true;
if (ws > 0) {
// 如果前驱节点的waitStatus为CANCELLED,那就顺着双向链表一直往前找,直到找到一个正常等待的状态的节点,并将node节点排在它的后边
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果顺着链表往前找的过程中,前驱节点状态不是CANCELLED,就通过CAS把前驱节点的状态设置成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
正如方法名所示,该方法是在自旋过程中如果线程获取锁失败,会根据当前节点的前驱节点的waitStatus,来将当前节点设置为可能下一个就是唤醒他的状态。
2.1.7 parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
// 调用unsafe类中的park方法(native方法)使当前线程进入阻塞等待状态
LockSupport.park(this);
// 返回当前线程是否被中断
return Thread.interrupted();
}
正如方法名所示,该方法将当前线程进入阻塞等待状态,并返回当前线程是否被中断的结果。
至此独占模式的加锁源码解析完毕。2.2 独占模式解锁
2.2.1 release
public final boolean release(int arg) {
// 如果尝试释放资源成功,返回true
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 当前线程释放掉了锁,唤醒等待队列中的下一个可以尝试获取锁的线程。
unparkSuccessor(h);
return true;
}
// 如果尝试释放资源失败,返回false
return false;
}
ReentrantLock类的unlock解锁方法,底层就是AQS中的release方法实现。
2.2.2 tryRelease
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
该方法尝试释放资源,也是由继承AQS抽象类的实现类去实现的,在ReentrantLock就有具体实现。
2.2.3 unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.3 共享模式加锁
2.3.1 acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
该法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。这里tryAcquireShared()依然需要自定义同步器去实现,但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列,直到获取到资源才返回。
2.3.2 tryAcquireShared
该方法也是由继承AQS的实现类实现,比如CountDownLatch类中就有该方法的实现。protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
2.3.3 doAcquireShared
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
2.4 共享模式解锁
2.4.1 releaseShared
2.4.2 doReleaseShared
3、总结
如果我来大致介绍AQS,我会介绍以下几点:
- AQS是一个同步器的抽象类,JUC包下的类(比如ReentrantLock),内部类就是继承自AQS;我们如果想基于AQS的框架去实现自定义的同步器,可以令同步器继承AQS抽象类,并重点实现AQS抽象类中的protected方法;
- AQS内部维护了竞争锁失败的线程在阻塞队列中的维护及唤醒的逻辑,这部分逻辑抽象成模板方法放在在AQS抽象类中,在这些公共逻辑里也涉及protected方法的调用,子类同步器中的模板方法调用的也是子类同步器覆写的这些protected方法。通过这种方式将公共的模板方法抽象在AQS类,将如何判定获取锁成功还是失败的逻辑在子类的protected方法中实现,实现解耦;
- AQS维护的阻塞线程入队和唤醒的模型见1.3小节;
- 需要具体的同步器实现的protected方法见第二节开头。
参考
图文并茂详解AQS加锁
Java 并发高频面试题:聊聊你对 AQS 的理解?
Java并发之AQS详解