AQS简介
AQS的定义
AbstractQueuedSynchronizer简称AQS,它是Java中的锁的一个核心类,几乎所有Java层面的锁都是基于AQS设计的。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如:ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。下面则是AQS的方法架构图:
当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
AQS原理概述
AQS核心思想是:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,如下图所示:
注意:
- CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列,AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配
- 虚拟的双向队列即不存在队列实例对象,仅存在结点之间的关联关系
- AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改
AQS底层数据结构
AQS中包含两个内部类:Node类和ConditionObject类。
内部类 - Node类
首先了解一下它主要的属性:int waitStatus、Node prev、Node next、Thread thread、Node nextWaiter;主要的方法是:boolean isShared()、Node predecessor() 以及几个构造方法。
核心属性
waitSatus属性
被volatile修饰的属性waitSatus表示当前结点在CLH变体队列中的状态。而waitStatus有以下几个状态量:
prev属性
prev是一个被volatile修饰的属性,它表示的是CLH队列中当前结点的前一个Node结点。
next属性
thread属性
nextWaiter属性
AQS中的阻塞队列(即LCH)采用的是用双向链表保存,用prve和next相互链接;而AQS中条件队列(即下面将要提到的ConditionObject内部类的底层结构)是使用单向链表保存的,而这里的nextWaiter就是用来连接这个单向链表的。此外,还有两个属性,而这两个属性其实是nextWaiter的两个状态:
这两个属性是用来标识是独占模式还是共享模式,即对应AQS的独占锁和共享锁。
核心方法
构造方法
isShared方法
predecessor方法
内部类 - ConditionObject类
核心属性
firstWaiter和lastWaiter属性
可以看到ConditionObject实现了Condition接口,因此,之前的RenntrantLock的条件变量其实就是使用的ConditionObject对象。
可以看到,最主要的两个属性就是“Node firstWaiter”和“Node lastWaiter”,他们分别指向条件队列的头结点和尾结点,这个条件队列各个结点之间又使用了前面的Node类的nextWaiter
属性来关联,并且通过前面waitStatus的状态可知位于条件队列中的Node的waitStatus=-1,即对应“Node.CONDITION”。
核心方法
addConditionWaiter方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWailastWaiterter is cancelled, clean out.
//如果lastWaiter不处于Node.CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
//清空等待队列中状态不为-1(waitStatus=Node.CONDITION)
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
可以看到,该方法的作用其实就是把当前线程添加进等待队列里面去。
await方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程添加进等待队列
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断当前线程的结点是否在CLH中,
//如果没有放入,则阻塞,继续 while 循环(如果已经中断了,则退出)
while (!isOnSyncQueue(node)) {
//不再CLH中,则park当前线程
LockSupport.park(this);
////如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//如果在LCH中,则会尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
注意:
- 先只需要知道release方法是释放当前线程持有的锁即可,具体实现暂不展开。
- “如果在LCH中,则会尝试获取锁”这一块也暂时不展开
可见,调用 Condition 的 await() 方法会使当前线程进入等待队列并释放锁,线程状态变为等待状态。当前,还有几个其它await一样功能的方法,比如:awaitNanos(long nanosTimeout)、awaitUntil(Date deadline)、await(long time, TimeUnit unit),实现都是一样的,只是加了一些时间上的限制。形象流程图如下所示:
signal方法
public final void signal() {
//判断当前线程是否获取了锁,只有获得锁的线程才有资格调用该方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
- 先判断当前线程是否获取了锁;
- 然后对首节点调用 doSignal() 方法。
isHeldExclusively方法的作用是判断当前实现是否获得锁,由AQS的子类重写,这里暂不具体展开。
private void doSignal(Node first) {
do {
//等待队列的首结点修改为first.nextWaiter(即firstWaiter.nextWaiter)
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
- 修改首节点;
- 调用 transferForSignal() 方法将节点移动到同步队列。
final boolean transferForSignal(Node node) {
//利用CAS将结点状态变为0.即初始化状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将该节点加入同步队列
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接unpark唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
注意:compareAndSetWaitStatus(p, ws, Node.SIGNAL)是将结点修改为“准备抢锁状态”
可见,enq方法在不断自旋地设置新的头结点和尾结点,注意这里的指的是LCH队列的头结点和尾结点;至于为什么当头结点为null要做一步“compareAndSetHead(new Node())”,其原因是LCH队列的头结点其实是一个占位结点,不对应任何线程,如下图所示:
可见,调用 Condition 的 signal() 方法,可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。
形象流程如下:
AQS的核心属性与方法
核心属性
state属性
了解AQS底层的数据结构后,接下来学习AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。state是在AQS中维护的唯一的共享状态,用来实现同步器同步。相关方法如下所示:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式,加锁过程如下图所示:
同步器是独占模式还是共享模式,取决AQS子类的具体实现,比如:ReentrantLock实现采用的是独占模式。
head和tail属性
从上图很容易看出,head和tail分别代表的就是CLH队列的头结点和尾结点。
其它属性
其它的属性就是一些对象属性的偏移地址、自旋时间spinForTimeoutThreshold,如:
核心方法
一般来说,自定义同步器要么是独占方式,要么是共享方式,因此它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。
独占锁的核心方法
acquire方法 - ReentrantLock原理
tryAcquire方法实现的是加锁作用。以ReentrantLock的非公平锁为例,下面是tryAcquire方法的调用流程:
首先是ReentrantLock的lock方法:
可见,CAS成功则表示获取锁成功,然后调用setExclusiverOwnerThread方法:
而这个setExclusiverOwnerThread方法,它是AQS的父类AbstractOwnableSynchronizer的方法
不难看出这个“exclusiveOwnerThread”就是独占锁的占用线程。如果没有竞争一定会独占成功,则加锁成功,如下示意图:
如果存在竞争,当前线程独占失败就会进入acquire(1):
其中tryAcquire是再尝试一次获取锁:
但是,它需要由AQS子类具体实现,下面是ReentrantLock的非公平锁的tryAcquire方法:
除了再一次尝试,ReentrantLock还再该方法实现了可重入的性质。如果还是获取锁失败,进入到“addWaiter”方法:
主要的流程如下:
- 通过当前的线程和锁模式新建一个节点
- Pred指针指向尾节点Tail
- 将New中Node的Prev指针指向Pred
- 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值
当然,如果如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要进入前面提到过的Enq的方法(如下图所示的Thread-1),即把当前线程加入CLH队列。
接着,进入到acquireQueued:
当前线程(Thread-1)进入 acquireQueued 逻辑:
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
release方法 - ReentrantLock原理
接上图,如果Thread-0这时候释放锁,即调用unlock:
进入release方法:
首先tryRelease:
如果成功了,设置“exclusiveOwnerThread”为null,state为0(因为没有重入,所以就会导致“getState() - releases=0”):
回到releas方法,当前队列不为null,且 head 的 waitStatus = -1,进入unparkSuccessor 流程找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1回到 Thread-1 的 acquireQueued 流程:
如果加锁成功(没有竞争),会设置:
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收
如果和其它线程竞争(非公平锁)又失败了,又要回到CLH队列去了:
共享锁的核心方法(todo)
acquireShared方法 - ReadLock原理
以ReentrantReadWriteLock中的读锁ReadLock为例,他就是共享锁,下面是下的lock方法:
调用了核心方法acquireShared:
注意: tryAcquireShared 返回负数, 表示获取读锁失败
下面调用读写锁同步器的tryAcquireShared方法:
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果是其它线程持有写锁, 获取读锁失败
if ( exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) {
return -1;
}
int r = sharedCount(c);
if ( // 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且
!readerShouldBlock() &&
// 小于读锁计数, 并且
r < MAX_COUNT &&
// 尝试增加计数成功
compareAndSetState(c, c + SHARED_UNIT)
) {
// ... 省略不重要的代码
return 1;
}
return fullTryAcquireShared(current);
}
从这里其实就可以看出共享锁了,这段代码没有判断是否已经上了读锁(共享锁),只是判断是否上了写锁(独占锁)
如果获取读锁失败,进入doAcquireShared方法:
private void doAcquireShared(int arg) {
// 将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再一次尝试获取读锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// r 表示可用资源数, 在这里总是 1 允许传播,(唤醒 AQS 中下一个 Share 节点)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
if (shouldParkAfterFailedAcquire(p, node) &&
// park 当前线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
releaseShared方法 - ReadLock原理
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// ... 省略不重要的代码
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
// 计数为 0 才是真正释放
return nextc == 0;
}
}
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
// 防止 unparkSuccessor 被多次执行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}