1. 关于AQS这个概念,个人是最近才知道。本篇文章只是简单的整理一下知识内容。

简介

  • AQS是AbstractQueuedSynchronizer这个JDK源码中的抽象类名的缩写
  • JDK8的API手册定义其为一个框架。说白了就是使用了模板方法模式,定义了一个同步器实现的算法步骤,算法具体实现,延迟到子类去实现
  • ReentrantLockSemaphore等类都是依靠这个框架(继承了这个类)来实现的

    AQS数据结构

  • status:资源标识
  • CLH锁-队列:CLH是三个人名字的首字母,CLH锁时自旋锁

image.png

  • 队列中节点是一个Node对象,Node中拥有线程对象

    资源共享的两种模式

  • 独占式(Exclusive):如ReentrantLock

  • 共享模式(Share):如CountDonwLatch、Semaphore

    Node数据结构

    Node的队列中每个节点的数据结构,每个节点Node代表的就是一个等待或占用线程

    1. /**
    2. * Wait queue node class.
    3. *
    4. * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
    5. * Hagersten) lock queue. CLH locks are normally used for
    6. * spinlocks. We instead use them for blocking synchronizers, but
    7. * use the same basic tactic of holding some of the control
    8. * information about a thread in the predecessor of its node. A
    9. * "status" field in each node keeps track of whether a thread
    10. * should block. A node is signalled when its predecessor
    11. * releases. Each node of the queue otherwise serves as a
    12. * specific-notification-style monitor holding a single waiting
    13. * thread. The status field does NOT control whether threads are
    14. * granted locks etc though. A thread may try to acquire if it is
    15. * first in the queue. But being first does not guarantee success;
    16. * it only gives the right to contend. So the currently released
    17. * contender thread may need to rewait.
    18. *
    19. * <p>To enqueue into a CLH lock, you atomically splice it in as new
    20. * tail. To dequeue, you just set the head field.
    21. * <pre>
    22. * +------+ prev +-----+ +-----+
    23. * head | | <---- | | <---- | | tail
    24. * +------+ +-----+ +-----+
    25. * </pre>
    26. *
    27. * Node的类的这段注释就解释了为啥叫CLH锁队列。
    28. * 因为新Node进入队列时使用CLH锁的方式(自旋锁的方式)
    29. *
    30. *
    31. */
    32. static final class Node {
    33. /**
    34. * Marker to indicate a node is waiting in shared mode
    35. * 标记一个节点(一个线程)在共享模式下等待
    36. *
    37. * */
    38. static final Node SHARED = new Node();
    39. /**
    40. * Marker to indicate a node is waiting in exclusive mode
    41. * 标记一个节点在独占模式下等待
    42. * */
    43. static final Node EXCLUSIVE = null;
    44. /**
    45. * waitStatus value to indicate thread has cancelled
    46. *
    47. * 表示该结点已被取消
    48. *
    49. * */
    50. static final int CANCELLED = 1;
    51. /**
    52. *
    53. * waitStatus value to indicate successor's thread needs unparking
    54. * 标记后继的节点需要被唤醒
    55. * */
    56. static final int SIGNAL = -1;
    57. /**
    58. *
    59. * waitStatus value to indicate thread is waiting on condition
    60. * 表示该结点等待某个条件
    61. *
    62. * */
    63. static final int CONDITION = -2;
    64. /**
    65. * waitStatus value to indicate the next acquireShared should
    66. * unconditionally propagate
    67. * waitStatus的值,表示有资源可用,
    68. * 新head结点需要继续唤醒后继结点
    69. * (共享模式下,多线程并发释放资源,
    70. * 而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)
    71. *
    72. *
    73. */
    74. static final int PROPAGATE = -3;
    75. /**
    76. *
    77. */
    78. volatile int waitStatus;
    79. /** 前驱指针 */
    80. volatile Node prev;
    81. /** 后继指针 */
    82. volatile Node next;
    83. /**
    84. * Node所代表的线程实例
    85. *
    86. */
    87. volatile Thread thread;
    88. /**
    89. *
    90. * 要么是标记共享模式还是独占模式
    91. * 要么就是记录下一个等待条件的Node
    92. */
    93. Node nextWaiter;
    94. /**
    95. * 是否共享模式
    96. */
    97. final boolean isShared() {
    98. return nextWaiter == SHARED;
    99. }
    100. /**
    101. * 前节点获取
    102. * @return the predecessor of this node
    103. */
    104. final Node predecessor() throws NullPointerException {
    105. Node p = prev;
    106. if (p == null)
    107. throw new NullPointerException();
    108. else
    109. return p;
    110. }
    111. Node() { // Used to establish initial head or SHARED marker
    112. }
    113. /**
    114. * 这里 第二的Node就是指明是 共享模式还是独占模式
    115. * @param thread
    116. * @param mode
    117. */
    118. Node(Thread thread, Node mode) { // Used by addWaiter
    119. this.nextWaiter = mode;
    120. this.thread = thread;
    121. }
    122. Node(Thread thread, int waitStatus) { // Used by Condition
    123. this.waitStatus = waitStatus;
    124. this.thread = thread;
    125. }
    126. }
  • 为啥叫CLH队列?

因为这个队列新增节点是通过CLH锁(一种自旋锁)的方式去入队列的。

Node类的几种waitStatus

image.png

状态 状态值 描述
INITAL 0 初始化状态
CANCELLED 1 当前节点已取消。
SIGNAL -1 后继节点需要被唤醒
CONDITION -2 节点等待一个Condition
PROPAGATE -3 共享模式中标识节点可运行?
标识资源可用,需要唤醒后继节点
  • 额,额,额,先记录着,后面再具体理解

Node类的入队列流程

这里我就不贴代码了。可以看一下涛哥笔记 yyds好吧。

  1. 尝试直接CAS将节点插入队列
  2. 失败的话循环CAS插入

AQS的主要方法解析

AQS使用设计模式中的模板方法模式,自然会留给子类去实现的一些方法。

  • isHeldExclusively():该线程是否正在独占资源。用到Condition时需要实现方法
    • 独占锁
  • acquire(int arg):独占式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列(一致等待)
  • acquireInterruptibly(int arg):独占式获取同步状态(同上),如果被打断直接抛异常
  • tryAcquire(int arg):独占式获取同步状态(供开发者重写)
  • tryAcquireNanos(int arg,long nanosTimeout):独占式获取同步状态,增加等待时间限制
  • release(int arg):独占式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒
  • tryRelease(int arg):独占式释放同步状态(供开发者重写)

    • 共享锁

    类似

tryAcquire方法由开发者自己实现,但是抽象类中一般也实现这个方法,但是只是抛异常,这样做的目的:

  1. 避免子类把不需要的方法实现一遍

获取资源方法分析

总的来说是下面流程
image.png

  1. 线程A先获取尝试获取资源,成功后线程A执行
  2. 获取失败,线程A转换为Node加入等待队列
  3. 线程A的Node循环尝试获取资源
  4. 在某次尝试获取失败后,发现可以中断线程了,进行中断操作

acquire(int arg)

  1. /**
  2. * Acquires in exclusive mode, ignoring interrupts. Implemented
  3. * by invoking at least once {@link #tryAcquire},
  4. * returning on success. Otherwise the thread is queued, possibly
  5. * repeatedly blocking and unblocking, invoking {@link
  6. * #tryAcquire} until success. This method can be used
  7. * to implement method {@link Lock#lock}.
  8. *
  9. * 是一个独占模式,忽略中断,
  10. * 通过至少一次的tryAcquire方法实现
  11. * 或者进入队列重复的阻塞和不阻塞, 不阻塞时调用tryAcquire 直到成功
  12. *
  13. *
  14. *
  15. * @param arg the acquire argument. This value is conveyed to
  16. * {@link #tryAcquire} but is otherwise uninterpreted and
  17. * can represent anything you like.
  18. */
  19. public final void acquire(int arg) {
  20. //首次try没成功, 入队列,之后又调用acquireQueued方法。
  21. if (!tryAcquire(arg) &&
  22. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  23. selfInterrupt();
  24. }

acquireQueued(Node node,int arg)

  • 循环试着去拿资源
  • 每次失败后,会看看是否可以直接Waiting

    1. /**
    2. * Acquires in exclusive uninterruptible mode for thread already in
    3. * queue. Used by condition wait methods as well as acquire.
    4. *
    5. * 以独占和不可中断的模式
    6. *
    7. * @param node the node
    8. * @param arg the acquire argument
    9. * @return {@code true} if interrupted while waiting
    10. */
    11. final boolean acquireQueued(final Node node, int arg) {
    12. boolean failed = true;
    13. try {
    14. boolean interrupted = false;
    15. for (;;) {
    16. // 获取节点的前节点
    17. final Node p = node.predecessor();
    18. //若前节点是头节点,再试着获取一次
    19. if (p == head && tryAcquire(arg)) {
    20. //成功的话,我就是新王
    21. setHead(node);
    22. //前节点出队列
    23. p.next = null; // help GC
    24. failed = false;
    25. return interrupted;
    26. }
    27. //shouldParkAfterFailedAcquire:判断是否可以让线程Wating
    28. // parkAndCheckInterrupt()并且尝试中断
    29. if (shouldParkAfterFailedAcquire(p, node) &&
    30. parkAndCheckInterrupt())
    31. interrupted = true;
    32. }
    33. } finally {
    34. // 循环结束仍然是失败,则取消请求
    35. if (failed)
    36. cancelAcquire(node);
    37. }
    38. }

    shouldParkAfterFailedAcquire(Node pred, Node nowNode)
    1. /**
    2. * Checks and updates status for a node that failed to acquire.
    3. * Returns true if thread should block. This is the main signal
    4. * control in all acquire loops. Requires that pred == node.prev.
    5. *
    6. * 如果Node应该被阻塞
    7. *
    8. * @param pred node's predecessor holding status
    9. * @param node the node
    10. * @return {@code true} if thread should block
    11. */
    12. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    13. int ws = pred.waitStatus;
    14. if (ws == Node.SIGNAL)
    15. /*
    16. * This node has already set status asking a release
    17. * to signal it, so it can safely park.
    18. *
    19. * 前节点如果是标记了后继节点唤醒,则返回true
    20. *
    21. */
    22. return true;
    23. if (ws > 0) {
    24. /*
    25. * Predecessor was cancelled. Skip over predecessors and
    26. * indicate retry.
    27. * 前节点如果被取消了,则一直追溯到前面的不为取消的节点
    28. */
    29. do {
    30. node.prev = pred = pred.prev;
    31. } while (pred.waitStatus > 0);
    32. //中间一段已取消的节点全部剔除队列
    33. pred.next = node;
    34. } else {
    35. /*
    36. * waitStatus must be 0 or PROPAGATE. Indicate that we
    37. * need a signal, but don't park yet. Caller will need to
    38. * retry to make sure it cannot acquire before parking.
    39. *
    40. * 我们需要前面一个节点是 -1 状态, 否则我们不能 阻塞
    41. *
    42. */
    43. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    44. }
    45. return false;
    46. }

    parkAndCheckInterrupt()
    1. /**
    2. * Convenience method to park and then check if interrupted
    3. *
    4. * @return {@code true} if interrupted
    5. */
    6. private final boolean parkAndCheckInterrupt() {
    7. LockSupport.park(this);
    8. return Thread.interrupted();
    9. }
  • LockSupport.park: 线程进入Waiting状态

  • LockSupport.unPark: 线程恢复

释放资源分析

  1. /**
  2. * Releases in exclusive mode. Implemented by unblocking one or
  3. * more threads if {@link #tryRelease} returns true.
  4. * This method can be used to implement method {@link Lock#unlock}.
  5. *
  6. * 尝试释放成功后,获取队列头节点。状态非初始化且不为空时
  7. * unpark后继节点
  8. *
  9. * @param arg the release argument. This value is conveyed to
  10. * {@link #tryRelease} but is otherwise uninterpreted and
  11. * can represent anything you like.
  12. * @return the value returned from {@link #tryRelease}
  13. */
  14. public final boolean release(int arg) {
  15. if (tryRelease(arg)) {
  16. Node h = head;
  17. if (h != null && h.waitStatus != 0)
  18. unparkSuccessor(h);
  19. return true;
  20. }
  21. return false;
  22. }
  23. /**
  24. * Wakes up node's successor, if one exists.
  25. *
  26. * @param node the node
  27. */
  28. private void unparkSuccessor(Node node) {
  29. /*
  30. * If status is negative (i.e., possibly needing signal) try
  31. * to clear in anticipation of signalling. It is OK if this
  32. * fails or if status is changed by waiting thread.
  33. */
  34. int ws = node.waitStatus;
  35. //当前节点小于0,将当前节点置为0
  36. if (ws < 0)
  37. compareAndSetWaitStatus(node, ws, 0);
  38. /*
  39. * Thread to unpark is held in successor, which is normally
  40. * just the next node. But if cancelled or apparently null,
  41. * traverse backwards from tail to find the actual
  42. * non-cancelled successor.
  43. * 当前节点的下一个节点为 取消状态的节点时或不存在,
  44. * 从后向前遍历出一个 可以唤醒的节点
  45. *
  46. *
  47. */
  48. Node s = node.next;
  49. if (s == null || s.waitStatus > 0) {
  50. s = null;
  51. for (Node t = tail; t != null && t != node; t = t.prev)
  52. if (t.waitStatus <= 0)
  53. s = t;
  54. }
  55. if (s != null)
  56. LockSupport.unpark(s.thread);
  57. }
  • 有意思的是当可以释放资源节点的下一个节点无效时,使用从尾向前遍历找有效节点

参考文章

RedSpider《深入浅出多线程》- http://concurrent.redspider.group/article/02/11.html
雨润涛哥的私人笔记 - https://lvtao0408.github.io/2020/03/08/AQS/