Java 并发 AQS

概述

AbstractQueuedSynchronizer可以称为抽象队列同步器。
AQS有独占模式和共享模式两种:

  • 独占模式

公平锁:
非公平锁:

  • 共享模式

    数据结构

    基本属性

    ```java /**
    • 同步等待队列的头结点 */ private transient volatile Node head;

/**

  • 同步等待队列的尾结点 */ private transient volatile Node tail;

/**

  • 同步资源状态 */ private volatile int state;

    1. <a name="xJDcl"></a>
    2. #### 内部类
    3. ```java
    4. static final class Node {
    5. /**
    6. * 标记节点为共享模式
    7. */
    8. static final Node SHARED = new Node();
    9. /**
    10. * 标记节点为独占模式
    11. */
    12. static final Node EXCLUSIVE = null;
    13. static final int CANCELLED = 1;
    14. static final int SIGNAL = -1;
    15. static final int CONDITION = -2;
    16. static final int PROPAGATE = -3;
    17. /**
    18. * CANCELLED: 值为1,表示当前的线程被取消
    19. * SIGNAL: 值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
    20. * CONDITION: 值为-2,表示当前节点在等待condition,也就是在condition队列中;
    21. * PROPAGATE: 值为-3,表示当前场景下后续的acquireShared能够得以执行;
    22. * 0: 表示当前节点在sync队列中,等待着获取锁。
    23. * 表示当前节点的状态值
    24. */
    25. volatile int waitStatus;
    26. /**
    27. * 前置节点
    28. */
    29. volatile Node prev;
    30. /**
    31. * 后继节点
    32. */
    33. volatile Node next;
    34. /**
    35. * 节点同步状态的线程
    36. */
    37. volatile Thread thread;
    38. /**
    39. * 存储condition队列中的后继节点
    40. */
    41. Node nextWaiter;
    42. /**
    43. * 是否为共享模式
    44. */
    45. final boolean isShared() {
    46. return nextWaiter == SHARED;
    47. }
    48. /**
    49. * 获取前驱结点
    50. */
    51. final Node predecessor() throws NullPointerException {
    52. Node p = prev;
    53. if (p == null)
    54. throw new NullPointerException();
    55. else
    56. return p;
    57. }
    58. Node() { // Used to establish initial head or SHARED marker
    59. }
    60. Node(Thread thread, Node mode) { // Used by addWaiter
    61. this.nextWaiter = mode;
    62. this.thread = thread;
    63. }
    64. Node(Thread thread, int waitStatus) { // Used by Condition
    65. this.waitStatus = waitStatus;
    66. this.thread = thread;
    67. }
    68. }

    主要方法解析

    Java并发之AQS原理剖析 - 图1

    tryAcquire/tryAcquireShared(int arg)

    独占/共享模式获取锁;由子类实现,仅仅获取锁,获取锁失败时不进行阻塞排队。

    tryRelease/tryReleaseShared(int arg)

    独占/共享模式释放锁;由子类实现,仅仅释放锁,释放锁成功不对后继节点进行唤醒操作。

    acquire/acquireShared(int arg)

    独占/共享模式获取锁,如果线程被中断唤醒,会返回线程中断状态,不会抛异常中止执行操作(忽略中断)。

    acquireInterruptibly/acquireSharedInterruptibly(int arg)

    独占/共享模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止)。

    tryAcquireNanos/tryAcquireSharedNanos(int arg, long nanosTimeout)

    独占/共享时间中断模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止);如果超出等待时间则返回加锁失败。

    release/releaseShared(int arg)

    独占/共享模式释放锁。

    addWaiter(Node mode)

    将给定模式节点进行入队操作。

    1. private Node addWaiter(Node mode) {
    2. // 根据指定模式,新建一个当前节点的对象
    3. Node node = new Node(Thread.currentThread(), mode);
    4. // Try the fast path of enq; backup to full enq on failure
    5. Node pred = tail;
    6. if (pred != null) {
    7. // 将当前节点的前置节点指向之前的尾结点
    8. node.prev = pred;
    9. // 将当前等待的节点设置为尾结点(原子操作)
    10. if (compareAndSetTail(pred, node)) {
    11. // 之前尾结点的后继节点设置为当前等待的节点
    12. pred.next = node;
    13. return node;
    14. }
    15. }
    16. enq(node);
    17. return node;
    18. }

    enq(final Node node)

    将节点设置为尾结点。注意这里会进行自旋操作,确保节点设置成功。因为等待的线程需要被唤醒操作;如果操作失败,当前节点没有与其他节点没有引用指向关系,一直就不会被唤醒(除非程序代码中断线程)。 ```java private Node enq(final Node node) { for (;;) {

    1. Node t = tail;
    2. // 判断尾结点是否为空,尾结点初始值是为空
    3. if (t == null) { // Must initialize
    4. // 尾结点为空,需要初始化
    5. if (compareAndSetHead(new Node()))
    6. tail = head;
    7. } else {
    8. // 设置当前节点设置为尾结点
    9. node.prev = t;
    10. if (compareAndSetTail(t, node)) {
    11. t.next = node;
    12. return t;
    13. }
    14. }

    } }

  1. <a name="xOBCm"></a>
  2. #### `acquireQueued(final Node node, int arg)`
  3. 已经在队列当中的节点,准备阻塞获取锁。在阻塞前会判断前置节点是否为头结点,如果为头结点;这时会尝试获取下锁(因为这时头结点有可能会释放锁)。
  4. ```java
  5. final boolean acquireQueued(final Node node, int arg) {
  6. boolean failed = true;
  7. try {
  8. boolean interrupted = false;
  9. for (;;) {
  10. // 当前节点的前置节点
  11. final Node p = node.predecessor();
  12. // 入队前会先判断下该节点的前置节点是否是头节点(此时头结点有可能会释放锁);然后尝试去抢锁
  13. // 在非公平锁场景下有可能会抢锁失败,这时候会继续往下执行 阻塞线程
  14. if (p == head && tryAcquire(arg)) {
  15. //如果抢到锁,将头节点后移(也就是将该节点设置为头结点)
  16. setHead(node);
  17. p.next = null; // help GC
  18. failed = false;
  19. return interrupted;
  20. }
  21. // 如果前置节点不是头结点,或者当前节点抢锁失败;通过shouldParkAfterFailedAcquire判断是否应该阻塞
  22. // 当前置节点的状态为SIGNAL=-1,才可以安全被parkAndCheckInterrupt阻塞线程
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. parkAndCheckInterrupt())
  25. // 该线程已被中断
  26. interrupted = true;
  27. }
  28. } finally {
  29. if (failed)
  30. cancelAcquire(node);
  31. }
  32. }

shouldParkAfterFailedAcquire(Node pred, Node node)

检查和更新未能获取锁节点的状态,返回是否可以被安全阻塞。

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus; // 获取前置节点的状态
  3. if (ws == Node.SIGNAL)
  4. /*
  5. * 前置节点的状态waitStatus为SIGNAL=-1,当前线程可以安全的阻塞
  6. */
  7. return true;
  8. if (ws > 0) {
  9. /*
  10. * 如果前置节点的状态waitStatus>0,即waitStatus为CANCELLED=1(无效节点),需要从同步状态队列中取消等待(移除队列)
  11. */
  12. do {
  13. node.prev = pred = pred.prev;
  14. } while (pred.waitStatus > 0);
  15. pred.next = node;
  16. } else {
  17. /*
  18. * 将前置状态的waitStatus修改为SIGNAL=-1,然后当前节点才可以被安全的阻塞
  19. */
  20. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  21. }
  22. return false;
  23. }

parkAndCheckInterrupt()

阻塞当前节点,返回当前线程的中断状态。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this); //阻塞
  3. return Thread.interrupted();
  4. }

cancelAcquire(Node node)

取消进行的获取锁操作,在非忽略中断模式下,线程被中断唤醒抛异常时会调用该方法。

  1. // 将当前节点的状态设置为CANCELLED,无效的节点,同时移除队列
  2. private void cancelAcquire(Node node) {
  3. if (node == null)
  4. return;
  5. node.thread = null;
  6. Node pred = node.prev;
  7. while (pred.waitStatus > 0)
  8. node.prev = pred = pred.prev;
  9. Node predNext = pred.next;
  10. node.waitStatus = Node.CANCELLED;
  11. if (node == tail && compareAndSetTail(node, pred)) {
  12. compareAndSetNext(pred, predNext, null);
  13. } else {
  14. int ws;
  15. if (pred != head &&
  16. ((ws = pred.waitStatus) == Node.SIGNAL ||
  17. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  18. pred.thread != null) {
  19. Node next = node.next;
  20. if (next != null && next.waitStatus <= 0)
  21. compareAndSetNext(pred, predNext, next);
  22. } else {
  23. unparkSuccessor(node);
  24. }
  25. node.next = node; // help GC
  26. }
  27. }

hasQueuedPredecessors()

判断当前线程是否应该排队。
1.第一种结果——返回true:(1.1和1.2同时存在,1.2.1和1.2.2有一个存在)
1.1 h != t为true,说明头结点和尾结点不相等,表示队列中至少有两个不同节点存在,至少有一点不为null。
1.2 ((s = h.next) == null || s.thread != Thread.currentThread())为true
1.2.1 (s = h.next) == null为true,表示头结点之后没有后续节点。
1.2.2 (s = h.next) == null为false,s.thread != Thread.currentThread()为true
表示头结点之后有后续节点,但是头节点的下一个节点不是当前线程
2.第二种结果——返回false,无需排队。(2.1和2.2有一个存在)
2.1 h != t为false,即h == t;表示h和t同时为null或者h和t是同一个节点,无后续节点。
2.2 h != t为true,((s = h.next) == null || s.thread != Thread.currentThread())为false
表示队列中至少有两个不同节点存在,同时持有锁的线程为当前线程。

  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail; // Read fields in reverse initialization order
  3. Node h = head;
  4. Node s;
  5. return h != t &&
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }