https://zhuanlan.zhihu.com/p/178991617

一、概述

AQS是AbstractQueuedSynchronizer类的简称,虽然我们不会直接使用这个类,但是这个类是Java很多并发工具的功能实现提供支撑。
image.png
从下图可看出,我们常用的ReentrantLock, Semaphore, CountDownLatch都依赖这个类实现。

二、深入源码

  1. /**
  2. * 可以看到AbstractQueuedSynchronizer是一个抽象类
  3. * 实现了Serializable 接口
  4. * @since 1.5
  5. * @author Doug Lea
  6. */
  7. public abstract class AbstractQueuedSynchronizer
  8. extends AbstractOwnableSynchronizer
  9. implements java.io.Serializable {
  10. /**
  11. * The synchronization state.
  12. * state变量表示锁的状态
  13. * 0 表示未锁定
  14. * 大于0表示已锁定
  15. * 需要注意的是,这个值可以用来实现锁的【可重入性】,例如 state=3 就表示锁被同一个线程获取了3次,想要完全解锁,必须要对应的解锁3次
  16. * 同时这个变量还是用volatile关键字修饰的,保证可见性
  17. */
  18. private volatile int state;
  19. /**
  20. * 等待队列的头节点,只能通过setHead方法修改
  21. * 如果head存在,能保证waitStatus状态不为CANCELLED
  22. */
  23. private transient volatile Node head;
  24. /**
  25. * 等待队列的尾结点,只能通过enq方法来添加新的等待节点
  26. */
  27. private transient volatile Node tail;
  28. }

AbstractQueuedSynchronizer从名字上就可看出本质是一个队列(Queue),其内部维护着FIFO的双向队列,也就是CLH(Craig, Landin, and Hagersten)队列。
这个队列中的每一个元素都是一个Node,所以接下来了解一下其内部类Node,内部类Node的定义如下

  1. static final class Node {
  2. // 节点正在共享模式下等待的标记
  3. static final Node SHARED = new Node();
  4. // 节点正在以独占模式等待的标记
  5. static final Node EXCLUSIVE = null;
  6. // waitStatus变量的可选值,因为超时或者或者被中断,节点会被设置成取消状态。被取消的节点不会参与锁竞争,状态也不会再改变
  7. static final int CANCELLED = 1;
  8. // waitStatus变量的可选值,表示后继节点处于等待状态,如果当前节点释放了锁或者被取消,会通知后继节点去运行
  9. static final int SIGNAL = -1;
  10. // waitStatus变量的可选值,表示节点处于condition队列中,正在等待被唤醒
  11. static final int CONDITION = -2;
  12. // waitStatus变量的可选值,下一次acquireShared应该无条件传播
  13. static final int PROPAGATE = -3;
  14. // 节点的等待状态
  15. volatile int waitStatus;
  16. // 前驱节点
  17. volatile Node prev;
  18. // 后继节点
  19. volatile Node next;
  20. // 获取同步状态的线程
  21. volatile Thread thread;
  22. // 下一个condition队列等待节点
  23. Node nextWaiter;
  24. // 是否是共享模式
  25. final boolean isShared() {
  26. return nextWaiter == SHARED;
  27. }
  28. // 返回前驱节点或者抛出异常
  29. final Node predecessor() throws NullPointerException {
  30. Node p = prev;
  31. if (p == null)
  32. throw new NullPointerException();
  33. else
  34. return p;
  35. }
  36. Node() { // Used to establish initial head or SHARED marker
  37. }
  38. Node(Thread thread, Node mode) { // Used by addWaiter
  39. this.nextWaiter = mode;
  40. this.thread = thread;
  41. }
  42. Node(Thread thread, int waitStatus) { // Used by Condition
  43. this.waitStatus = waitStatus;
  44. this.thread = thread;
  45. }
  46. }

再来直观看一下AQS的基本结构
image.png

2.1 核心方法

我们都知道CountDownLatch、CyclicBarrier、Semaphore、ReentrantLock这些工具类中,有的只支持独占,如ReentrantLock#lock(),有的支持共享,多个线程同时执行,如Semaphore。并且,从前文Node类的定义也可以看到

  1. // 节点正在共享模式下等待的标记
  2. static final Node SHARED = new Node();
  3. // 节点正在以独占模式等待的标记
  4. static final Node EXCLUSIVE = null;

AQS实现了两套加锁解锁的方式,那就是独占式共享式。我们先看下独占式的实现,独占式的实现,就从ReentrantLock#lock()方法开始。

ReentrantLock#lock()

该方法体如下

  1. public void lock() {
  2. sync.lock();
  3. }

其中sync是AbstractQueuedSynchronizer的实现,我们知道,ReentrantLock支持公平锁和非公平锁,其实现类分别是FairSync和NonfairSync,我们看看公平锁和非公平锁分别是怎么实现的。

  1. // FairSync 公平锁的实现
  2. final void lock() {
  3. acquire(1);
  4. }
  5. // NonfairSync 非公平锁的实现
  6. final void lock() {
  7. if (compareAndSetState(0, 1))
  8. setExclusiveOwnerThread(Thread.currentThread());
  9. else
  10. acquire(1);
  11. }

可以看到,非公平锁的实现仅仅是多了一个步骤:通过CAS的方式(compareAndSetState)尝试改变state的状态,修改成功后设置当前线程以独占的方式获取了锁,不会去判断等待队列中在当前线程之前是否还有等待获取锁的线程,所以是“不公平”的,修改失败执行的逻辑和公平锁一样。
这就是公平锁和非公平锁的本质区别
从这段代码中可以看到,独占锁加锁的核心逻辑就是acquire方法,接下来就看看这个方法

AbstractQueuedSynchronizer#acquire()

方法体如下:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

该方法主要调用tryAcquire方法尝试获取锁,成功返回true,失败就将线程封装成Node对象,放入队列。

FairSync#tryAcquire()

tryAcquire方法在AQS中并没有直接实现,而是采用模板方法的设计模式,交给子类去实现。我们来看公平锁的实现。

  1. protected final boolean tryAcquire(int acquires) {
  2. // 当前线程
  3. final Thread current = Thread.currentThread();
  4. // 获取state状态,0表示未锁定,大于1表示重入
  5. int c = getState();
  6. if (c == 0) {
  7. // 表示没有线程获取锁
  8. if (!hasQueuedPredecessors() &&
  9. compareAndSetState(0, acquires)) {
  10. // 没有比当前线程等待更久的线程了,通过CAS的方式修改state
  11. // 成功之后,设置当前拥有独占访问权的线程
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. else if (current == getExclusiveOwnerThread()) {
  17. // 独占访问权的线程就是当前线程,重入
  18. // 此处就是【可重入性】的实现
  19. int nextc = c + acquires;
  20. if (nextc < 0)
  21. throw new Error("Maximum lock count exceeded");
  22. // 直接修改state
  23. setState(nextc);
  24. return true;
  25. }
  26. return false;
  27. }

可以看到该方法就是以独占的方式获取锁,获取成功后返回true。从这个方法可以看出state变量是实现可重入性的关键。
非公平锁的实现方式大同小异,感兴趣的同学可以自行阅读源码。
acquire方法除了调用tryAcquire,还调用了acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里分为两步,先看下addWaiter方法。

AbstractQueuedSynchronizer#addWaiter

  1. /**
  2. * Creates and enqueues node for current thread and given mode.
  3. * 为当前线程和给定模式创建并排队节点,给的的模式分为:
  4. * 1、Node.EXCLUSIVE:独占模式
  5. * 2、Node.SHARED:共享模式
  6. *
  7. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  8. */
  9. private Node addWaiter(Node mode) {
  10. // 创建Node节点
  11. Node node = new Node(Thread.currentThread(), mode);
  12. // Try the fast path of enq; backup to full enq on failure
  13. // 尝试快速添加尾结点,失败就执行enq方法
  14. Node pred = tail;
  15. if (pred != null) {
  16. node.prev = pred;
  17. // CAS的方式设置尾结点
  18. if (compareAndSetTail(pred, node)) {
  19. pred.next = node;
  20. return node;
  21. }
  22. }
  23. // 快速添加失败,执行该方法
  24. enq(node);
  25. return node;
  26. }

如果前面添加失败,则enq()方法保证添加成功

  1. /**
  2. * Inserts node into queue, initializing if necessary. See picture above.
  3. * 将节点插入队列,必要时进行初始化
  4. *
  5. * @param node the node to insert
  6. * @return node's predecessor
  7. */
  8. private Node enq(final Node node) {
  9. for (;;) {
  10. // 自旋
  11. Node t = tail;
  12. if (t == null) { // Must initialize
  13. // 尾结点为空,队列还没有进行初始化
  14. // 设置头节点
  15. if (compareAndSetHead(new Node()))
  16. tail = head;
  17. } else {
  18. node.prev = t;
  19. // CAS的方式设置尾结点,失败就进入下次循环
  20. // 也就是【自旋 + CAS】的方式保证设置成功
  21. if (compareAndSetTail(t, node)) {
  22. t.next = node;
  23. return t;
  24. }
  25. }
  26. }
  27. }

可以看到该方法就是用来往队列尾部插入一个新的节点,通过自旋 + CAS的方式保证线程安全插入成功
需要注意的是,该方法返回的Node节点不是新插入的节点,而是新插入节点的前驱节点。

AbstractQueuedSynchronizer#acquireQueued

  1. /**
  2. * Acquires in exclusive uninterruptible mode for thread already in
  3. * queue. Used by condition wait methods as well as acquire.
  4. *
  5. */
  6. final boolean acquireQueued(final Node node, int arg) {
  7. // 操作是否成功
  8. boolean failed = true;
  9. try {
  10. boolean interrupted = false;
  11. for (;;) {
  12. // 自旋
  13. // 获取当前节点的前驱节点
  14. final Node p = node.predecessor();
  15. if (p == head && tryAcquire(arg)) {
  16. // 前驱节点是头节点,并且已经获取了锁(tryAcquire方法在前文中详细讲解过)
  17. // 就把当前节点设置成头节点(因为前驱节点已经获取了锁,所以前驱节点不用再留在队列)
  18. setHead(node);
  19. p.next = null; // help GC
  20. failed = false;
  21. return interrupted;
  22. }
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. parkAndCheckInterrupt())
  25. // 如果前驱节点不是头节点或者没有获取锁
  26. // shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞
  27. // parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断
  28. // 没抢到锁的线程需要被阻塞,避免一直去争抢锁,浪费CPU资源
  29. interrupted = true;
  30. }
  31. } finally {
  32. if (failed)
  33. // 自旋异常退出,取消正在进行锁争抢
  34. cancelAcquire(node);
  35. }
  36. }

AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

用于判断当前线程获取锁失败后是否需要被阻塞

  1. /**
  2. * Checks and updates status for a node that failed to acquire.
  3. * Returns true if thread should block. This is the main signal
  4. * control in all acquire loops. Requires that pred == node.prev.
  5. *
  6. * @param pred node's predecessor holding status
  7. * @param node the node
  8. * @return {@code true} if thread should block
  9. */
  10. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  11. // 获取前驱节点的等待状态
  12. int ws = pred.waitStatus;
  13. if (ws == Node.SIGNAL)
  14. /*
  15. * SIGNAL表示后继节点处于等待状态,如果当前节点释放了锁或者被取消,会通知后继节点去运行
  16. * 所以作为后继节点,node直接返回true,表示需要被阻塞
  17. */
  18. return true;
  19. if (ws > 0) {
  20. /*
  21. * 前驱节点被取消了,需要从队列中移除,并且循环找到下一个不是取消状态的节点
  22. */
  23. do {
  24. node.prev = pred = pred.prev;
  25. } while (pred.waitStatus > 0);
  26. pred.next = node;
  27. } else {
  28. /*
  29. * 通过CAS将前驱节点的status设置成SIGNAL
  30. */
  31. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  32. }
  33. return false;
  34. }

AbstractQueuedSynchronizer#parkAndCheckInterrupt

  1. private final boolean parkAndCheckInterrupt() {
  2. // 阻塞当前线程
  3. LockSupport.park(this);
  4. // 检测当前线程是否被中断(该方法会清除中断标识位)
  5. return Thread.interrupted();
  6. }

至此,独占锁的整个加锁过程就已经完成。再来回顾下整个流程

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

首先执行tryAcquire方法用于尝试获取锁,成功后就直接返回,失败后就通过addWaiter方法把当前线程封装成一个Node,加到队列的尾部,再通过acquireQueued方法尝试获取同步锁,成功获取锁的线程的Node节点会被移出队列。
如果以上条件都满足,会执行selfInterrupt方法中断当前线程。
看完了独占锁的加锁,再来看看独占锁的解锁。同样从ReentrantLock入手

ReentrantLock#unlock

  1. public void unlock() {
  2. sync.release(1);
  3. }

我们已经知道了sync是AQS的实现,所以直接查看AQS中的release方法

  1. /**
  2. * Releases in exclusive mode. Implemented by unblocking one or
  3. * more threads if {@link #tryRelease} returns true.
  4. * This method can be used to implement method {@link Lock#unlock}.
  5. *
  6. * @param arg the release argument. This value is conveyed to
  7. * {@link #tryRelease} but is otherwise uninterpreted and
  8. * can represent anything you like.
  9. * @return the value returned from {@link #tryRelease}
  10. */
  11. public final boolean release(int arg) {
  12. if (tryRelease(arg)) {
  13. // 尝试释放锁
  14. Node h = head;
  15. if (h != null && h.waitStatus != 0)
  16. // 头节点已经释放,唤醒后继节点
  17. unparkSuccessor(h);
  18. return true;
  19. }
  20. return false;
  21. }

ReentrantLock.Sync#tryRelease

  1. protected final boolean tryRelease(int releases) {
  2. // 计算剩余的重入次数
  3. int c = getState() - releases;
  4. if (Thread.currentThread() != getExclusiveOwnerThread())
  5. throw new IllegalMonitorStateException();
  6. // 是否完全的释放了锁(针对可重入性)
  7. boolean free = false;
  8. if (c == 0) {
  9. // 表示完全释放了锁
  10. free = true;
  11. // 设置独占锁的持有者为null
  12. setExclusiveOwnerThread(null);
  13. }
  14. // 设置AQS的state
  15. setState(c);
  16. return free;
  17. }

AbstractQueuedSynchronizer#unparkSuccessor

unparkSuccessor方法用于唤醒后继节点,其定义如下

  1. /**
  2. * Wakes up node's successor, if one exists.
  3. *
  4. * @param node the node
  5. */
  6. private void unparkSuccessor(Node node) {
  7. // 获取当前节点的状态
  8. int ws = node.waitStatus;
  9. if (ws < 0)
  10. compareAndSetWaitStatus(node, ws, 0);
  11. Node s = node.next;
  12. if (s == null || s.waitStatus > 0) {
  13. // 当前节点的后继节点为null,或者被取消了
  14. s = null;
  15. for (Node t = tail; t != null && t != node; t = t.prev)
  16. // 从尾结点查找状态不为取消的可用节点
  17. if (t.waitStatus <= 0)
  18. s = t;
  19. }
  20. if (s != null)
  21. // 唤醒后继节点
  22. LockSupport.unpark(s.thread);
  23. }

前文说过AQS实现了两套同步逻辑,也就是独占式共享式。看完了独占式锁的实现,再来看一下共享式。这里以Semaphore为例。

Semaphore#acquire

该方法是作用是请求一个许可,如果暂时没有可用的许可,则被阻塞,等待将来的某个时间被唤醒。因为Semaphore可以允许多个线程同时执行,所以可以看成是共享锁的实现。该方法定义如下

  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }

sync是AQS的实现,可以看到acquire方法底层调用的是acquireSharedInterruptibly方法。
在JDK中,与锁相关的方法,Interruptibly表示可中断,也就是可中断锁。可中断锁的意思是线程在等待获取锁的过程中可以被中断,换言之,线程在等待锁的过程中可以响应中断
接下来看看acquireSharedInterruptibly方法的实现

AbstractQueuedSynchronizer#acquireSharedInterruptibly

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. // 检测线程的中断中断状态,如果已经被中断了,就响应中断
  5. // 该方法会清除线程的中断标识位
  6. throw new InterruptedException();
  7. if (tryAcquireShared(arg) < 0)
  8. doAcquireSharedInterruptibly(arg);
  9. }

Semaphore.FairSync#tryAcquireShared

tryAcquireShared方法,相信大家已经能看出来,这里使用了模板方法模式,具体实现由子类去实现。Semaphore也实现了公平模式和非公平模式。公平的方式和非公平的方式实现逻辑大同小异。所以具体看下公平模式下的实现方式

  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. // 自旋
  4. if (hasQueuedPredecessors())
  5. // 如果有线程排在自己的前面(公平锁排队),直接返回
  6. return -1;
  7. // 获取同步状态的值
  8. int available = getState();
  9. // 可用的(许可)减去申请的,等于剩余的
  10. int remaining = available - acquires;
  11. if (remaining < 0 ||
  12. compareAndSetState(available, remaining))
  13. // 如果剩余的小于0,或者设置状态成功,就返回,如果设置失败,则进入下一次循环
  14. // 如果剩余小于0,返回负数,表示失败
  15. // 如果设置状态成功,表示申请许可成功,返回正数
  16. return remaining;
  17. }
  18. }

此处还是自旋 + CAS的方式保证线程安全和设置成功。

AbstractQueuedSynchronizer#doAcquireSharedInterruptibly

方法定义如下

  1. /**
  2. * Acquires in shared interruptible mode.
  3. * 在共享可中断模式下请求(许可)
  4. */
  5. private void doAcquireSharedInterruptibly(int arg)
  6. throws InterruptedException {
  7. // 为当前线程和给定模式创建节点并插入队列尾部,addWaiter方法前文讲解过
  8. final Node node = addWaiter(Node.SHARED);
  9. // 操作是否失败
  10. boolean failed = true;
  11. try {
  12. for (;;) {
  13. // 自旋
  14. // 获取当前节点的前驱节点
  15. final Node p = node.predecessor();
  16. if (p == head) {
  17. // 如果前驱节点是头节点,以共享的方式请求获取锁,tryAcquireShared方法前文讲解过
  18. int r = tryAcquireShared(arg);
  19. if (r >= 0) {
  20. // 成功获取锁,设置头节点和共享模式传播
  21. setHeadAndPropagate(node, r);
  22. p.next = null; // help GC
  23. failed = false;
  24. return;
  25. }
  26. }
  27. if (shouldParkAfterFailedAcquire(p, node) &&
  28. parkAndCheckInterrupt())
  29. // 如果前驱节点不是头节点或者没有获取锁
  30. // shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞,该方法前文讲解过
  31. // parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断,该方法前文讲解过
  32. // 没抢到锁的线程需要被阻塞,避免一直去争抢锁,浪费CPU资源
  33. throw new InterruptedException();
  34. }
  35. } finally {
  36. if (failed)
  37. // 自旋异常退出,取消正在进行锁争抢
  38. cancelAcquire(node);
  39. }
  40. }

加锁的逻辑已经完成,再来看看解锁的逻辑。

Semaphore#release

release用于释放许可,其方法定义如下

  1. public void release() {
  2. sync.releaseShared(1);
  3. }

AbstractQueuedSynchronizer#releaseShared

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

Semaphore.Sync#tryReleaseShared

  1. protected final boolean tryReleaseShared(int releases) {
  2. for (;;) {
  3. // 自旋
  4. // 获取同步状态的值
  5. int current = getState();
  6. // 可用的(许可)加上释放的,等于剩余的
  7. int next = current + releases;
  8. if (next < current) // overflow
  9. throw new Error("Maximum permit count exceeded");
  10. if (compareAndSetState(current, next))
  11. // CAS的方式设置同步状态
  12. return true;
  13. }
  14. }

可以看到此处依旧是自旋 + CAS的操作

AbstractQueuedSynchronizer#doReleaseShared

  1. /**
  2. * Release action for shared mode -- signals successor and ensures
  3. * propagation. (Note: For exclusive mode, release just amounts
  4. * to calling unparkSuccessor of head if it needs signal.)
  5. */
  6. private void doReleaseShared() {
  7. /*
  8. * Ensure that a release propagates, even if there are other
  9. * in-progress acquires/releases. This proceeds in the usual
  10. * way of trying to unparkSuccessor of head if it needs
  11. * signal. But if it does not, status is set to PROPAGATE to
  12. * ensure that upon release, propagation continues.
  13. * Additionally, we must loop in case a new node is added
  14. * while we are doing this. Also, unlike other uses of
  15. * unparkSuccessor, we need to know if CAS to reset status
  16. * fails, if so rechecking.
  17. */
  18. for (;;) {
  19. // 自旋
  20. // 记录头节点
  21. Node h = head;
  22. if (h != null && h != tail) {
  23. // 头节点不为null,且不等于尾结点,说明队列中还有节点
  24. // 获取头节点等待状态
  25. int ws = h.waitStatus;
  26. if (ws == Node.SIGNAL) {
  27. // 头节点等待状态是SIGNAL
  28. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  29. // 如果修改节点等待状态失败,进入下一次循环
  30. continue; // loop to recheck cases
  31. // 修改成功后,唤醒后继节点,unparkSuccessor前文讲过
  32. unparkSuccessor(h);
  33. }
  34. else if (ws == 0 &&
  35. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  36. continue; // loop on failed CAS
  37. }
  38. if (h == head) // loop if head changed
  39. break;
  40. }
  41. }

总结

AQS可以说是整个并发编程中最难的一个类。但是理解AQS的实现却非常重要,因为它是JDK中和其他同步工具实现的基础。