概述
AbstractQueuedSynchronizer,抽象的队列同步器,用来构建锁或者其他同步组件的基础框架
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 同步队列的头节点、尾节点
private transient volatile Node head;
private transient volatile Node tail;
// 同步状态
private volatile int state;
}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
// 独占模式下当前持有锁的线程,有效的工作线程
private transient Thread exclusiveOwnerThread;
}
- AQS主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态state
- AQS提供了getState()、setState(newState)、compareAndSetState(expect,update)来对state进行操作
- AQS本身没有实现任何同步接口,仅仅是定义了若干同步状态获取和释放的方法供自定义同步组件使用,AQS既支持独占式地获取同步状态,又支持共享式地获取同步状态,方便实现不同地同步组件(ReentrantLock、Semaphore、SynchronousQueue、ReentrantReadWriteLock和CountDownLatch等)
基本实现原理
核心思想
- 被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
- 被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制
加锁、解锁
- AQS内部维护了一个int类型的成员变量state表示同步状态、一个加锁线程、一个内置的FIFO队列来完成获取资源线程的排队工作
- 加锁过程:尝试CAS设置state=1
- CAS成功表示加锁成功,将当前线程设置成加锁线程
- CAS失败表示state已经不是0了,有线程持有锁了
- 检查持有锁的线程是不是当前线程,如果是当前线程,state++,表示又重入了一次
- 加锁线程不是当前线程,当先线程进入同步队列排队等待锁释放
- 解锁过程
- state—,如果state=0,就表示彻底释放锁
- 这时会将“加锁线程”变量也设置为null
- 从同步队列的队头唤醒等待的线程重新尝试加锁
AQS资源共享方式
AQS的设计是基于模板方法模式设计的,如果需要自定义同步器一般的方式是这样:
- 使用者继承AQS并重写指定的方法
- 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法
Exclusive(独占式)
ReentrantLock
常用方法
// 模板方法,使用子类重写的这个方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 模板方法,使用子类重写的这个方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
公平锁
公平锁直接进入tryAcquire方法中,在tryAcquire方法中,如果发现锁这个时候被释放了(state==0),公平锁会判断同步队列是否处于等待状态,如果有则不去抢锁,乖乖排到后面
非公平锁
- 非公平锁在调用lock后,首先会调用CAS进行一次加锁,如果这个时候恰巧锁没有被占用,那么就直接获取锁返回了
- 非公平锁在第一次CAS失败后,和公平锁一样会进入到tryAcquire方法,在tryAcquire方法中,如果发现锁这个时候被释放了(state==0),非公平锁会直接CAS抢锁,抢锁失败再进入同步队列
Share(共享式)
- Semaphore:信号量
- CountDownLatch:倒计数器
- ReadWriteLock:读写锁
- CyclicBarrier:循环栅栏
常用方法
// 模板方法,使用子类重写的这个方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 模板方法,使用子类重写的这个方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
同步队列
- CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(即不存在队列实例,仅存在节点之间的关联关系)
- AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配
Node
AQS是CLH队列锁的一种变体实现,毫无疑问,作为队列来说,必然要有一个节点的数据结构来保存各种信息,比如前驱节点、节点的状态等等,这个数据结构就是AQS中的内部类Node,这个数据结构需要哪些信息呢?
- 线程信息,我是那个线程
- 队列中的线程状态,既然知道是那个线程,肯定还要知道线程处于什么状态,是已经取消了“获取锁”的请求,还是在“等待获取锁”,或者是“即将得到锁”
- 前驱和后继线程,因为是一个同步队列,那么也要知道当前线程前面是哪个线程,后面是哪个线程(当前线程释放锁后,就应该通知后继线程去获取锁)
static final class Node {
// 表示当前Node结点的等待状态
volatile int waitStatus;
// 指向当前节点的前置节点
volatile Node prev;
// 指向当前节点的后置节点
volatile Node next;
// 进入该节点的等待线程
volatile Thread thread;
//等待condition条件的Node节点
Node nextWaiter;
//……
}
waitStatus
- CANCELLED(1):表示线程的获取锁请求已经“取消”
- SIGNAL(-1):表示线程一切就绪,就等待锁空闲让出来
- CONDITION(-2):表示线程等待某一个条件(Condition)被满足
- PROPAGATE(-3):当前线程处于“SHARED”模式时,该字段才会被使用
- 0:初始化Node对象时,默认是0
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常
当前线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试同步状态。AQS中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点
head和tail
- head:AQS的首节点引用,指向队列头节点
- tail:AQS的尾节点引用,指向队列尾节点
注意:
首节点head是不保存线程信息的节点,仅仅是因为数据结构设计上的需要,在数据结构上,这种做法往往叫做“空头节点链表”。对应的也有“非空头节点链表”
节点在同步队列中的插入和移出
节点加入同步队列
当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,也就是获取同步状态失败,AQS会将这个线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列的尾部。而这个加入队列的过程必须保证线程安全
因此同步器提供了一个基于CAS的设置尾节点的方法:
compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联
首节点的变化
首节点是获取同步状态成功的节点,首节点的线程在释放释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取同步状态,因此设置头节点的方法并不需要CAS来保证,它只需要将首节点设置成原首节点的后继节点并断开原首节点的next引用即可
独占式同步状态的获取和释放
获取
通过调用AQS的acquire(arg)方法可以获取同步状态,主要完成了同步状态获取、构造节点、加入同步队列以及在同步队列中循环等待的的相关工作
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 首先调用自定义同步器实现的tryAcquire方法,该方法需要保证安全的获取同步状态
- 如果同步状态获取失败(tryAcquire返回false)
- 构造同步节点(独占式Node.EXCLUSIVE,同一时刻只有一个线程能成功获取同步状态)
- 再通过addWaiter方法将该节点加入同步队列尾部
- 最后调用acquireQueued方法,使得该节点以死循环的方式获取同步状态
addWaiter(Node mode)
构造节点,将节点按照指定模式放到等待同步尾部
/**
* 根据当前线程及给定的同步模式构造节点,将节点按照指定模式放到等待同步尾部
* mode:Node.EXCLUSIVE-独占式, Node.SHARED-共享式
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 队列尾节点
Node pred = tail;
// 尾节点不为null,说明队列不为空
if (pred != null) {
// 当前节点的前置节点设置为原先的尾节点
node.prev = pred;
// cas设置当前节点为同步队列尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 原先同步队列为空或者cas设置尾节点失败,将节点插入队列,必要时进行初始化
enq(node);
return node;
}
构造节点后,同步队列不为空的情况下先尝试将节点加入同步队列尾节点,如果加入失败或者同步队列为空,那么进入enq(final Node node)方法
// 将节点插入队列,必要时进行初始化
private Node enq(final Node node) {
// 自旋
for (;;) {
// 尾节点
Node t = tail;
// 同步队列为空
if (t == null) { // 先初始化,下一次循环再进行实际的节点加入
// 创建一个新节点,cas设置新节点为同步队列头节点
if (compareAndSetHead(new Node()))
// 尾节点赋值为头节点
tail = head;
} else { // 同步队列不为空
// 节点的前置节点设置为原先的尾节点
node.prev = t;
// cas设置节点为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
同步器通过“死循环”的方式来保证节点正确添加到同步队列中,循环中做了两件事:
- 初始化:如果同步队列为空,初始化一个空节点,并将首节点和尾节点两个引用都指向这个空节点
- 将当前节点加入同步队列:只有通过cas将当前节点设置成尾节点后,当前线程才能从该方法返回,否则会不断尝试
acquireQueued(final Node node, int arg)
节点进入同步队列后,通过acquireQueued方法保证该节点以“死循环”的方式获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前置节点p
final Node p = node.predecessor();
// p是首节点且成功获取到同步状态
if (p == head && tryAcquire(arg)) {
// 设置当前节点为头节点
setHead(node);
// 断开原先头节点和队列的链接,帮助GC
p.next = null; // help GC
// 加锁失败标识置为false
failed = false;
// 返回
return interrupted;
}
// 当前线程获取锁失败需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 线程进入阻塞状态,等待被唤醒
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取锁失败且跳出自旋(可能是遇到异常之类的),取消获取锁
if (failed)
cancelAcquire(node);
}
}
就是一个自旋的过程,每个节点(线程)都在自省的观察,当条件满足,获取到了同步状态就可以从自旋中退出,否则就继续留在这个自旋的过程中(会阻塞节点的线程)
为什么只有前驱节点是首节点才能尝试获取同步状态?
- 头节点是成功获取同步状态的节点,头节点线程释放同步状态后,会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点
- 维护同步队列FIFO的原则
当前线程获取到同步状态后,让首节点的引用指向自己这个节点,即当前节点就是首节点。同步状态获取成功后,当前线程就从acquire返回了。如果同步器实现的是锁,就代表当前线程获取了锁
释放
当前线程获取同步状态并执行完相应逻辑之后,就需要释放同步状态,使得后继节点能继续获取同步状态。调用AQS的release(arg)方法可以释放同步状态,该方法在释放同步状态之后,会唤醒其后继节点(使后继节点继续尝试获取同步状态)
public final boolean release(int arg) {
// 释放同步状态
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
// 唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 后继节点
Node s = node.next;
// 后继节点为空或者已取消获取锁的请求
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点向前遍历找到实际未取消的后继者(最靠前的一个)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点的线程,使其尝试获取锁
if (s != null)
LockSupport.unpark(s.thread);
}
- 正常情况下,被唤醒的线程是head指向节点的后继节点线程
- 另一种情况,如果后继节点线程“已取消”获取同步请求,直接从尾部向前遍历找到最前面一个实际处于等待状态的节点线程
- 可能的原因:后继节点处于cancel状态,表示当时锁竞争很激烈,队列的第一个节点等了很久都无法获取同步状态(一直被还未加入队列的节点获取),包括后续的节点被cancel的几率都比较大,所以从尾部向前找到最前面一个处于未cancel状态的节点,然后唤醒这个节点
总结
- 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋
- 移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒head指向节点的后继节点
共享式同步状态的获取和释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时 获取到同步状态
以读写为例,如果一个程序在进行读操作,那么这一时刻写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问。
获取
public final void acquireShared(int arg) {
// 尝试获取共享锁,<0表示获取失败
if (tryAcquireShared(arg) < 0)
// 执行获取锁失败的后续操作
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 创建一个共享类型的节点,加入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点p
final Node p = node.predecessor();
// 前驱节点是头节点
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
// 获取锁成功
if (r >= 0) {
// 设置头节点并传播唤醒部分节点
setHeadAndPropagate(node, r);
// 断开头节点和同步队列的链接,帮助GC
p.next = null; // help GC
// 因为线程中断醒来的设置中断标识位
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 两个入参:
* 一个是当前成功获取共享锁的节点
* 一个就是tryAcquireShared方法的返回值,可能大于0也可能等于0
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 原先的头节点
Node h = head;
// 设置头节点
setHead(node);
/**
* 有两种情况需要唤醒
* 1、propagate > 0 表示调用方指明了后继节点需要被唤醒
* 2、头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
- tryAcquireShared方法尝试获取同步状态,返回值大于等于 0 时,表示成功获取到同步状态
- tryAcquireShared方法获取同步状态失败后,调用doAcquireShared方法
- 创建共享式的节点加入同步队列
- 一个自旋过程
- 如果当前节点的前驱为头节点时,尝试获取同步状态
- 如果返回值大于等于 0,说明获取到同步状态
- 设置当前节点为头节点并进行唤醒后续节点操作(两种情况下需要唤醒)
- 从自旋中退出
- 前驱节点不是头节点,当前线程加锁失败,线程进入阻塞状态,等待被唤醒
- 如果当前节点的前驱为头节点时,尝试获取同步状态
释放
public final boolean releaseShared(int arg) {
// 尝试释放共享锁
if (tryReleaseShared(arg)) {
// 唤醒后续节点线程操作
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
// 自旋
for (;;) {
// 头节点
Node h = head;
// 队列不为空
if (h != null && h != tail) {
// 头节点等待状态
int ws = h.waitStatus;
// 后续节点需要被唤醒
if (ws == Node.SIGNAL) {
// 并发控制,防止多次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 执行唤醒操作,唤醒h的后继节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head) // loop if head changed
break;
}
}
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如 Semaphore),它和独占式主要区别在于 tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和 CAS 来保证的,因为释放同步状态的操作会同时来自多个线程
Condition
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
//……
}
等待队列
- 等待队列也是一个FIFO的队列,队列中每个节点都包含一个线程的引用,这个线程就是在Condition对象上等待的线程
- 如果一个线程调用了Condition.await()方法,那么
- 该线程将会释放锁
- 构造成节点加入等待队列并进入等待状态
- 实际上,等待队列节点的定义复用了同步队列节点的定义,节点类型都是AQS的静态内部类
// 新增节点到等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
一个Condition包含一个等待队列,Condition包含首节点和尾节点,当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。Condition拥有尾节点的引用,所以新增节点只需要将尾节点的nextWaiter指向这个节点,并更新Condition中的尾节点引用即可。这个过程并不需要CAS操作,因为调用await()方法的线程必然是获取锁的线程,该过程已经由锁保证线程安全了
同步器AQS拥有一个同步队列和多个等待队列
await
public final void await() throws InterruptedException {
// 线程被中断,抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 根据当前线程创建新节点加入等待队列中
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 线程正在等待条件
while (!isOnSyncQueue(node)) {
// 线程进入等待状态
LockSupport.park(this);
// 检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 线程被signal方法唤醒,尝试获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁
- 如果从队列(同步队列、等待队列)的角度看await方法,当调用await方法时,相对于同步队列的首节点(获取锁的节点)移动到Condition的等待队列中
- 调用该方法的线程成功获取了锁的线程,也就是同步队列的首节点,该方法会将当前线程构造成节点并加入等待队列中
- 然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程进入等待状态
- 当等待队列中的节点被唤醒,则被唤醒节点的线程开始尝试获取同步状态
- 如果没有被其他线程调用Condition.signal()方法唤醒,而是对线程进行中断,会抛出InterruptException
同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中
signal
调用condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
// conditionObject的方法
public final void signal() {
// 当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 等待队列头节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// conditionObject的方法
private void doSignal(Node first) {
do {
// 等待队列中只剩头节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//AQS的方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 等待队列头节点移动到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
// 使用LockSupport唤醒该节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 调用该方法的前置条件是当前线程必须获得了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程
- 获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程
- 通过调用同步器的enq(node)方法,等待队列中的头节点线程安全移动到同步队列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点
- 被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中
- 成功获取同步状态之后,被唤醒的线程将先前调用的await()方法返回,此时线程已经成功获取了锁
- condition的signalAll()方法,相当于对等待队列中的所有节点都执行了一次signal()方法,效果就是将等待队列中的所有节点全部移动到同步队列中,并唤醒每个节点的线程
signal方法执行后,头节点从等待队列移动到同步队列过程中需要唤醒节点中的线程主要是为了让await()方法退出while循环