CAS 通过自旋来达到轻量级锁,避免用户态和内核态的切换。但是 CAS 自旋会带来两个问题:
- CAS 恶性循环自旋会浪费大量的 CPU 资源
- 在 SMP 架构上的 CPU 上会导致 “总线风暴”
解决该问题的有效方案之一就是以空间换时间,常见的方案有:
- 热点分离(参考LongAdder)
- 队列削峰(AQS)
AQS 概述
AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用 AQS 实现的。AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后继节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后继节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入 AQS 队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
AQS 基于模板模式实现,内部为获取锁、释放锁的入队和出队过程提供了一系列的模板方法。因此不同功能的实现类都根据自身的特点实现了 AQS 留下来的方法。
AbstractOwnableSynchronizer
AQS 的基类,提供了一个 exclusiveOwnerThread 变量,表示当前持有锁的线程。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
状态标志位
/**
* 同步状态,具有线程可见性
*/
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 通过CAS设置同步状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
state 字段用来表示锁状态信息,是一个 volatile 变量,用来保证线程可见性。AQS 提供了 getState()
、setSate(int newState)
来获取和设置同步状态,但是由于 setState(int newState)
无法保证原子性,因此有提供了 compareAndSetState(int expect, int update)
方法,使用 CAS 操作保证了状态设置的原子性。
对于 ReentrantLock 的实现来说,state 可以用来表示当前线程获取锁的可重入次数;对于读写锁 ReentrantReadWriteLock 来说,state 的高 16 位表示读状态,也就是获取该读锁的次数,低 16 位表示获取到写锁的线程的可重入次数;对于 semaphore 来说,state 用来表示当前可用信号的个数;对于 CountDownlatch 来说,state 用来表示计数器当前的值。
队列节点-Node
AQS 是一个 FIFO 的双向队列,其内部字段 head 和 tail 分别表示队列的头结点和尾节点。Node 作为 AQS 的内部类,是专门用来定义队列节点的类:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
static final class Node {
/** 标记该线程是获取共享资源时被阻赛挂起后放入AQS队列的 */
static final Node SHARED = new Node();
/** 标记该线程是获取独占资源时被阻赛挂起后放入AQS队列的 */
static final Node EXCLUSIVE = null;
/**
* 等待状态:线程被取消了,不再等待
*
* 表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞,表示线程因为中断或者
* 等待超时,需要从等待队列中取消等待。且节点一旦进入了取消状态,该类型节点不会参与竞争,
* 且会一直保持取消状态。
*/
static final int CANCELLED = 1;
/** 等待状态:后继线程处于等待状态 */
static final int SIGNAL = -1;
/** 等待状态:当前线程正在进行条件等待,需要signal唤醒 */
static final int CONDITION = -2;
/**
* 等待状态:标识下一次共享锁的acquireShared操作需要无条件传播,也即是需要通知其他节点。
*
* 为什么当一个节点的线程获取共享锁后,要唤醒后继共享节点?共享锁是可以多个线程共有的,当
* 一个节点的线程获取共享锁后,必然要通知后继共享节点的线程也可以获取锁了,这样就不会让其
* 他等待的线程等很久,这种向后通知(传播)的目的也是尽快通知其他等待的线程尽快获取锁。
*/
static final int PROPAGATE = -3;
/**
* 节点状态:取值为 0,CANCELLED、CONDITION、SIGNAL、PROPAGATE,
*
* 普通节点的初始值为0,条件等待节点的初始值为CONDITION(-2)
*/
volatile int waitStatus;
/**
* 前驱结点,当前节点会在前驱结点上自旋,循环检查前驱结点的waitStatus状态
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 节点所对应的线程,为抢锁线程或条件等待线程
*/
volatile Thread thread;
/**
* 若当前节点不是普通节点而是条件等待节点,则节点处于某个条件的等待队列中
*
* 该字段指向下一个条件等待节点,即其条件队列上的后继节点
*
* 此成员只有线程处于条件等待队列中的时候使用。
*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
......
}
ConditionObject
ConditionObject 是用来结合锁实现线程同步的。他可以直接访问 AQS 对象的内部变量,比如 state 状态值和 AQS 队列。ConditionObject 是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的 await 方法后被阻塞的线程,该条件队列的头结点和尾节点分别为 firstWaite r和 lastWaiter 。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public final void await() throws InterruptedException {
...
}
public final void signal() {
...
}
public final void signalAll() {
...
}
}
FIFO 双向同步队列
AQS 队列内部维护的是一个 FIFO 的双向链表,部通过节点 head 和 tail 记录队首和队尾元素。每当线程通过 AQS 获取锁失败时,线程将被封装成一个 Node 节点,通过 CAS 原子操作插入队列尾部。当有线程释放锁时,AQS 会尝试让队头的后继节点占用锁。
/**
* 等待队列头结点(懒加载),可以通过初始化和setHead()进行赋值。
* 注意:如果该头结点存在,它的等待状态就不能是CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的尾节点(懒加载),只能通过入队添加等待节点的方式进行赋值
*/
private transient volatile Node tail;
AQS 同步队列的基本结构大概如下所示:
AQS 中的钩子方法
AQS 定义了两种资源共享的方式:
- Exclusive(独占):只有一个线程能占有锁资源,如 ReentrantLock。独占锁又可分为公平锁和非公平锁
- Shared(共享):多个线程可同时占有锁资源,如 Semaphore、CountDownLatch 等
AQS 为不同的资源共享方式提供了不同的模板流程,包括共享锁、独享锁模板流程。这些模板流程完成了具体线程进出等待队列的(如获取资源失败入队/唤醒出队等)通用逻辑。基于这些通用逻辑,AQS 提供了一种实现阻塞锁和依赖 FIFO 等待队列的同步器的框架,AQS 模板为 ReentedLocK、CountDownLatch、Semaphore 提供了优秀的解决方案。自定义的同步器只需要实现共享资源 state 的获取与释放方式即可,这些逻辑都编写在钩子方法中。无论是共享锁还是独享锁,AQS 在执行模板流程时都会回调自定义的钩子方法。
protected boolean tryAcquire(int arg)
独占锁钩子,尝试获取资源,若成功则返回 true,若失败则返回 false
protected boolean tryRelease(int arg)
独占锁钩子,尝试释放资源,若成功则返回 true,若失败则返回 false
protected int tryAcquireShared(int arg)
共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
protected boolean tryReleaseShared(int arg)
共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false
protected boolean isHeldExclusively()
独占锁钩子,判断该线程是否正在独占资源。只有用到 condition 条件队列时才需要去实现它