AQS简介

AQS的定义

AbstractQueuedSynchronizer简称AQS,它是Java中的锁的一个核心类,几乎所有Java层面的锁都是基于AQS设计的。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如:ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。下面则是AQS的方法架构图:
image.png
当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

AQS原理概述

AQS核心思想是:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,如下图所示:
image.png

注意:

  • CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列,AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配
  • 虚拟的双向队列即不存在队列实例对象,仅存在结点之间的关联关系
  • AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改

AQS底层数据结构

AQS中包含两个内部类:Node类和ConditionObject类。
image.png

内部类 - Node类

image.png
首先了解一下它主要的属性:int waitStatus、Node prev、Node next、Thread thread、Node nextWaiter;主要的方法是:boolean isShared()、Node predecessor() 以及几个构造方法。

核心属性

waitSatus属性

image.png
被volatile修饰的属性waitSatus表示当前结点在CLH变体队列中的状态。而waitStatus有以下几个状态量:
image.png
image.png

prev属性

image.png
prev是一个被volatile修饰的属性,它表示的是CLH队列中当前结点的前一个Node结点。

next属性

image.png
next表示的是CLH队列中当前结点的下一个Node结点。

thread属性

image.png
thread属性表示的是当前结点关联的线程对象。

nextWaiter属性

image.png
AQS中的阻塞队列(即LCH)采用的是用双向链表保存,用prve和next相互链接;而AQS中条件队列(即下面将要提到的ConditionObject内部类的底层结构)是使用单向链表保存的,而这里的nextWaiter就是用来连接这个单向链表的。此外,还有两个属性,而这两个属性其实是nextWaiter的两个状态:
image.png
这两个属性是用来标识是独占模式还是共享模式,即对应AQS的独占锁和共享锁。
image.png

核心方法

构造方法

image.png

isShared方法

image.png
该方法用于判断当前Node是否为共享结点。

predecessor方法

image.png
该方法用于获取当前结点的前置节点。

内部类 - ConditionObject类

核心属性

firstWaiter和lastWaiter属性

image.png
可以看到ConditionObject实现了Condition接口,因此,之前的RenntrantLock的条件变量其实就是使用的ConditionObject对象。
image.png
可以看到,最主要的两个属性就是“Node firstWaiter”和“Node lastWaiter”,他们分别指向条件队列的头结点和尾结点,这个条件队列各个结点之间又使用了前面的Node类的nextWaiter
属性来关联,并且通过前面waitStatus的状态可知位于条件队列中的Node的waitStatus=-1,即对应“Node.CONDITION”。

核心方法

addConditionWaiter方法

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // If lastWailastWaiterter is cancelled, clean out.
  4. //如果lastWaiter不处于Node.CONDITION
  5. if (t != null && t.waitStatus != Node.CONDITION) {
  6. //清空等待队列中状态不为-1(waitStatus=Node.CONDITION)
  7. unlinkCancelledWaiters();
  8. t = lastWaiter;
  9. }
  10. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  11. if (t == null)
  12. firstWaiter = node;
  13. else
  14. t.nextWaiter = node;
  15. lastWaiter = node;
  16. return node;
  17. }

image.png
可以看到,该方法的作用其实就是把当前线程添加进等待队列里面去。

await方法

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. //将当前线程添加进等待队列
  5. Node node = addConditionWaiter();
  6. //释放锁
  7. int savedState = fullyRelease(node);
  8. int interruptMode = 0;
  9. //判断当前线程的结点是否在CLH中,
  10. //如果没有放入,则阻塞,继续 while 循环(如果已经中断了,则退出)
  11. while (!isOnSyncQueue(node)) {
  12. //不再CLH中,则park当前线程
  13. LockSupport.park(this);
  14. ////如果已经中断了,则退出
  15. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  16. break;
  17. }
  18. //如果在LCH中,则会尝试获取锁
  19. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  20. interruptMode = REINTERRUPT;
  21. if (node.nextWaiter != null) // clean up if cancelled
  22. unlinkCancelledWaiters();
  23. if (interruptMode != 0)
  24. reportInterruptAfterWait(interruptMode);
  25. }

image.png
image.png

注意:

  • 先只需要知道release方法是释放当前线程持有的锁即可,具体实现暂不展开。
  • “如果在LCH中,则会尝试获取锁”这一块也暂时不展开

可见,调用 Condition 的 await() 方法会使当前线程进入等待队列并释放锁,线程状态变为等待状态。当前,还有几个其它await一样功能的方法,比如:awaitNanos(long nanosTimeout)、awaitUntil(Date deadline)、await(long time, TimeUnit unit),实现都是一样的,只是加了一些时间上的限制。形象流程图如下所示:
image.png
image.png
image.png

signal方法

  1. public final void signal() {
  2. //判断当前线程是否获取了锁,只有获得锁的线程才有资格调用该方法
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. doSignal(first);
  8. }
  • 先判断当前线程是否获取了锁;
  • 然后对首节点调用 doSignal() 方法。

    isHeldExclusively方法的作用是判断当前实现是否获得锁,由AQS的子类重写,这里暂不具体展开。

  1. private void doSignal(Node first) {
  2. do {
  3. //等待队列的首结点修改为first.nextWaiter(即firstWaiter.nextWaiter)
  4. if ( (firstWaiter = first.nextWaiter) == null)
  5. lastWaiter = null;
  6. first.nextWaiter = null;
  7. } while (!transferForSignal(first) &&
  8. (first = firstWaiter) != null);
  9. }
  • 修改首节点;
  • 调用 transferForSignal() 方法将节点移动到同步队列。
    1. final boolean transferForSignal(Node node) {
    2. //利用CAS将结点状态变为0.即初始化状态
    3. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    4. return false;
    5. //将该节点加入同步队列
    6. Node p = enq(node);
    7. int ws = p.waitStatus;
    8. //如果结点p的状态为cancel 或者修改waitStatus失败,则直接unpark唤醒
    9. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    10. LockSupport.unpark(node.thread);
    11. return true;
    12. }

    注意:compareAndSetWaitStatus(p, ws, Node.SIGNAL)是将结点修改为“准备抢锁状态”

image.png
image.png
image.png
可见,enq方法在不断自旋地设置新的头结点和尾结点,注意这里的指的是LCH队列的头结点和尾结点;至于为什么当头结点为null要做一步“compareAndSetHead(new Node())”,其原因是LCH队列的头结点其实是一个占位结点,不对应任何线程,如下图所示:
image.png
可见,调用 Condition 的 signal() 方法,可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。
image.png
形象流程如下:
image.png
image.png
image.png

AQS的核心属性与方法

核心属性

state属性

了解AQS底层的数据结构后,接下来学习AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。state是在AQS中维护的唯一的共享状态,用来实现同步器同步。相关方法如下所示:

  1. /**
  2. * The synchronization state.
  3. */
  4. private volatile int state;
  5. /**
  6. * Returns the current value of synchronization state.
  7. * This operation has memory semantics of a {@code volatile} read.
  8. * @return current state value
  9. */
  10. protected final int getState() {
  11. return state;
  12. }
  13. /**
  14. * Sets the value of synchronization state.
  15. * This operation has memory semantics of a {@code volatile} write.
  16. * @param newState the new state value
  17. */
  18. protected final void setState(int newState) {
  19. state = newState;
  20. }
  21. /**
  22. * Atomically sets synchronization state to the given updated
  23. * value if the current state value equals the expected value.
  24. * This operation has memory semantics of a {@code volatile} read
  25. * and write.
  26. *
  27. * @param expect the expected value
  28. * @param update the new value
  29. * @return {@code true} if successful. False return indicates that the actual
  30. * value was not equal to the expected value.
  31. */
  32. protected final boolean compareAndSetState(int expect, int update) {
  33. // See below for intrinsics setup to support this
  34. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  35. }

image.png
我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式,加锁过程如下图所示:
image.png
image.png
同步器是独占模式还是共享模式,取决AQS子类的具体实现,比如:ReentrantLock实现采用的是独占模式。

head和tail属性

image.png
从上图很容易看出,head和tail分别代表的就是CLH队列的头结点和尾结点。

其它属性

其它的属性就是一些对象属性的偏移地址、自旋时间spinForTimeoutThreshold,如:
image.png

核心方法

一般来说,自定义同步器要么是独占方式,要么是共享方式,因此它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。

独占锁的核心方法

acquire方法 - ReentrantLock原理

tryAcquire方法实现的是加锁作用。以ReentrantLock的非公平锁为例,下面是tryAcquire方法的调用流程:
image.png
首先是ReentrantLock的lock方法:
image.png
可见,CAS成功则表示获取锁成功,然后调用setExclusiverOwnerThread方法:
image.png
而这个setExclusiverOwnerThread方法,它是AQS的父类AbstractOwnableSynchronizer的方法
image.png
不难看出这个“exclusiveOwnerThread”就是独占锁的占用线程。如果没有竞争一定会独占成功,则加锁成功,如下示意图:
image.png
如果存在竞争,当前线程独占失败就会进入acquire(1):
image.png
其中tryAcquire是再尝试一次获取锁:
image.png
但是,它需要由AQS子类具体实现,下面是ReentrantLock的非公平锁的tryAcquire方法:
image.png
除了再一次尝试,ReentrantLock还再该方法实现了可重入的性质。如果还是获取锁失败,进入到“addWaiter”方法:
image.png
主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点
  • Pred指针指向尾节点Tail
  • 将New中Node的Prev指针指向Pred
  • 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值

当然,如果如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要进入前面提到过的Enq的方法(如下图所示的Thread-1),即把当前线程加入CLH队列。
image.png
image.png
接着,进入到acquireQueued:
image.png
当前线程(Thread-1)进入 acquireQueued 逻辑:

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

image.png
image.png

  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
  3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

image.png
再次有多个线程经历上述过程竞争失败,变成这个样子:
image.png

release方法 - ReentrantLock原理

接上图,如果Thread-0这时候释放锁,即调用unlock:
image.png
进入release方法:
image.png
首先tryRelease:
image.png
如果成功了,设置“exclusiveOwnerThread”为null,state为0(因为没有重入,所以就会导致“getState() - releases=0”):
image.png
回到releas方法,当前队列不为null,且 head 的 waitStatus = -1,进入unparkSuccessor 流程找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1回到 Thread-1 的 acquireQueued 流程:
如果加锁成功(没有竞争),会设置:

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

image.png
如果和其它线程竞争(非公平锁)又失败了,又要回到CLH队列去了:
image.png

共享锁的核心方法(todo)

acquireShared方法 - ReadLock原理

以ReentrantReadWriteLock中的读锁ReadLock为例,他就是共享锁,下面是下的lock方法:
image.png
调用了核心方法acquireShared:
image.png

注意: tryAcquireShared 返回负数, 表示获取读锁失败

下面调用读写锁同步器的tryAcquireShared方法:

  1. // Sync 继承过来的方法, 方便阅读, 放在此处
  2. protected final int tryAcquireShared(int unused) {
  3. Thread current = Thread.currentThread();
  4. int c = getState();
  5. // 如果是其它线程持有写锁, 获取读锁失败
  6. if ( exclusiveCount(c) != 0 &&
  7. getExclusiveOwnerThread() != current) {
  8. return -1;
  9. }
  10. int r = sharedCount(c);
  11. if ( // 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且
  12. !readerShouldBlock() &&
  13. // 小于读锁计数, 并且
  14. r < MAX_COUNT &&
  15. // 尝试增加计数成功
  16. compareAndSetState(c, c + SHARED_UNIT)
  17. ) {
  18. // ... 省略不重要的代码
  19. return 1;
  20. }
  21. return fullTryAcquireShared(current);
  22. }

从这里其实就可以看出共享锁了,这段代码没有判断是否已经上了读锁(共享锁),只是判断是否上了写锁(独占锁)

如果获取读锁失败,进入doAcquireShared方法:

  1. private void doAcquireShared(int arg) {
  2. // 将当前线程关联到一个 Node 对象上, 模式为共享模式
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. boolean interrupted = false;
  7. for (;;) {
  8. final Node p = node.predecessor();
  9. if (p == head) {
  10. // 再一次尝试获取读锁
  11. int r = tryAcquireShared(arg);
  12. if (r >= 0) {
  13. // r 表示可用资源数, 在这里总是 1 允许传播,(唤醒 AQS 中下一个 Share 节点)
  14. setHeadAndPropagate(node, r);
  15. p.next = null; // help GC
  16. if (interrupted)
  17. selfInterrupt();
  18. failed = false;
  19. return;
  20. }
  21. }
  22. // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. // park 当前线程
  25. parkAndCheckInterrupt())
  26. interrupted = true;
  27. }
  28. } finally {
  29. if (failed)
  30. cancelAcquire(node);
  31. }
  32. }

其余逻辑不写了,贼jb多

releaseShared方法 - ReadLock原理

image.png
image.png

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. // ... 省略不重要的代码
  4. for (;;) {
  5. int c = getState();
  6. int nextc = c - SHARED_UNIT;
  7. if (compareAndSetState(c, nextc))
  8. // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
  9. // 计数为 0 才是真正释放
  10. return nextc == 0;
  11. }
  12. }
  1. private void doReleaseShared() {
  2. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  3. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  4. for (;;) {
  5. Node h = head;
  6. if (h != null && h != tail) {
  7. int ws = h.waitStatus;
  8. // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
  9. // 防止 unparkSuccessor 被多次执行
  10. if (ws == Node.SIGNAL) {
  11. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  12. continue; // loop to recheck cases
  13. unparkSuccessor(h);
  14. }
  15. else if (ws == 0 &&
  16. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  17. continue; // loop on failed CAS
  18. }
  19. if (h == head) // loop if head changed
  20. break;
  21. }
  22. }