什么是AQS?

AQS全称AbstractQueuedSynchronizer(队列同步器),用于多线程之间的同步操作,JUC中的ReentrantLock、CountDownLatch、Semaphore都是基于该工具实现的。

基本原理?

AQS持有一个stat属性,用于表述同步状态

  • stat>0,锁被获取
  • stat=0,锁空闲

针对stat的操作,AQS提供了三个方法

  • getStat
  • setStat
  • compareAndSetStat

AQS内部持有一个队列(CLH同步队列),用于处理线程排队。类似于monitorObject里面的_entry_set。
队列元素Node封装了线程信息和等待信息。Node结构如下:
当锁释放stat=0的时候,会唤醒队列中的线程进行再次争抢。

  1. static final class Node {
  2. /** 共享 */
  3. static final Node SHARED = new Node();
  4. /** 独占 */
  5. static final Node EXCLUSIVE = null;
  6. /**
  7. * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
  8. */
  9. static final int CANCELLED = 1;
  10. /**
  11. * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
  12. */
  13. static final int SIGNAL = -1;
  14. /**
  15. * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
  16. */
  17. static final int CONDITION = -2;
  18. /**
  19. * 表示下一次共享式同步状态获取将会无条件地传播下去
  20. */
  21. static final int PROPAGATE = -3;
  22. /** 等待状态 */
  23. volatile int waitStatus;
  24. /** 前驱节点 */
  25. volatile Node prev;
  26. /** 后继节点 */
  27. volatile Node next;
  28. /** 获取同步状态的线程 */
  29. volatile Thread thread;
  30. Node nextWaiter;
  31. }

提供的方法概览

  • getState():返回同步状态的当前值;
  • setState(int newState):设置当前同步状态;
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
  • tryRelease(int arg):独占式释放同步状态;
  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
  • tryReleaseShared(int arg):共享式释放同步状态;
  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
  • releaseShared(int arg):共享式释放同步状态;

公平锁和非公平锁的区别

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = -5179523762034025860L;
  3. /**
  4. * Performs {@link Lock#lock}. The main reason for subclassing
  5. * is to allow fast path for nonfair version.
  6. */
  7. abstract void lock();
  8. /**
  9. * Performs non-fair tryLock. tryAcquire is implemented in
  10. * subclasses, but both need nonfair try for trylock method.
  11. */
  12. final boolean nonfairTryAcquire(int acquires) {
  13. final Thread current = Thread.currentThread();
  14. int c = getState();
  15. if (c == 0) {
  16. // 第二次机会,直接CAS操作state,成功就先于很多前辈线程了。
  17. if (compareAndSetState(0, acquires)) {
  18. setExclusiveOwnerThread(current);
  19. return true;
  20. }
  21. }
  22. else if (current == getExclusiveOwnerThread()) {
  23. int nextc = c + acquires;
  24. if (nextc < 0) // overflow
  25. throw new Error("Maximum lock count exceeded");
  26. setState(nextc);
  27. return true;
  28. }
  29. return false;
  30. }
  31. }
  32. static final class NonfairSync extends Sync {
  33. private static final long serialVersionUID = 7316153563782823691L;
  34. /**
  35. * Performs lock. Try immediate barge, backing up to normal
  36. * acquire on failure.
  37. */
  38. final void lock() {
  39. // 第一次机会,直接CAS操作state,成功就先于很多前辈线程了
  40. if (compareAndSetState(0, 1))
  41. setExclusiveOwnerThread(Thread.currentThread());
  42. else
  43. acquire(1);
  44. }
  45. protected final boolean tryAcquire(int acquires) {
  46. return nonfairTryAcquire(acquires);
  47. }
  48. }
  49. static final class FairSync extends Sync {
  50. private static final long serialVersionUID = -3000897897090466540L;
  51. final void lock() {
  52. acquire(1);
  53. }
  54. /**
  55. * Fair version of tryAcquire. Don't grant access unless
  56. * recursive call or no waiters or is first.
  57. */
  58. protected final boolean tryAcquire(int acquires) {
  59. final Thread current = Thread.currentThread();
  60. int c = getState();
  61. if (c == 0) {
  62. // 需要先判断队列里是否有前辈节点,有的话就不能直接CAS
  63. if (!hasQueuedPredecessors() &&
  64. compareAndSetState(0, acquires)) {
  65. setExclusiveOwnerThread(current);
  66. return true;
  67. }
  68. }
  69. else if (current == getExclusiveOwnerThread()) {
  70. int nextc = c + acquires;
  71. if (nextc < 0)
  72. throw new Error("Maximum lock count exceeded");
  73. setState(nextc);
  74. return true;
  75. }
  76. return false;
  77. }
  78. }
  1. 公平和非公平的逻辑是在Sync层实现的,不是AQS层。
  2. 公平和非公平体现在线程首次进入临界区的锁竞争上,也就是刚来有机会直接参与竞争(2次机会),如果一旦没抢到,那么后续的加入阻塞队列后的唤醒机制,就和其他老节点一样了,需要等待逐个唤醒。

    为什么是从尾部开始唤醒?

    ```java // 先来看看从后向前遍历的逻辑 private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 如果head节点当前waitStatus<0, 将其修改为0 if (ws < 0)

    1. compareAndSetWaitStatus(node, ws, 0);

    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1) // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) {

    1. s = null;
    2. // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
    3. for (Node t = tail; t != null && t != node; t = t.prev)
    4. if (t.waitStatus <= 0)
    5. s = t;

    } if (s != null)

    1. // 唤醒线程
    2. LockSupport.unpark(s.thread);

    }

// 原因在于构造队列的时候存在线程安全问题 // addWaiter和enq在入列过程中都下述问题,这里以enq为例。 // 顺便说下addWaiter和enq的关系,就是addWaiter进行compareAndSetTail失败,就会进入enq // enq的逻辑就是无限自旋去compareAndSetTail。 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 把当前节点的前驱指向尾节点 node.prev = t; // 尝试将当前节点设定为队列尾节点 if (compareAndSetTail(t, node)) { // 设定成功后,将原尾节点的后驱指向当前节点(也就是新尾节点) // 但是如果此时发生线程切换,开始执行唤醒节点的逻辑,如果是从前遍历, // 到这里的原尾节点的next就是null,认为没有后续节点了 // 再考虑后续时间片分配回来,这个next又连上了,就会很奇怪。 // 从后向前则没有这个问题,根因就是前驱节点的赋值是在CAS操作之前的。 // 再深入考虑其实就是因为阻塞队列是双向的,需要两步操作导致的。 t.next = node; return t; } // 没有尝试成功自旋继续 } } }

  1. <a name="Vzkp2"></a>
  2. ### 为什么是双向队列?
  3. 首先队列的好处是增删方便,双向队列的好处是从任意一个位置的增删方便。<br />在单向前提下,如果要删除一个node,必须从头开始遍历到其前驱节点,再将其前驱节点的next指向其next节点。<br />而双向的话,如果要删除一个node,只需要将prev.next=node.next,next.prev=node.prev,很显然少了遍历的过程。<br />回到AQS,首先AQS不会发生需要在中间增加节点的情况,因为增加都是在尾部增加,这点上单向、双向都能实现。但是是否会发生在中间节点删除节点的情况呢?会!这就是AQS用双向链表的核心原因。<br />什么场景下会,waitStatus>0的时候,也是CANCELLED状态,AQS支持阻塞中断,应用上也就是ReentrantLock的tryLock(long millisecond),允许定时中断,node在中断后,就需要从队列中删除(GC回收)。
  4. ```java
  5. /**
  6. * Link to predecessor node that current node/thread relies on
  7. * for checking waitStatus. Assigned during enqueuing, and nulled
  8. * out (for sake of GC) only upon dequeuing. Also, upon
  9. * cancellation of a predecessor, we short-circuit while
  10. * finding a non-cancelled one, which will always exist
  11. * because the head node is never cancelled: A node becomes
  12. * head only as a result of successful acquire. A
  13. * cancelled thread never succeeds in acquiring, and a thread only
  14. * cancels itself, not any other node.
  15. *
  16. * 指向前驱节点,用于当前节点检查其waitStatus。
  17. * 入列时赋值,出列时置null(为了尽快被GC)
  18. * 此外,当前驱节点取消后,我们会一直向前寻找一个未取消的节点。
  19. * 而且一定会存在因为至少有头节点(已经获取到锁的节点不会被取消)兜底。
  20. * 一个取消后的节点不可能拿到锁,一个节点只有自己取消自己,其他节点不能取消
  21. */
  22. volatile Node prev;