这篇文章来看下AQS的源码,是前面很多同步工具类的一个核心,都是基于AQS实现的。 本文基于 JDK 1.8 (JDK 1.9 中会有个 addWaiter 里的 VarHandle 改进)

简介

AQS,全名 AbstractQueuedSynchronizer ,AQS队列又称CLH队列,其核心在于一个属性 state 和 一个双向队列。Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于 AbstractQueuedSynchronizer 实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

image.png

从ReentrantLock看AQS

还记得我们提到过的新同步工具类 ReentrantLock 吗,它以其可重入锁以及其它附带的强大的功能,如 tryLock() 和 lockInterruptibly(),公平锁等出现在我们眼前。我可能也提到过 ReentrantLock 的加锁实现原理是 CAS,那你知道它具体是怎么实现的吗?

  1. public int put(){
  2. try {
  3. lock.lock();
  4. //判断如果满了就不再生产
  5. while (count == length) {
  6. producer.await();
  7. }
  8. count++;
  9. //唤醒消费者
  10. consumer.signalAll();
  11. }catch (InterruptedException e){
  12. e.printStackTrace();
  13. }finally {
  14. lock.unlock();
  15. }
  16. return count;
  17. }

先上一段代码,上面这个是生产者消费者例子模拟的put方法,这里使用了 ReentrantLock 实现同步。

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. public void lock() {
  4. sync.lock();
  5. }
  6. //......
  7. }

当程序执行到 lock.lock() 时会跑入 ReentrantLock 的 lock() 方法,这个方法调用了 sync.lock()。Reentrant 内有一个 Sync 类常量,使用的就是这个类的 lock() 方法。

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. abstract static class Sync extends AbstractQueuedSynchronizer {
  3. private static final long serialVersionUID = -5179523762034025860L;
  4. /**
  5. * Performs {@link Lock#lock}. The main reason for subclassing
  6. * is to allow fast path for nonfair version.
  7. */
  8. abstract void lock();
  9. //......
  10. }
  11. /**
  12. * Sync object for non-fair locks
  13. */
  14. static final class NonfairSync extends Sync {
  15. private static final long serialVersionUID = 7316153563782823691L;
  16. /**
  17. * Performs lock. Try immediate barge, backing up to normal
  18. * acquire on failure.
  19. */
  20. final void lock() {
  21. //使用CAS操作对state进行赋值为1,如果成功,则获得该锁,当前线程获得执行权
  22. if (compareAndSetState(0, 1))
  23. setExclusiveOwnerThread(Thread.currentThread());
  24. else
  25. //CAS操作失败,则调用acquire()方法
  26. acquire(1);
  27. }
  28. protected final boolean tryAcquire(int acquires) {
  29. return nonfairTryAcquire(acquires);
  30. }
  31. }
  32. /**
  33. * Sync object for fair locks
  34. */
  35. static final class FairSync extends Sync {
  36. private static final long serialVersionUID = -3000897897090466540L;
  37. final void lock() {
  38. //直接调用acquire()方法
  39. acquire(1);
  40. }
  41. /**
  42. * Fair version of tryAcquire. Don't grant access unless
  43. * recursive call or no waiters or is first.
  44. */
  45. protected final boolean tryAcquire(int acquires) {
  46. final Thread current = Thread.currentThread();
  47. int c = getState();
  48. if (c == 0) {
  49. if (!hasQueuedPredecessors() &&
  50. compareAndSetState(0, acquires)) {
  51. setExclusiveOwnerThread(current);
  52. return true;
  53. }
  54. }
  55. else if (current == getExclusiveOwnerThread()) {
  56. int nextc = c + acquires;
  57. if (nextc < 0)
  58. throw new Error("Maximum lock count exceeded");
  59. setState(nextc);
  60. return true;
  61. }
  62. return false;
  63. }
  64. }
  65. //......
  66. }

这个 Sync 类是一个内部类,lock() 方法是个抽象方法,同时 ReentrantLock 提供了两个该类的子类 NonfairSync 和 FairSync 实现了该方法,即不公平锁和公平锁。

我们前面也提到过,ReentrantLock 的默认实现是不公平锁,我们可以从 ReentrantLock 的构造器看出来当 fair 为 true 时会构造 FairSync 给到我们那个 Sync 属性,否则就是 NonfairSync 。

先看默认实现的不公平锁 NonfairSync,可以看到,其实现的 lock() 方法里加锁的操作是 compareAndSetState(0, 1),即我们熟悉的 CAS 操作。这里是对 AQS 内部的一个属性 state 进行赋值1操作,如果成功了,即 state 从0变成1,即当前线程获得锁成功,然后执行 setExclusiveOwnerThread(Thread.``_currentThread()_``) 方法占据执行权。如果 CAS 操作失败,则该锁已经被其他线程获取了,就执行 acquire(1) 方法。
对于公平锁来说,则会直接执行 acquire(1) 方法

  1. //AQS
  2. public final void acquire(int arg) {
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. selfInterrupt();
  6. }
  7. //NonfairSync
  8. protected final boolean tryAcquire(int acquires) {
  9. return nonfairTryAcquire(acquires);
  10. }
  11. //NonfairSync
  12. final boolean nonfairTryAcquire(int acquires) {
  13. final Thread current = Thread.currentThread();
  14. int c = getState();
  15. //判断 state 是否为0,即是否没被获得锁
  16. if (c == 0) {
  17. //进行CAS操作
  18. if (compareAndSetState(0, acquires)) {
  19. //成功,设置锁的Owner
  20. setExclusiveOwnerThread(current);
  21. return true;
  22. }
  23. }//state 不为0,已被占用,判断是不是自己的锁
  24. else if (current == getExclusiveOwnerThread()) {
  25. //如果是,重入
  26. int nextc = c + acquires;
  27. if (nextc < 0) // overflow
  28. throw new Error("Maximum lock count exceeded");
  29. setState(nextc);
  30. return true;
  31. }
  32. return false;
  33. }

当 CAS 操作失败时,即调用父父类 AQS 的 acquire() 方法,该方法里又执行了 tryAcquire() 方法,该方法被 NonfairSync 实现了,其调用了该类的 nonfairTryAcquire() 。该方法里会再次进行一次 CAS 操作,如果 state 以被占用了,则判断是不是自己的,如果是就重入,不是就获取失败。
实际上,如果第一次获取锁失败,会直接跳到 acquireQueued() 方法,因为 tryAcquire() 方法实际上是对拿到锁之后的操作。

  1. /**
  2. * acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  3. * static final Node EXCLUSIVE = null;
  4. * AQS
  5. */
  6. private Node addWaiter(Node mode) {
  7. Node node = new Node(Thread.currentThread(), mode);
  8. // Try the fast path of enq; backup to full enq on failure
  9. //这里实际上是把当前线程保存到Node中,然后把Node加入到AQS的队尾
  10. Node pred = tail;
  11. if (pred != null) {
  12. node.prev = pred;
  13. //仍然使用的CAS操作添加
  14. if (compareAndSetTail(pred, node)) {
  15. pred.next = node;
  16. return node;
  17. }
  18. }
  19. enq(node);
  20. return node;
  21. }
  22. /**
  23. * 该方法也是将结点插入到队尾中
  24. */
  25. private Node enq(final Node node) {
  26. for (;;) {
  27. Node t = tail;
  28. if (t == null) { // Must initialize
  29. if (compareAndSetHead(new Node()))
  30. tail = head;
  31. } else {
  32. node.prev = t;
  33. if (compareAndSetTail(t, node)) {
  34. t.next = node;
  35. return t;
  36. }
  37. }
  38. }
  39. }
  40. //AQS
  41. final boolean acquireQueued(final Node node, int arg) {
  42. boolean failed = true;
  43. try {
  44. boolean interrupted = false;
  45. for (;;) {
  46. //获取该结点前一个结点
  47. final Node p = node.predecessor();
  48. //如果是头结点,进行获得锁的尝试
  49. if (p == head && tryAcquire(arg)) {
  50. setHead(node);
  51. p.next = null; // help GC
  52. failed = false;
  53. return interrupted;
  54. }
  55. //如果不是头结点,检查和更新结点的状态
  56. if (shouldParkAfterFailedAcquire(p, node) &&
  57. parkAndCheckInterrupt())
  58. interrupted = true;
  59. }
  60. } finally {
  61. if (failed)
  62. cancelAcquire(node);
  63. }
  64. }

上面代码的流程是,调用 addWaiter() 将当前结点以排他锁的形式添加进队列中,然后循环执行检查队列中的结点的状态,并且给对头尝试 CAS 获得锁。这是一个死循环,只有当当前结点获得锁了才会退出。

到了这里,我们大概知道了 ReentrantLock 的一些实现原理了,也看到了我们的主角 AQS 的身影。最主要的就是 Sync 类继承自 AbstractQueuedSynchronizer 类,而所有主要的线程竞争等核心方法都在该类里面。还有就是前面提到的一个 state 属性,在各种子类中,这个 state 被用于各种不同的用途,而之前说的各种同步工具也都是自己实现了一套关于 state 的操作。

AQS

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

重要属性

  1. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
  2. implements java.io.Serializable {
  3. static final class Node {
  4. //共享锁
  5. static final Node SHARED = new Node();
  6. //排他锁
  7. static final Node EXCLUSIVE = null;
  8. //表示线程获取锁的请求已经取消了
  9. static final int CANCELLED = 1;
  10. //表示线程已经准备好了,就等资源释放了
  11. static final int SIGNAL = -1;
  12. //表示节点在等待队列中,节点线程等待唤醒
  13. static final int CONDITION = -2;
  14. //当前线程处在SHARED情况下,该字段才会使用
  15. static final int PROPAGATE = -3;
  16. //当前线程的状态,取上面的那些值
  17. volatile int waitStatus;
  18. //前一个结点的引用
  19. volatile Node prev;
  20. //后一个结点的引用
  21. volatile Node next;
  22. //线程
  23. volatile Thread thread;
  24. //指向下一个处于CONDITION状态的节点
  25. Node nextWaiter;
  26. //......
  27. }
  28. //对头
  29. private transient volatile Node head;
  30. //队尾
  31. private transient volatile Node tail;
  32. private volatile int state;
  33. //......
  34. }

AQS - 图2

这个图也可以看出 AQS 的一个数据结构,其实现的是一个双向队列,Node 就是上面的每个结点,结点内保存的是线程 thread。
此外,AQS 维护了一个 volatile 修饰的 int 类型成员变量 state,用于表示同步状态,内部的对线程的加锁等判断都是通过 CAS 操作完成对 state 值的修改。

State

AQS 中的 state 代表的是同步状态,但具体的含义在不同的子类中按照不同的实现赋予了不同的用途,譬如 ReentrantLock 中,state 为0时表示锁空闲,为1时表示锁占用,2、3、4等值时表示锁重入的次数。

  1. protected final int getState() {
  2. return state;
  3. }
  4. protected final void setState(int newState) {
  5. state = newState;
  6. }
  7. protected final boolean compareAndSetState(int expect, int update) {
  8. // See below for intrinsics setup to support this
  9. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  10. }

关于 state 的一些方法都很简单,主要的就是这个 compareAndSetState() 方法,使用了 CAS 的操作来给 state 赋值。
此外这些方法都是final的,无法重写它们。

获取资源和释放资源

  1. /**
  2. * 排他锁模式尝试获得锁方法 arg为操作次数
  3. */
  4. protected boolean tryAcquire(int arg) {
  5. throw new UnsupportedOperationException();
  6. }
  7. /**
  8. * 排他锁模式尝试释放锁方法 arg为操作次数
  9. */
  10. protected boolean tryRelease(int arg) {
  11. throw new UnsupportedOperationException();
  12. }
  13. /**
  14. * 共享锁模式尝试获得锁方法 arg为操作次数
  15. */
  16. protected int tryAcquireShared(int arg) {
  17. throw new UnsupportedOperationException();
  18. }
  19. /**
  20. * 共享锁模式尝试释放锁方法 arg为操作次数
  21. */
  22. protected boolean tryReleaseShared(int arg) {
  23. throw new UnsupportedOperationException();
  24. }
  25. /**
  26. * 该线程是否正在独占资源。只有用到Condition才需要去实现它。
  27. */
  28. protected boolean isHeldExclusively() {
  29. throw new UnsupportedOperationException();
  30. }

上面这些方法都是钩子方法,提供对自定义同步器实现的方法。自定义同步器的方法可以通过实现相关的方法来修改 state 字段来实现自己的多线程模式。

  1. /**
  2. * 排他锁获得锁,忽略中断
  3. */
  4. public final void acquire(int arg) {}
  5. /**
  6. * 排他锁获得锁,中断即中止
  7. */
  8. public final void acquireInterruptibly(int arg) {}
  9. /**
  10. * 排他锁尝试获得锁,中断即中止,如果给定超时超时即失败
  11. */
  12. public final boolean tryAcquireNanos(int arg, long nanosTimeout) {}
  13. /**
  14. * 排他锁释放锁
  15. */
  16. public final boolean release(int arg) {}
  17. /**
  18. * 共享锁获得锁,忽略中断
  19. */
  20. public final void acquireShared(int arg) {}
  21. /**
  22. * 共享锁获得锁,中断即中止
  23. */
  24. public final void acquireSharedInterruptibly(int arg) {}
  25. /**
  26. * 共享锁尝试获得锁,中断即中止,如果给定超时超时即失败
  27. */
  28. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) {}
  29. /**
  30. * 共享释放锁
  31. */
  32. public final boolean releaseShared(int arg) {}

上面是一些和获取锁资源以及释放锁资源相关的主要API,也是对外开放操作的主要API。这部分方法也是不可重写的,保证了 AQS 的正常运作。

AQS里的设计模式

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }
  6. protected boolean tryAcquire(int arg) {
  7. throw new UnsupportedOperationException();
  8. }

我们回头来看看这两个方法,上面 ReentrantLock 里我们在不公平锁和公平锁的 lock() 方法里都有发现这个 acquirFe() 方法,而这个方法来自父类 AQS,但是其调用的 tryAcquire() 方法却只是抛出了个 UnsupportedOperationException异常,是不是很奇怪?以及上面提到的多个需要实现的钩子方法都是抛出异常,钩子方法又是什么呢?

但其实你可以发现在我们的子类 NonfairSync和 FairSync里其实重写了这个 tryAcquire() 方法。在 AQS 中还有很多譬如 tryRelease() 、 isHeldExclusively() 等方法都是直接抛出个异常的,这种抛出个异常其实是说明不支持直接使用 AQS 的这些方法,需要重写。而这种做法就是一种设计模式,叫做模板方法模式。在抽象父类中定义了一个模板方法,譬如这里的 acquire() ,然后定义一些钩子方法,这些方法可以是抽象方法,也可以是有实现的方法。子类想要使用这个模板方法的时候,必须重写那些钩子方法,来实现自己的个性化的功能逻辑。关于模板方法模式在这里不多说。

AQS性能

AQS 在上面 ReentrantLock 里也分析了加锁的一整个流程是怎么样的,不难发现,AQS 加锁全都是围绕双向队列和state属性来操作的,而且效率很高。为什么呢?你可以从上面的代码中看到,AQS 其实使用了大量的 CAS 操作,其核心都是围绕 CAS 操作来运作的,主要在于两个方面:

  1. 对 state 的属性进行操作时,使用了 CAS 的方法进行判断锁的状态以及尝试获得锁,比起去操作系统申请锁效率更高;
  2. 对内部的双向队列进行维护的时候,不是监控整个队列,而是直接通过对比队尾这一个结点,仍然采用 CAS 操作,对队尾进行设置,省去了监控整个队列线程安全时需要消耗的加锁性能。

小结

Q:某个线程获取锁失败的后续流程是什么呢?
A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
A:是CLH变体的FIFO双端队列。

Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
A:当该结点是头结点且未被阻塞的时候

Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?
A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放

Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?
A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。

从ReentrantLock的实现看AQS的原理及应用