What is AQS?

AbstractQueuedSynchronizer, the abstract queued synchronizer. It is the heavyweight foundational framework for building locks and other synchronizer components, and the cornerstone of the whole JUC (java.util.concurrent) package. It uses a built-in FIFO queue to line up the threads contending for a resource, and an int variable to represent the lock-holding state.

Why is AQS the cornerstone of JUC?

Synchronizers built on AQS include:

  • ReentrantLock
  • CountDownLatch
  • ReentrantReadWriteLock
  • Semaphore

Each of them contains an internal Sync class that extends AbstractQueuedSynchronizer; that is how they delegate their synchronization work to AQS.

A closer look at the relationship between locks and synchronizers

  • Lock: faces the user of the lock

    Defines the API through which the programmer interacts with the lock and hides the implementation details; you simply call it.

  • Synchronizer: faces the implementer of the lock (see the sketch after this list)

    Provides a unified contract and simplifies implementing a lock by taking care of synchronization-state management, queuing of blocked threads, and the notify/wake-up machinery.
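To make the implementer's side concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class names SimpleMutex and Sync are mine, not from this article; the point is only that the subclass decides what state means while AQS supplies the queuing, parking and waking.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative only: state 0 = unlocked, 1 = locked.
public class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeed only if we can CAS state from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the owner may release; clear the owner, then the state.
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }   // queuing and parking handled by AQS
    public void unlock() { sync.release(1); }   // unparks the successor, if any
}

With just tryAcquire/tryRelease overridden, lock() and unlock() behave like a non-reentrant exclusive lock; the queue management comes entirely from AQS.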

What does it do?

Acquiring a lock can block the acquiring thread.

Where there is blocking there is queuing, and queuing requires some form of queue to manage the waiters.

Explanation

The thread that wins the resource simply goes on to run its business logic; the threads that lose must enter some queuing-and-waiting mechanism. A losing thread keeps waiting, but it still retains the chance to obtain the lock, and the acquisition process keeps going.

A queuing-and-waiting mechanism: a queue, but with what data structure?
If the shared resource is occupied, some block/wake mechanism is needed to hand out the lock; in AQS this mechanism is implemented with a variant of the CLH queue. Threads that cannot obtain the lock right away are added to the queue, and this queue is the concrete embodiment of AQS: each thread requesting the shared resource is wrapped in a queue node (Node), and CAS, spinning and LockSupport.park() are used to maintain the state variable, which is what turns concurrent access into controlled, synchronized access.
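As a small self-contained illustration (mine, not the article's) of the LockSupport.park()/unpark() primitive that AQS uses to block and wake queued threads:

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            System.out.println("waiter: parking, giving up the CPU");
            LockSupport.park();                 // blocks here, like a queued AQS node
            System.out.println("waiter: unparked, can retry the acquire");
        });
        waiter.start();

        Thread.sleep(500);                      // let the waiter park first
        System.out.println("main: unparking the waiter");
        LockSupport.unpark(waiter);             // what AQS does on release
        waiter.join();
    }
}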

A first look at AQS

The queue:
AQS uses a volatile int member variable to represent the synchronization state and a built-in FIFO queue to line up threads contending for the resource: each contending thread is wrapped in a Node and enqueued, the lock is handed out node by node, and the state value is modified via CAS.
Internal architecture: AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer and holds the head and tail pointers of the waiting queue plus the int state field.
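Abridged from the JDK 8 source (fields and the trivial accessors only; everything else elided):

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    private transient volatile Node head;   // sentinel head of the CLH-variant queue
    private transient volatile Node tail;   // tail of the queue
    private volatile int state;             // the synchronization state

    protected final int getState()              { return state; }
    protected final void setState(int newState) { state = newState; }
    // compareAndSetState(expect, update) does an atomic CAS on state (via Unsafe in JDK 8)

    // ... acquire/release template methods, the Node class, ConditionObject, etc.
}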

AQS itself

AQS = state + a CLH-variant doubly linked queue

Basic structure of the AQS sync queue

A sentinel head node followed by the waiting Nodes, linked to each other through prev/next and tracked by AQS through its head and tail pointers.

The synchronization state: the state variable

The CLH queue

A virtual doubly linked queue: "virtual" means there is no explicit queue object, only the prev/next links between the nodes.

Node

Head and tail pointers live in AQS itself; each Node carries its own prev and next pointers.
Node wait status: the waitStatus field (its possible values are listed in the source below).

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:    The successor of this node is (or will soon be)
     *              blocked (via park), so the current node must
     *              unpark its successor when it releases or
     *              cancels. To avoid races, acquire methods must
     *              first indicate they need a signal,
     *              then retry the atomic acquire, and then,
     *              on failure, block.
     *   CANCELLED: This node is cancelled due to timeout or interrupt.
     *              Nodes never leave this state. In particular,
     *              a thread with cancelled node never again blocks.
     *   CONDITION: This node is currently on a condition queue.
     *              It will not be used as a sync queue node
     *              until transferred, at which time the status
     *              will be set to 0. (Use of this value here has
     *              nothing to do with the other uses of the
     *              field, but simplifies mechanics.)
     *   PROPAGATE: A releaseShared should be propagated to other
     *              nodes. This is set (for head node only) in
     *              doReleaseShared to ensure propagation
     *              continues, even if other operations have
     *              since intervened.
     *   0:         None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes. It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing. Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued. The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check. The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node. Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null. The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

A Node waits in one of two modes, shared or exclusive (a shared-mode sketch follows below).
Node = waitStatus + prev/next pointers (plus the enqueued thread).
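For contrast with the exclusive-mode mutex sketched earlier, here is a minimal shared-mode sketch; the class names SimplePermits and Sync are mine, and the logic loosely mirrors how Semaphore uses AQS, with state counting the remaining permits.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative only: state holds the number of free permits.
public class SimplePermits {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative return value makes AQS enqueue and park the caller.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases))
                    return true;            // true tells AQS to wake queued threads
            }
        }
    }

    private final Sync sync;

    public SimplePermits(int permits) { this.sync = new Sync(permits); }

    public void acquire() { sync.acquireShared(1); }
    public void release() { sync.releaseShared(1); }
}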

Reading AQS through ReentrantLock

Implementations of the Lock interface generally control thread access by aggregating a subclass of the queued synchronizer (a Sync class extending AQS), as the usage sketch below suggests.
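From the caller's side that aggregation is invisible; a standard usage example (mine, not the article's):

import java.util.concurrent.locks.ReentrantLock;

public class CounterWithLock {
    private final ReentrantLock lock = new ReentrantLock(); // default constructor uses NonfairSync
    private int count;

    public void increment() {
        lock.lock();           // delegates into the Sync / AQS acquire path
        try {
            count++;
        } finally {
            lock.unlock();     // delegates to sync.release(1), which unparks a successor
        }
    }

    public int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}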

How reentrancy works

If the thread asking for the lock is already the exclusive owner, the acquisition succeeds and state is simply incremented; each unlock() decrements it, and the lock is only truly released when state falls back to 0.

Fair vs. non-fair, seen from the lock() method

The only difference between the fair and the non-fair lock() path is that, when trying to grab the synchronization state, the fair lock adds one extra check, hasQueuedPredecessors(): the method a fair lock uses to ask whether the wait queue already contains valid nodes that arrived earlier. A paraphrase of the fair path follows.
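For reference, ReentrantLock.FairSync.tryAcquire, paraphrased from the JDK 8 source (lightly reformatted; the real source is authoritative):

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // The extra fairness check: give way if someone is already queued ahead of us.
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;          // reentrant acquire: just bump the count
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}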

Non-fair lock

NonfairSync.lock() first tries to barge by CASing state from 0 to 1; only if that fails does it fall back to acquire(1), whose tryAcquire delegates to nonfairTryAcquire(), sketched below.
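ReentrantLock.Sync.nonfairTryAcquire, paraphrased from the JDK 8 source (lightly reformatted):

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // No hasQueuedPredecessors() check here: a newcomer may jump the queue.
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;          // reentrant acquire
        if (nextc < 0)                     // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}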

The overall structure:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

The whole ReentrantLock locking process falls into three phases, which map onto AQS's acquire() template (paraphrased below):

  • try to acquire the lock
  • on failure, enqueue the thread
  • once enqueued, the thread blocks (parks)
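Those three phases are driven by AbstractQueuedSynchronizer.acquire, paraphrased here from the JDK 8 source:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&                              // phase 1: try to acquire
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   // phases 2 + 3: enqueue, then park until acquired
        selfInterrupt();                                 // re-assert interrupt status if interrupted while parked
}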

In the doubly linked list, the first node is a dummy node (also called the sentinel node): it stores no real data and merely holds the position; the first node that actually carries data is the second one. The head node is responsible for waking up the node behind it. The sentinel is created lazily, as the enq() paraphrase below shows.
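AbstractQueuedSynchronizer.enq, paraphrased from the JDK 8 source, shows how the sentinel head is created on first contention:

private Node enq(final Node node) {
    for (;;) {                                   // spin until the node is linked in
        Node t = tail;
        if (t == null) {                         // queue empty: install the sentinel first
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;                       // link backwards first
            if (compareAndSetTail(t, node)) {
                t.next = node;                   // then forwards, once the CAS on tail wins
                return t;
            }
        }
    }
}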