Java AQS

内容大纲

AQS - 图1

基础

AbstractQueuedSynchronizer抽象同步队列简称A Q S,它是实现同步器的基础组件,如常用的ReentrantLockSemaphoreCountDownLatch等。
A Q S定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作,虽然大多数开发者可能永远不会使用A Q S实现自己的同步器(J U C包下提供的同步器基本足够应对日常开发),但是知道A Q S的原理对于架构设计还是很有帮助的,面试还可以吹吹牛,下面是A Q S的组成结构。
AQS - 图2
三部分组成,state同步状态、Node组成的CLH队列、ConditionObject条件变量(包含Node组成的条件单向队列),下面会分别对这三部分做介绍。
先贴下AbstractQueuedSynchronizer提供的核心函数,混个脸熟就够了,后面会讲解
状态

  • getState():返回同步状态
  • setState(int newState):设置同步状态
  • compareAndSetState(int expect, int update):使用C A S设置同步状态
  • isHeldExclusively():当前线程是否持有资源

独占资源(不响应线程中断)

  • tryAcquire(int arg):独占式获取资源,子类实现
  • acquire(int arg):独占式获取资源模板
  • tryRelease(int arg):独占式释放资源,子类实现
  • release(int arg):独占式释放资源模板

共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
  • acquireShared(int arg):共享式获取资源模板
  • tryReleaseShared(int arg):共享式释放资源,子类实现
  • releaseShared(int arg):共享式释放资源模板

这里补充下,获取独占、共享资源操作还提供超时与响应中断的扩展函数,有兴趣的读者可以去AbstractQueuedSynchronizer源码了解。

同步状态

在A Q S中维护了一个同步状态变量state,getState函数获取同步状态,setStatecompareAndSetState函数修改同步状态,对于A Q S来说,线程同步的关键是对state的操作,可以说获取、释放资源是否成功都是由state决定的,比如state>0代表可获取资源,否则无法获取,所以state的具体语义由实现者去定义,现有的ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch定义的state语义都不一样。

  • **ReentrantLock**的state用来表示是否有锁资源
  • **ReentrantReadWriteLock**的state高16位代表读锁状态,低16位代表写锁状态
  • **Semaphore**的state用来表示可用信号的个数
  • **CountDownLatch**的state用来表示计数器的值

    CLH队列

    CLH是A Q S内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构,当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过C A S原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,所以说A Q S通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:

  • 先进先出保证了公平性

  • 非阻塞的队列,通过自旋锁和C A S保证节点插入和移除的原子性,实现无锁快速插入
  • 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁

    Node内部类

    Node是A Q S的内部类,每个等待资源的线程都会封装成Node节点组成C L H队列、等待队列,所以说Node是非常重要的部分,理解它是理解A Q S的第一步。
    AQS - 图3
    列Node类中的变量都很好理解,只有waitStatusnextWaiter没有细说,下面做个补充说明
    waitStatus等待状态如下
    AQS - 图4
    nextWaiter特殊标记

  • Node在CLH队列时,**nextWaiter**表示共享式或独占式标记

  • Node在条件队列时,**nextWaiter**表示下个Node节点指针

    流程概述

    线程获取资源失败,封装成Node节点从C L H队列尾部入队并阻塞线程,某线程释放资源时会把C L H队列首部Node节点关联的线程唤醒(此处的首部是指第二个节点,后面会细说),再次获取资源。
    AQS - 图5

    入队

    获取资源失败的线程需要封装成Node节点,接着尾部入队,在A Q S中提供addWaiter函数完成Node节点的创建与入队。
    1. /**
    2. * @description: Node节点入队-CLH队列
    3. * @param mode 标记 Node.EXCLUSIVE独占式 or Node.SHARED共享式
    4. */
    5. private Node addWaiter(Node mode) {
    6. //根据当前线程创建节点,等待状态为0
    7. Node node = new Node(Thread.currentThread(), mode);
    8. // 获取尾节点
    9. Node pred = tail;
    10. if (pred != null) {
    11. //如果尾节点不等于null,把当前节点的前驱节点指向尾节点
    12. node.prev = pred;
    13. //通过cas把尾节点指向当前节点
    14. if (compareAndSetTail(pred, node)) {
    15. //之前尾节点的下个节点指向当前节点
    16. pred.next = node;
    17. return node;
    18. }
    19. }
    20. //如果添加失败或队列不存在,执行end函数
    21. enq(node);
    22. return node;
    23. }
    添加节点的时候,如果从C L H队列已经存在,通过C A S快速将当前节点添加到队列尾部,如果添加失败或队列不存在,则指向enq函数自旋入队。
    1. /**
    2. * @description: 自旋cas入队
    3. * @param node 节点
    4. */
    5. private Node enq(final Node node) {
    6. for (;;) { //循环
    7. //获取尾节点
    8. Node t = tail;
    9. if (t == null) {
    10. //如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
    11. if (compareAndSetHead(new Node()))
    12. //cas成功,尾节点指向哨兵节点
    13. tail = head;
    14. } else {
    15. //当前节点的前驱节点设指向之前尾节点
    16. node.prev = t;
    17. //cas设置把尾节点指向当前节点
    18. if (compareAndSetTail(t, node)) {
    19. //cas成功,之前尾节点的下个节点指向当前节点
    20. t.next = node;
    21. return t;
    22. }
    23. }
    24. }
    25. }
    通过自旋C A S尝试往队列尾部插入节点,直到成功,自旋过程如果发现C L H队列不存在时会初始化C L H队列,入队过程流程如下图
    AQS - 图6
    第一次循环
  1. 刚开始C L H队列不存在,head与tail都指向null
  2. 要初始化C L H队列,会创建一个哨兵节点,head与tail都指向哨兵节点

第二次循环

  1. 当前线程节点的前驱节点指向尾部节点(哨兵节点)
  2. 设置当前线程节点为尾部,tail指向当前线程节点
  3. 前尾部节点的后驱节点指向当前线程节点(当前尾部节点)

最后结合addWaiter与enq函数的入队流程图如下
AQS - 图7

出队

C L H队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将head.next指向的线程节点唤醒(C L H队列的第二个节点),如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(新哨兵节点),原头部节点出队(原哨兵节点
acquireQueued函数中的部分代码

  1. //1.获取前驱节点
  2. final Node p = node.predecessor();
  3. //如果前驱节点是首节点,获取资源(子类实现)
  4. if (p == head && tryAcquire(arg)) {
  5. //2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
  6. setHead(node);
  7. //3.原来首节点下个节点指向为null
  8. p.next = null; // help GC
  9. //4.非异常状态,防止指向finally逻辑
  10. failed = false;
  11. //5.返回线程中断状态
  12. return interrupted;
  13. }
  14. private void setHead(Node node) {
  15. //节点设置为头部
  16. head = node;
  17. //清空线程
  18. node.thread = null;
  19. //清空前驱节点
  20. node.prev = null;
  21. }

只需要关注1~3步骤即可,过程非常简单,假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用C A S来保证,因为只有一个线程能够成功获取到资源。
AQS - 图8

条件变量

Objectwaitnotify函数是配合Synchronized锁实现线程间同步协作的功能,A Q S的ConditionObject条件变量也提供这样的功能,通过ConditionObjectawaitsignal两类函数完成。
不同于Synchronized锁,一个A Q S可以对应多个条件变量,而Synchronized只有一个。
AQS - 图9
如上图所示,ConditionObject内部维护着一个单向条件队列,不同于C H L队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在C H L队列, 条件队列出队的节点,会入队到C H L队列。
当某个线程执行了ConditionObjectawait函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObjectsignal函数,会将条件队列头部线程节点转移到C H L队列参与竞争资源,具体流程如下图
AQS - 图10
最后补充下,条件队列Node类是使用nextWaiter变量指向下个节点,并且因为是单向队列,所以prevnext变量都是null

进阶

A Q S采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下

  • 独占式
    • **acquire**获取资源
    • **release**释放资源
  • 共享式

    • **acquireShared**获取资源
    • **releaseShared**释放资源

      独占式获取资源

      acquire是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响,acquire函数代码如下
      AQS - 图11
  • 执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功,如果资源获取失败,执行下面的逻辑

  • 执行addWaiter函数(前面已经介绍过),根据当前线程创建出独占式节点,并入队CLH队列
  • 执行acquireQueued函数,自旋阻塞等待获取资源
  • 如果acquireQueued函数中获取资源成功,根据线程是否被中断状态,来决定执行线程中断逻辑

AQS - 图12
acquire函数的大致流程都清楚了,下面来分析下acquireQueued函数,线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下

  1. /**
  2. * @description: 自旋机制等待获取资源
  3. * @param node
  4. * @param arg
  5. * @return: boolean
  6. */
  7. final boolean acquireQueued(final Node node, int arg) {
  8. //异常状态,默认是
  9. boolean failed = true;
  10. try {
  11. //该线程是否中断过,默认否
  12. boolean interrupted = false;
  13. for (;;) {//自旋
  14. //获取前驱节点
  15. final Node p = node.predecessor();
  16. //如果前驱节点是首节点,获取资源(子类实现)
  17. if (p == head && tryAcquire(arg)) {
  18. //获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
  19. setHead(node);
  20. //原来首节点下个节点指向为null
  21. p.next = null; // help GC
  22. //非异常状态,防止指向finally逻辑
  23. failed = false;
  24. //返回线程中断状态
  25. return interrupted;
  26. }
  27. /**
  28. * 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
  29. * 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
  30. * 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
  31. * 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
  32. */
  33. if (shouldParkAfterFailedAcquire(p, node) &&
  34. //使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
  35. parkAndCheckInterrupt())
  36. //该线程被中断过
  37. interrupted = true;
  38. }
  39. } finally {
  40. // 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
  41. if (failed)
  42. cancelAcquire(node);
  43. }
  44. }

一图胜千言,核心流程图如下
AQS - 图13

独占式释放资源

有获取资源,自然就少不了释放资源,A Q S中提供了release模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH队列的第二个线程节点(首节点的下个节点),代码如下

  1. /**
  2. * @description: 独占式-释放资源模板函数
  3. * @param arg
  4. * @return: boolean
  5. */
  6. public final boolean release(int arg) {
  7. if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
  8. //获取头部线程节点
  9. Node h = head;
  10. if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
  11. //唤醒CHL队列第二个线程节点
  12. unparkSuccessor(h);
  13. return true;
  14. }
  15. return false;
  16. }
  17. private void unparkSuccessor(Node node) {
  18. //获取节点等待状态
  19. int ws = node.waitStatus;
  20. if (ws < 0)
  21. //cas更新节点状态为0
  22. compareAndSetWaitStatus(node, ws, 0);
  23. //获取下个线程节点
  24. Node s = node.next;
  25. if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
  26. s = null;
  27. for (Node t = tail; t != null && t != node; t = t.prev)
  28. if (t.waitStatus <= 0)
  29. s = t;
  30. }
  31. if (s != null)
  32. //唤醒线程节点
  33. LockSupport.unpark(s.thread);
  34. }
  35. }

release逻辑非常简单,流程图如下
AQS - 图14

共享式获取资源

acquireShared是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响,acquireShared函数代码如下

  1. /**
  2. * @description: 共享式-获取资源模板函数
  3. * @param arg
  4. * @return: void
  5. */
  6. public final void acquireShared(int arg) {
  7. /**
  8. * 1.负数表示失败
  9. * 2.0表示成功,但没有剩余可用资源
  10. * 3.正数表示成功且有剩余资源
  11. */
  12. if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
  13. //自旋阻塞等待获取资源
  14. doAcquireShared(arg);
  15. }

doAcquireShared函数与独占式的acquireQueued函数逻辑基本一致,唯一的区别就是下图红框部分
AQS - 图15

  • 节点的标记是共享式
  • 获取资源成功,还会唤醒后续资源,因为资源数可能>0,代表还有资源可获取,所以需要做后续线程节点的唤醒

    共享式释放资源

    A Q S中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点),代码如下 ```java /**
    • @description: 共享式-释放资源模板函数
    • @param arg
    • @return: boolean */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
      1. //唤醒后继节点
      2. doReleaseShared();
      3. return true;
      } return false; }

private void doReleaseShared() { for (;;) { //获取头节点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus;

  1. if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
  2. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
  3. continue; // loop to recheck cases
  4. //唤醒头节点下个线程节点
  5. unparkSuccessor(h);
  6. }
  7. //如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
  8. else if (ws == 0 &&
  9. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  10. continue;
  11. }
  12. if (h == head)
  13. break;
  14. }

}

  1. 与独占式释放资源区别不大,都是唤醒头节点的下个节点,就不做过多描述了。
  2. <a name="CHcXr"></a>
  3. ## 实战
  4. 说了这么多理论,现在到实战环节了,正如前文所述,A Q S定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作,现在基于A Q S实现一个不可重入的独占锁,直接使用A Q S提供的独占式模板,只需明确`state`的语义与实现`tryAcquire``tryRelease`函数(**获取资源与释放资源**),在这里state0表示锁没有被线程持有,state1表示锁已经被某个线程持有,由于是不可重入锁,所以不需要记录持有锁线程的获取锁次数。<br />不可重入的独占锁代码如下
  5. ```java
  6. /**
  7. * @Description 不可重入的独占锁
  8. */
  9. public class NonReentrantLock implements Lock {
  10. /**
  11. * @Author 程序猿阿星
  12. * @Description 自定义同步器
  13. */
  14. private static class Sync extends AbstractQueuedSynchronizer {
  15. /**
  16. * 锁是否被线程持有
  17. */
  18. @Override
  19. protected boolean isHeldExclusively() {
  20. //0:未持有 1:已持有
  21. return super.getState() == 1;
  22. }
  23. /**
  24. * 获取锁
  25. */
  26. @Override
  27. protected boolean tryAcquire(int arg) {
  28. if (arg != 1) {
  29. //获取锁操作,是需要把state更新为1,所以arg必须是1
  30. throw new RuntimeException("arg not is 1");
  31. }
  32. if (compareAndSetState(0, arg)) {//cas 更新state为1成功,代表获取锁成功
  33. //设置持有锁线程
  34. setExclusiveOwnerThread(Thread.currentThread());
  35. return true;
  36. }
  37. return false;
  38. }
  39. /**
  40. * 释放锁
  41. */
  42. @Override
  43. protected boolean tryRelease(int arg) {
  44. if (arg != 0) {
  45. //释放锁操作,是需要把state更新为0,所以arg必须是0
  46. throw new RuntimeException("arg not is 0");
  47. }
  48. //清空持有锁线程
  49. setExclusiveOwnerThread(null);
  50. //设置state状态为0,此处不用cas,因为只有获取锁成功的线程才会执行该函数,不需要考虑线程安全问题
  51. setState(arg);
  52. return true;
  53. }
  54. /**
  55. * 提供创建条件变量入口
  56. */
  57. public ConditionObject createConditionObject() {
  58. return new ConditionObject();
  59. }
  60. }
  61. private final Sync sync = new Sync();
  62. /**
  63. * 获取锁
  64. */
  65. @Override
  66. public void lock() {
  67. //Aqs独占式-获取资源模板函数
  68. sync.acquire(1);
  69. }
  70. /**
  71. * 获取锁-响应中断
  72. */
  73. @Override
  74. public void lockInterruptibly() throws InterruptedException {
  75. //Aqs独占式-获取资源模板函数(响应线程中断)
  76. sync.acquireInterruptibly(1);
  77. }
  78. /**
  79. * 获取锁是否成功-不阻塞
  80. */
  81. @Override
  82. public boolean tryLock() {
  83. //子类实现
  84. return sync.tryAcquire(1);
  85. }
  86. /**
  87. * 获取锁-超时机制
  88. */
  89. @Override
  90. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  91. //Aqs独占式-获取资源模板函数(超时机制)
  92. return sync.tryAcquireNanos(1,unit.toNanos(time));
  93. }
  94. /**
  95. * 释放锁
  96. */
  97. @Override
  98. public void unlock() {
  99. //Aqs独占式-释放资源模板函数
  100. sync.release(0);
  101. }
  102. /**
  103. * 创建条件变量
  104. */
  105. @Override
  106. public Condition newCondition() {
  107. return sync.createConditionObject();
  108. }
  109. }

NonReentrantLock定义了一个内部类SyncSync用来实现具体的锁操作,它继承了A Q S,因为使用的是独占式模板,所以重写tryAcquiretryRelease函数,另外提供了一个创建条件变量的入口,下面使用自定义的独占锁来同步两个线程对j++。

  1. private static int j = 0;
  2. public static void main(String[] agrs) throws InterruptedException {
  3. NonReentrantLock nonReentrantLock = new NonReentrantLock();
  4. Runnable runnable = () -> {
  5. //获取锁
  6. nonReentrantLock.lock();
  7. for (int i = 0; i < 100000; i++) {
  8. j++;
  9. }
  10. //释放锁
  11. nonReentrantLock.unlock();
  12. };
  13. Thread thread = new Thread(runnable);
  14. Thread threadTwo = new Thread(runnable);
  15. thread.start();
  16. threadTwo.start();
  17. thread.join();
  18. threadTwo.join();
  19. System.out.println(j);
  20. }

无论执行多少次输出内容都是:

  1. 200000

AQS简化流程图

AQS - 图16