1、AQS基本概念

1.1 什么是AQS

AQS全称是AbstractQueuedSynchronizer,翻译过来就是抽象队列同步器,JUC下的一些类,比如Lock接口的实现类ReentrantLock、ReadWriteLock接口的实现类ReentrantReadWriteLock类都是基于AQS实现的,具体就是这些类的源码中加锁或者解锁的方法都是调用的AQS相关的方法实现的。
AbstractQueuedSynchronizer是JUC包下的抽象类,以ReentrantLock为例说明AQS和ReentrantLock之间的关系。ReentrantLock类中有一个抽象类Sync,该抽象类继承了AbstractQueuedSynchronizer,ReentrantLock类中还有两个实现类:NonfairSync和FairSync,这两个类都是继承了抽象类Sync,ReentrantLock类对外提供的lock和unlock等方法,底层都是调用抽象类Sync的两个实现类的实例方法完成的。如下图所示:
AQS和ReentrantLock的关系.png

1.2 AQS类中的重要属性

1.2.1 AQS类重要方法、内部类、属性

  1. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  2. protected AbstractQueuedSynchronizer() {}
  3. // 在队列中阻塞状态的线程会被包装成一个Node实例
  4. static final class Node {...}
  5. // 被抢占锁的状态,线程刚获取锁时,state从0变成1;线程释放锁时,state从1变成0;线程重入锁时,state值+1
  6. private volatile int state;
  7. // 将Node实例加入队列尾部
  8. private Node enq(final Node node) {...}
  9. // 唤醒队列中可以尝试获取锁的线程
  10. private void unparkSuccessor(Node node) {...}
  11. // 尝试释放独占锁,同上
  12. protected boolean tryRelease(int arg) {...}
  13. // 此方法是独占模式下线程获取共享资源的顶层入口,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止
  14. public final void acquire(int arg) {...}
  15. // 尝试获取独占锁,由继承了AQS的实现类去定义具体的获取锁的过程
  16. protected boolean tryAcquire(int arg) {...}
  17. // 节点入队进入等待阻塞状态,过程中自旋尝试获取锁
  18. final boolean acquireQueued(final Node node, int arg) {...}
  19. // 将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
  20. private Node addWaiter(Node mode) {...}
  21. }

此外,由于AQS类继承了AbstractOwnableSynchronizer抽象类,该类中有一个属性exclusiveOwnerThread,保存当前获得锁的线程。

1.2.2 Node类

Node类是AQS类中的一个内部类,尝试获取锁失败的线程会被放入到一个双向链表中,这些线程会被包装成Node类的实例,Node类源码简要如下:

  1. static final class Node {
  2. // 代表对锁的共享模式
  3. static final Node SHARED = new Node();
  4. // 代表对锁的独占模式
  5. static final Node EXCLUSIVE = null;
  6. // 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  7. static final int CANCELLED = 1;
  8. // 表示后继结点在等待前置节点唤醒,只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
  9. static final int SIGNAL = -1;
  10. // 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  11. static final int CONDITION = -2;
  12. // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  13. static final int PROPAGATE = -3;
  14. // 当前节点的等待状态,初始值为0
  15. volatile int waitStatus;
  16. // 双向链表的上一个节点
  17. volatile Node prev;
  18. // 双向链表的下一个节点
  19. volatile Node next;
  20. // 被包装的线程
  21. volatile Thread thread;
  22. // 表示当前节点对锁的模式
  23. Node nextWaiter;
  24. ...
  25. }

1.3 AQS整体过程(互斥锁)

以线程1先获取到锁,线程2后尝试获取锁失败为例:

  1. 未有线程获取锁时,state值为0;
  2. 线程1调用lock方法尝试获取锁,这个加锁的过程,是用CAS操作将state值从0变为1;
  3. 线程1加锁成功,将当前获取锁的线程(exclusiveOwnerThread)设置成自己;
  4. 此时线程2尝试获取锁,查看state是1,说明已经有线程获取到了锁,此时线程2会先检查是不是自己之前加的锁,如果是则直接重入锁;如果不是,则线程2获取锁失败,此时线程2会被包装成一个Node实例放入等待队列(双向链表实现);
  5. 线程1执行完同步代码后释放锁,将state值减1,如果state值为0,则代表线程1彻底释放掉锁(会有重入锁的情况);
  6. 从等待队列中唤醒线程2;
  7. 线程2重复2、3步骤获取锁。

如下图所示:
AQS整体流程-线程1加锁成功,线程2加锁失败 (2).png

2、AQS源码浅析

AQS定义两种资源共享方式:

  • Exclusive:独占模式,同一时刻只能有一个线程获取到锁,例如ReentrantLock实现的就是独占式的锁资源;
  • Share:共享模式,允许同一时刻有多个线程获取到锁,比如ReentrantWriteLock和CountDownLatch等就是实现的这种模式。

实现自定义同步器在继承AQS抽象类时,不需要实现线程如何在等待队列中的唤醒和等待操作,仅需实现共享资源的获取和释放接口即可,具体实现AQS抽象类中以下几个被protected访问符修饰的方法:

  1. // 该线程是否正在独占资源
  2. isHeldExclusively()
  3. // 独占方式。尝试获取资源,成功则返回true,失败则返回false。
  4. tryAcquire(int)
  5. // 独占方式。尝试释放资源,成功则返回true,失败则返回false。
  6. tryRelease(int)
  7. // 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  8. tryAcquireShared(int)
  9. // 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  10. tryReleaseShared(int)

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一组即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
在AQS类中维护线程阻塞和线程唤醒的逻辑或者方法中,会调用上面这些在AQS类中声明成protected的方法,这些protected方法需要在实现的同步器(同步器继承自AQS类)中进行实现,这样这些继承了AQS类的同步器里,线程阻塞和线程唤醒的方法中调用的protected的方法,就是同步器自身实现的逻辑。这样就把线程阻塞线程唤醒这些公共的逻辑抽成一个模板方法放在AQS抽象类里,具体如何定义线程获取锁成功与否,由具体继承了AQS的实现类区定义,模板方法中调用同步器自定义的逻辑,设计很巧妙。

2.1 独占模式加锁

2.1.1 acquire

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

此方法是独占模式下,如果线程通过CAS操作没有获取到锁对象时,会调用此方法,在ReentrantLock类中的NonfairSync内部类的lock方法的实现中可以找到此方法的调用,可以看到acquire方法中依次调用了以下4个方法,分别如下:

  • tryAcquire:尝试直接去获取资源,如果成功则acquire方法直接返回,不走if语句;
  • addWaiter:将当前线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued:使线程阻塞在等待队列中尝试获取资源,直到获取到资源才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;
  • selfInterrupt:底层实现是Thread类的Interrupt方法,使当前线程中断。

    2.1.2 tryAcquire

    1. protected boolean tryAcquire(int arg) {
    2. throw new UnsupportedOperationException();
    3. }

    该方法就是在第2节开头说过的,需要继承AQS抽象类的实现类去实现的方法,比如在ReentrantLock类中的NonfairSync内部类就实现了该方法。

    2.1.3 addWaiter

    1. private Node addWaiter(Node mode) {
    2. // 通过入参的锁模式来构造一个Node实例,用来加入到双向链表的尾部,可以看到将当前线程也作为了构造函数的入参
    3. Node node = new Node(Thread.currentThread(), mode);
    4. Node pred = tail;
    5. if (pred != null) {
    6. node.prev = pred;
    7. // compareAndSetTail方法是通过CAS机制将当前线程的Node实例放入到链表尾部,
    8. // 底层是调用unSafe类的实例方法compareAndSwapObject,该方法是一个native方法
    9. if (compareAndSetTail(pred, node)) {
    10. pred.next = node;
    11. return node;
    12. }
    13. }
    14. // 如果当前链表没有尾结点,会调用enq方法将node节点入队
    15. enq(node);
    16. return node;
    17. }

    该方法将当前线程加入等待队列的尾部,并标记为独占模式。

    2.1.4 enq

    1. private Node enq(final Node node) {
    2. // for循环是一个死循环,其实就是自旋操作的实现
    3. for (;;) {
    4. Node t = tail;
    5. if (t == null) {
    6. // 如果t为空,创建一个头结点,且头结点没有对应具体线程
    7. if (compareAndSetHead(new Node()))
    8. tail = head;
    9. } else {
    10. // 如果t非空,将node放入队列尾部,node节点的前置节点是t节点
    11. node.prev = t;
    12. if (compareAndSetTail(t, node)) {
    13. t.next = node;
    14. // 返回node节点的前置节点
    15. return t;
    16. }
    17. }
    18. }
    19. }

    此方法是当链表的尾结点tail为空时,通过此方法将当前阻塞线程入队(双向链表)。

    2.1.5 acquireQueued

    1. final boolean acquireQueued(final Node node, int arg) {
    2. // 是否获取资源的标记位,为false代表获取到共享资源(锁)
    3. boolean failed = true;
    4. try {
    5. // 标记等待过程中是否被中断过,如果被中断过则为true
    6. boolean interrupted = false;
    7. // for循环是一个死循环,模拟自旋操作尝试获取锁
    8. for (;;) {
    9. // 获取node节点的前驱节点(队列是一个双向链表)
    10. final Node p = node.predecessor();
    11. // 如果前驱节点是头结点(头结点不对应线程,此时意味着node节点就是第一个应被唤醒的线程节点)并且当前线程尝试获取锁成功
    12. if (p == head && tryAcquire(arg)) {
    13. // 拿到资源后,将head指向该结点,即将node节点的pre指针和next指针均置空,意味着node节点对应的线程获取到锁后要出队了
    14. setHead(node);
    15. // 方便GC回收以前的head结点
    16. p.next = null;
    17. // 表示成功获取到资源,对应的标志位置为false
    18. failed = false;
    19. return interrupted;
    20. }
    21. // 如果当前线程在自旋过程中获取锁失败,就进入等待阻塞状态,如果线程被中断,则将标记位interrupted置为true
    22. if (shouldParkAfterFailedAcquire(p, node) &&
    23. parkAndCheckInterrupt())
    24. interrupted = true;
    25. }
    26. } finally {
    27. // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
    28. if (failed)
    29. // 更新node节点的waitStatus为CANCELLED
    30. cancelAcquire(node);
    31. }
    32. }

    addWaiter方法是将node节点插入到双向链表尾部,而acquireQueued方法则维护了node节点在队列中的操作,比如不断地自旋尝试获取锁,何时唤醒队列中阻塞线程继续尝试获取锁。

    2.1.6 shouldParkAfterFailedAcquire

    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. // 获取当前node节点的前驱节点的waitStatus
    3. int ws = pred.waitStatus;
    4. if (ws == Node.SIGNAL)
    5. // 如果前驱节点的waitStatus为-1,代表当前节点就等待前驱节点的线程释放掉锁后,就会唤醒当前node节点对应的线程,直接返回true等就可以了
    6. return true;
    7. if (ws > 0) {
    8. // 如果前驱节点的waitStatus为CANCELLED,那就顺着双向链表一直往前找,直到找到一个正常等待的状态的节点,并将node节点排在它的后边
    9. do {
    10. node.prev = pred = pred.prev;
    11. } while (pred.waitStatus > 0);
    12. pred.next = node;
    13. } else {
    14. // 如果顺着链表往前找的过程中,前驱节点状态不是CANCELLED,就通过CAS把前驱节点的状态设置成SIGNAL
    15. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    16. }
    17. return false;
    18. }

    正如方法名所示,该方法是在自旋过程中如果线程获取锁失败,会根据当前节点的前驱节点的waitStatus,来将当前节点设置为可能下一个就是唤醒他的状态。

    2.1.7 parkAndCheckInterrupt

    1. private final boolean parkAndCheckInterrupt() {
    2. // 调用unsafe类中的park方法(native方法)使当前线程进入阻塞等待状态
    3. LockSupport.park(this);
    4. // 返回当前线程是否被中断
    5. return Thread.interrupted();
    6. }

    正如方法名所示,该方法将当前线程进入阻塞等待状态,并返回当前线程是否被中断的结果。
    至此独占模式的加锁源码解析完毕。

    2.2 独占模式解锁

    2.2.1 release

    1. public final boolean release(int arg) {
    2. // 如果尝试释放资源成功,返回true
    3. if (tryRelease(arg)) {
    4. Node h = head;
    5. if (h != null && h.waitStatus != 0)
    6. // 当前线程释放掉了锁,唤醒等待队列中的下一个可以尝试获取锁的线程。
    7. unparkSuccessor(h);
    8. return true;
    9. }
    10. // 如果尝试释放资源失败,返回false
    11. return false;
    12. }

    ReentrantLock类的unlock解锁方法,底层就是AQS中的release方法实现。

    2.2.2 tryRelease

    1. protected boolean tryRelease(int arg) {
    2. throw new UnsupportedOperationException();
    3. }

    该方法尝试释放资源,也是由继承AQS抽象类的实现类去实现的,在ReentrantLock就有具体实现。

    2.2.3 unparkSuccessor

    1. private void unparkSuccessor(Node node) {
    2. /*
    3. * If status is negative (i.e., possibly needing signal) try
    4. * to clear in anticipation of signalling. It is OK if this
    5. * fails or if status is changed by waiting thread.
    6. */
    7. int ws = node.waitStatus;
    8. if (ws < 0)
    9. compareAndSetWaitStatus(node, ws, 0);
    10. /*
    11. * Thread to unpark is held in successor, which is normally
    12. * just the next node. But if cancelled or apparently null,
    13. * traverse backwards from tail to find the actual
    14. * non-cancelled successor.
    15. */
    16. Node s = node.next;
    17. if (s == null || s.waitStatus > 0) {
    18. s = null;
    19. for (Node t = tail; t != null && t != node; t = t.prev)
    20. if (t.waitStatus <= 0)
    21. s = t;
    22. }
    23. if (s != null)
    24. LockSupport.unpark(s.thread);
    25. }

    此方法用于唤醒等待队列中下一个线程。

    2.3 共享模式加锁

    2.3.1 acquireShared

    public final void acquireShared(int arg) {
          if (tryAcquireShared(arg) < 0)
              doAcquireShared(arg);
      }
    

    该法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。这里tryAcquireShared()依然需要自定义同步器去实现,但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源才返回。

    2.3.2 tryAcquireShared

    protected int tryAcquireShared(int arg) {
         throw new UnsupportedOperationException();
     }
    
    该方法也是由继承AQS的实现类实现,比如CountDownLatch类中就有该方法的实现。

    2.3.3 doAcquireShared

    private void doAcquireShared(int arg) {
         final Node node = addWaiter(Node.SHARED);
         boolean failed = true;
         try {
             boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head) {
                     int r = tryAcquireShared(arg);
                     if (r >= 0) {
                         setHeadAndPropagate(node, r);
                         p.next = null; // help GC
                         if (interrupted)
                             selfInterrupt();
                         failed = false;
                         return;
                     }
                 }
                 if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                     interrupted = true;
             }
         } finally {
             if (failed)
                 cancelAcquire(node);
         }
     }
    

    2.4 共享模式解锁

    2.4.1 releaseShared

    2.4.2 doReleaseShared

    3、总结

    如果我来大致介绍AQS,我会介绍以下几点:
  • AQS是一个同步器的抽象类,JUC包下的类(比如ReentrantLock),内部类就是继承自AQS;我们如果想基于AQS的框架去实现自定义的同步器,可以令同步器继承AQS抽象类,并重点实现AQS抽象类中的protected方法;
  • AQS内部维护了竞争锁失败的线程在阻塞队列中的维护及唤醒的逻辑,这部分逻辑抽象成模板方法放在在AQS抽象类中,在这些公共逻辑里也涉及protected方法的调用,子类同步器中的模板方法调用的也是子类同步器覆写的这些protected方法。通过这种方式将公共的模板方法抽象在AQS类,将如何判定获取锁成功还是失败的逻辑在子类的protected方法中实现,实现解耦;
  • AQS维护的阻塞线程入队和唤醒的模型见1.3小节;
  • 需要具体的同步器实现的protected方法见第二节开头。

    参考

    图文并茂详解AQS加锁
    Java 并发高频面试题:聊聊你对 AQS 的理解?
    Java并发之AQS详解