AQS是JUC包的一个同步器,实现了锁的基本抽象功能,支持独占锁和共享锁。该类使用模板方法设计模式实现,称为构建锁和同步器的框架,使该类可以简单高效的构造出广泛的同步器或等待队列。

1、AQS内部队列

AQS内部维护了一个先进先出的双向链表,每个节点是线程封装的,每当线程通过AQS获取锁失败时,线程就会被封装成一个Node节点,通过CAS原子操作插入队列尾部,每当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。


thread
表示对应放入AQS队列节点关联的线程


每个节点与等待线程关联,每个节点维护了一个状态waitStatus

  • static final int CANCELLED = 1

表示线程因为中断或者等待超时从等待队列取消等待,已取消的节点不会再阻塞

  • static final int SIGNAL = -1

表示其后继节点处于等待唤醒状态,当前节点对应的线程如果释放了锁或者被取消,需要唤醒其后继节点,使后继节点的线程得以运行

  • static final int CONDITION = -2

表示节点对应的线程在条件队列中阻塞

  • static final int PROPAGATE = -3

表示下一个线程获取到共享锁后,自己的共享状态会被无限的传播下去。因为共享锁时多个线程共有的,一个节点的线程获取锁后,必然要通知后继共享节点的线程。这种状态在CountDownLatch中用到了。

  • 0

表示节点处于初始状态


Node为先进先出链表节点的封装类,每个Node有两个指针,分别指向前驱节点和后继节点。
prev: 当前节点的前置节点
next:当前节点的后继节点


nextWaiter
如果当前Node节点不是一个普通节点而是条件等待节点,则节点处于某个条件的等待队列上,此属性指向下一个条件等待节点,即其条件队列的后继节点


  1. static final class Node {
  2. /**共享模式*/
  3. static final Node SHARED = new Node();
  4. /**独占模式*/
  5. static final Node EXCLUSIVE = null;
  6. /**当前节点处于取消状态*/
  7. static final int CANCELLED = 1;
  8. /**当前节点需要唤醒其后继,后继节点处于等待唤醒状态(SIGNAL表示的是后继节点的状态)*/
  9. static final int SIGNAL = -1;
  10. /** 当前线程正在进行条件等待 */
  11. static final int CONDITION = -2;
  12. /**下一次共享锁的acquireShared需要无条件传播*/
  13. static final int PROPAGATE = -3;
  14. /**
  15. * node状态可选值
  16. *
  17. * SIGNAL 当前节点需要唤醒其后继,后继节点处于等待唤醒状态
  18. * CANCELLED 当前节点处于取消状态
  19. * CONDITION 当前线程正在进行条件等待
  20. * PROPAGATE 下一次共享锁的acquireShared需要无条件传播
  21. * 0
  22. *
  23. * waitStatus == 0 默认状态
  24. * waitStatus >0 取消状态
  25. * waitStatus == -1 表示当前node如果是head节点时 释放锁后需要唤醒后继节点
  26. */
  27. volatile int waitStatus;
  28. /**前置节点*/
  29. volatile Node prev;
  30. /**后继节点*/
  31. volatile Node next;
  32. /**当前node的线程*/
  33. volatile Thread thread;
  34. /**
  35. *如果当前Node节点不是一个普通节点而是条件等待节点,则节点处于某个条件的等待队列上,此属性指向下一个条件等待节点,即其条件队列的后继节点
  36. */
  37. Node nextWaiter;
  38. }

2、AQS核心属性

head和tail
AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点的head和tail记录队首和队尾元素。AQS的头节点和尾节点都是懒加载的,在需要的时候才真正创建。只有在线程竞争CAS失败的情况下,有新的线程加入同步队列,AQS才创建一个head节点,head节点永远指向持锁线程,尾节点只有在新线程阻塞时才创建。
阻塞队列不包含头节点,head的下一个节点到尾节点才是等待锁的阻塞队列。


state
表示加锁状态,state初始化为0,表示未锁定状态

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable {
  4. */
  5. /**头节点 任何时刻头节点对应的线程都是持锁线程*/
  6. private transient volatile Node head;
  7. /**
  8. * Tail of the wait queue, lazily initialized. Modified only via
  9. * method enq to add new wait node.
  10. */
  11. /**尾结点*/
  12. private transient volatile Node tail;
  13. /**
  14. * The synchronization state.
  15. */
  16. /**
  17. *表示资源:
  18. * 独占模式下: 0 未加锁状态 1 加锁状态
  19. *
  20. */
  21. private volatile int state;
  22. }

3、AQS对外核心方法

3.1、AQS#acquire获取锁

获取锁的核心方法,如果获取锁成功则继续往下执行,否则将当前线程封装为节点加入等待队列。然后线程被挂起,被唤醒后继续尝试抢锁,如果抢锁失败,继续挂起等待唤醒,直到抢锁成功则退出,最后返回中断标记。


其中tryAcquire方法需要子类实现,为尝试获取锁逻辑。
addWaiter方法为将当前线程封装成节点加入等待队列逻辑,详见第4节内部核心方法AQS#addWaiter

acquireQueued方法为阻塞当前线程获取锁逻辑,详见第4节内部核心方法AQS#acquireQueued

  1. /**
  2. * 阻塞等待获取锁
  3. * 1、AQS为无锁状态,第一次直接尝试抢锁然后成功
  4. * 2、AQS为无锁状态且AQS队列没有线程等待锁但是CAS抢锁失败 或者 AQS队列有线程等待锁,将当前线程封装成节点加入等待队列,
  5. * 然后在acquireQueued方法中发现前置节点为头节点,再次尝试抢锁,然后成功了
  6. * 3、AQS队列有线程等待锁,将当前线程封装成节点加入等待队列,然后线程被挂起了,唤醒(可能外部线程调用unpark或者中断)后继续抢锁,如果失败则继续挂起,直到抢锁成功
  7. * @param arg
  8. */
  9. public final void acquire(int arg) {
  10. /**
  11. *条件1: !tryAcquire(arg) 尝试获取锁 子类实现
  12. * 返回true 第一次尝试获取锁失败
  13. * 返回false 第一次尝试获取锁成功
  14. * 条件2: acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  15. * 前置条件(第一次尝试获取锁失败)
  16. * addWaiter(Node.EXCLUSIVE) 将当前线程封装成node然后入队
  17. * acquireQueued 1、挂起当前线程 2、执行唤醒后相关逻辑
  18. * 返回true 挂起过程中线程被中断或者唤醒过
  19. * 返回false 未被中断过
  20. *
  21. */
  22. if (!tryAcquire(arg) &&
  23. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
  24. /**
  25. * 如果线程被挂起后被中断退出 再次设置中断标记
  26. */
  27. selfInterrupt();
  28. }
  29. }

3.2、AQS#release释放锁

此处释放锁仅仅只是唤醒队列中最近的等锁线程,并没有对头节点进行出队操作,由被唤醒的线程做出队操作,详见第4节内部核心方法AQS#acquireQueued

  1. /**
  2. * 释放锁
  3. * @param arg
  4. * @return
  5. */
  6. public final boolean release(int arg) {
  7. if (tryRelease(arg)) {
  8. Node h = head;
  9. /**
  10. * 条件1: h != null
  11. * head是懒加载的,head为null说明没有发生锁竞争,当前线程拿锁时就为空队列,一直没有创建head节点。只有线程尝试拿锁时,发现为无锁状态,并且队列为空队列,
  12. * 但是CAS抢锁时失败了,此时将当前线程封装为节点入队时首先就要创建一个head节点,然后将当前线程节点放在尾结点
  13. *
  14. * 条件2:h.waitStatus != 0
  15. * 返回true 当前节点后面一定插入过其他节点 当前线程需要唤醒后继节点
  16. */
  17. if (h != null && h.waitStatus != 0){
  18. //唤醒最近的后继节点
  19. unparkSuccessor(h);
  20. }
  21. return true;
  22. }
  23. return false;
  24. }

3.3、AQS#acquireInterruptibly响应中断的加锁

被中断会抛出异常,首先尝试直接抢锁,抢锁失败则走可中断抢锁逻辑

  1. /**
  2. * 响应中断的加锁方式
  3. * @param arg
  4. * @throws InterruptedException
  5. */
  6. public final void acquireInterruptibly(int arg)
  7. throws InterruptedException {
  8. //如果当前线程被中断了 直接抛出中断异常
  9. if (Thread.interrupted())
  10. throw new InterruptedException();
  11. //尝试加锁
  12. if (!tryAcquire(arg))
  13. doAcquireInterruptibly(arg);
  14. }

3.4、AQS#tryAcquireNanos在指定时间内尝试获取锁

被中断会抛出异常,首先尝试直接抢锁,抢锁失败则走在指定时间内尝试获取锁逻辑

  1. /**
  2. * 在指定时间内尝试获取锁
  3. * @param arg
  4. * @param nanosTimeout 超时时间
  5. * @return
  6. * @throws InterruptedException
  7. */
  8. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  9. throws InterruptedException {
  10. if (Thread.interrupted())
  11. throw new InterruptedException();
  12. /**
  13. * 先直接调用tryAcquire尝试获取锁 然后 调用doAcquireNanos 阻塞等待锁
  14. */
  15. return tryAcquire(arg) ||
  16. doAcquireNanos(arg, nanosTimeout);
  17. }

4、AQS#内部核心方法

4.1、AQS#addWaiter将当前线程封装成node然后入队

将当前线程封装成node然后入队,然后返回当前线程封装的节点。入队过程中如果队列中没有节点,要先初始化队列,新建一个空节点入队,并且头节点和尾结点都指向它,然后将当前线程封装的节点以尾结点身份自旋入队;如果队列中有节点,则直接将当前线程封装的节点以尾结点身份自旋入队。

  1. /**
  2. * 将当前线程封装成node然后入队
  3. * @param mode 独占模式获取共享模式
  4. * @return 返回当前线程封装的节点
  5. */
  6. private Node addWaiter(Node mode) {
  7. // 构建node 把当前线程封装到node中
  8. Node node = new Node(mode);
  9. for (;;) {
  10. Node oldTail = tail;
  11. // oldTail != null说明当前队列有节点,不需要再新建头节点
  12. if (oldTail != null) {
  13. //当前节点的前驱指向尾结点
  14. node.setPrevRelaxed(oldTail);
  15. /**
  16. * CAS替换tail为当前节点
  17. * 返回true 当前节点成为了尾结点
  18. * 返回false 说明其它线程追加了节点,在CAS时oldTail不是当前队列的尾结点了,此时拿到的oldTail是脏数据,
  19. * 需要重新获取,for循环,获取新的尾结点,重新替换尾结点,直到当前节点被指定为尾结点成功入队
  20. */
  21. if (compareAndSetTail(oldTail, node)) {
  22. //旧的尾结点的后继指向新的尾结点
  23. oldTail.next = node;
  24. //当前线程封装的节点已经以尾结点的身份入队了,返回当前线程封装的节点
  25. return node;
  26. }
  27. } else {
  28. /**
  29. *说明当前队列没有节点,初始化队列
  30. * 需要新建一个空节点,通过CAS替换到头节点并且tail也指向头节点
  31. * 无论CAS成功还是失败(CAS替换到头节点有并发) 队列都会新建一个头节点,然后for循环走下一步逻辑
  32. */
  33. initializeSyncQueue();
  34. }
  35. }
  36. }

4.2、AQS#hasQueuedPredecessors判断当前AQS的队列中是否有线程在等锁

如果当前AQS的队列中有不为CALCEL状态的节点在等锁,且最靠近头节点的不为CALCEL状态的节点对应线程和当前线程不是同一个线程,说明当前线程需要被挂起。
如果当前AQS的队列中没有节点或者靠近头节点的不为CALCEL状态的节点对应线程和当前线程是同一个线程,则当前线程不需要等锁,有抢锁的权利。

  1. /**
  2. * 判断当前AQS的队列中是否有线程在等锁
  3. * @return
  4. * 返回true
  5. * 当前AQS的队列中有线程在等锁,且最靠近头节点的等锁线程和当前线程不是同一个线程
  6. * 返回false
  7. * (1)当前AQS的队列中没有线程等锁
  8. * (2)当前AQS的队列中有线程在等锁,但是最靠近头节点的等锁线程和当前线程是同一个线程
  9. */
  10. public final boolean hasQueuedPredecessors() {
  11. Node h, s;
  12. //如果 head节点为空 则证明没有线程等待,直接尝试抢锁
  13. if ((h = head) != null) {
  14. /**
  15. * 条件1:(s = h.next) == null
  16. * 返回true 头节点的后继节点为空
  17. * 返回false 头节点的后继节点不为空
  18. * 条件2:s.waitStatus > 0
  19. * 前置条件:头节点的后继节点不为空
  20. * 返回true 头节点的后继节点为CANCEL状态
  21. * 返回false 头节点的后继节点不为CANCEL状态
  22. */
  23. if ((s = h.next) == null || s.waitStatus > 0) {
  24. /**
  25. * 运行到此处的情况
  26. * 1、头节点的后继节点为空
  27. * (1)当前节点为尾结点
  28. * (2)新节点入队未完全完成 当前队列状态为 a<=>b<-c的状态 ,因为入队时是先将待入队节点指向尾结点,
  29. * 然后通过CAS将当前节点设置为尾结点,最后将旧尾结点的后继指向新尾结点
  30. * 2、头节点的后继节点为CANCEL状态
  31. */
  32. //s表示靠近头节点且不为CANCEL状态的节点
  33. s = null; // traverse in case of concurrent cancellation
  34. //因为存在a<=>b<-c队列的可能,所以从后往前找能找到所有的节点,寻找最靠近头节点的状态不为CANCEL的节点
  35. for (Node p = tail; p != h && p != null; p = p.prev) {
  36. if (p.waitStatus <= 0)
  37. s = p;
  38. }
  39. }
  40. /**
  41. * 条件1: s != null
  42. * 返回true 队列中存在未取消的等锁线程节点
  43. * 返回false 队列中不存在未取消的等锁线程节点
  44. * 条件2:s.thread != Thread.currentThread()
  45. * 前置条件:队列中存在未取消的等锁线程节点
  46. * 返回true: 当前AQS的队列中有线程在等锁,且最靠近头节点的等锁线程和当前线程不是同一个线程
  47. * 返回false: 当前AQS的队列中有线程在等锁,但是最靠近头节点的等锁线程和当前线程是同一个线程
  48. */
  49. if (s != null && s.thread != Thread.currentThread())
  50. return true;
  51. }
  52. return false;
  53. }

4.3、AQS#acquireQueued阻塞挂起线程

阻塞获取锁核心逻辑。
1、第一次尝试获取锁失败后,然后将当前线程节点加入到等待队列。如果当前线程节点前置节点为CANCEL状态,循环删除当前线程节点前的CANCEL状态节点,然后将前置节点设置为signal状态,然后将当前线程挂起。
2、被唤醒(外部线程调用unpark或者中断,绝大多数情况下是前置节点释放锁唤醒当前线程)后,如果当前节点的前置节点是头节点,会继续尝试抢锁,如果当前节点的前置节点不头节点或者抢锁失败, 则继续挂起等待唤醒,直到抢锁成功
3、抢锁成功后要协助旧的头节点出队(包括将当前线程节点设置为头节点,断开旧的头节点的后继指针),最后返回中断标记

  1. /**
  2. * 阻塞获取锁核心逻辑
  3. * (1)若当前线程节点的前置节点为signal状态,直接挂起当前线程,否则将当前线程节点的前置节点调整为signal状态(循环多次调整,删除当前线程节点前的CANCEL状态节点,
  4. * 设置当前线程节点的前置节点为signal状态),然后挂起当前线程
  5. * (2)被唤醒(外部线程调用unpark或者中断,绝大多数情况下是前置节点释放锁唤醒当前线程)后继续尝试抢锁,如果抢锁失败,
  6. * 继续挂起等待唤醒,直到抢锁成功
  7. * (3)抢锁成功后要协助旧的头节点出队,最后返回中断标记
  8. * @param node 当前线程的node,并且已经入队成功
  9. * @param arg state的值
  10. * @return 返回当前线程的中断标记
  11. */
  12. final boolean acquireQueued(final Node node, int arg) {
  13. // 当前线程是否被中断
  14. boolean interrupted = false;
  15. try {
  16. for (;;) {
  17. //获取当前节点的上一个节点
  18. final Node p = node.predecessor();
  19. /**
  20. * 条件1: p == head
  21. * 返回true 当前节点的上一个节点为头节点 如果当前节点的前置节点是head节点,说明当前节点为head.next节点,拥有抢锁资格
  22. * 返回false 当前节点的上一个节点不是头节点
  23. *条件2: tryAcquire(arg)
  24. * 前置条件:当前节点的上一个节点为头节点
  25. * 返回true 抢锁成功
  26. * 返回false 抢锁失败
  27. */
  28. if (p == head && tryAcquire(arg)) {
  29. //如果当前线程节点抢锁成功 则将当前节点设置为头节点
  30. setHead(node);
  31. //协助老的head节点出队
  32. p.next = null; // help GC
  33. //返回中断标记
  34. return interrupted;
  35. }
  36. /**
  37. * 运行到这里的可能情况:
  38. * 1、当前节点的前驱节点不是头节点
  39. * 2、当前节点的前驱节点是头节点,但是尝试抢锁失败(可能头节点线程还没有释放锁)
  40. */
  41. /**
  42. * shouldParkAfterFailedAcquire 确保当前线程节点的前置节点为signal状态,才能被挂起
  43. * true 当前线程节点的前置节点为signal状态,当前线程可以被挂起
  44. * false 当前线程节点的前置节点不为signal状态,当前线程不能被挂起,此时继续走for循环,下一次会尝试将当前线程节点的
  45. * 前置节点设置signal状态,直到该方法返回true
  46. */
  47. if (shouldParkAfterFailedAcquire(p, node)){
  48. /**
  49. * parkAndCheckInterrupt() 将当前线程挂起,被唤醒后返回是否是中断唤醒的
  50. * true 其它线程中断当前线程唤醒挂起线程
  51. * false 正常唤醒 外部线程调用unpark
  52. */
  53. interrupted |= parkAndCheckInterrupt();
  54. }
  55. }
  56. } catch (Throwable t) {
  57. cancelAcquire(node);
  58. if (interrupted)
  59. selfInterrupt();
  60. throw t;
  61. }
  62. }

4.4、AQS#houldParkAfterFailedAcquire判断当前线程符合挂起条件

判断当前线程符合挂起条件,只有当前线程前置节点为SIGNAL状态时,当前线程才可以挂起,当前线程前置节点对应线程释放锁后会唤醒当前线程。
第一次进入该方法时,一般前置节点不是SIGNAL状态,如果当前线程节点前有状态为CANCEL的节点,需要循环移除,然后进入退出本方法,进入下一次循环,然后将前置节点状态设置为SIGNAL状态,然后再次退出,进入下一次循环,再下一次进入,当前线程前置节点为SIGNAL状态就返回true了,线程就可以被挂起了。

  1. * 判断当前线程符合挂起条件 即当前节点的前置节点是否为SIGNAL状态
  2. * 只有当当前节点的前置节点为SIGNAL状态时,才可以挂起,不然挂起之后不会被唤醒,前置节点设置为signal状态,
  3. * 表示前置节点释放锁后需要唤醒当前线程封装的节点
  4. * @param pred 当前线程node节点的前置节点
  5. * @param node 当前线程node节点
  6. * @return
  7. * true 当前线程node节点的前置节点为signal状态
  8. * false
  9. * 1)当前线程node节点的前置节点为CANCEL,循环让所有的CANCEL节点出队,下次进来该方法可能前置节点为0 PROPAGATE或者signal状态
  10. * 2)当前线程node节点的前置节点为0 或者 PROPAGATE状态,直接改成SIGNAL状态,下次进来就是signal状态了
  11. */
  12. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  13. /**
  14. * 获取前置节点的状态
  15. * 0 默认状态
  16. * -1 SIGNAL 当前节点释放锁后会唤醒后继第一个节点
  17. * 1 CANCEL 当前节点处于取消状态 线程因为中断或者等待超时从等待队列取消等待
  18. */
  19. int ws = pred.waitStatus;
  20. /**
  21. * 表示前置节点是一个可以唤醒当前节点的节点,ws=-1是原因是parkAndCheckInterrupt方法挂起当前线程了
  22. */
  23. if (ws == Node.SIGNAL){
  24. //普通情况下 第一次来到shouldParkAfterFailedAcquire,ws不会是-1
  25. return true;
  26. }
  27. if (ws > 0) {
  28. //表示前置节点是CANCEL状态 表示线程因为中断或者等待超时从等待队列取消等待,已取消的节点不会再阻塞
  29. /*
  30. * Predecessor was cancelled. Skip over predecessors and
  31. * indicate retry.
  32. */
  33. //从当前线程节点往前找,找前置节点waitStatus <= 0的情况 其实就是让CANCEL节点出队,head节点的waitStatus不能为cancelled
  34. do {
  35. node.prev = pred = pred.prev;
  36. } while (pred.waitStatus > 0);
  37. pred.next = node;
  38. } else {
  39. /*
  40. * waitStatus must be 0 or PROPAGATE. Indicate that we
  41. * need a signal, but don't park yet. Caller will need to
  42. * retry to make sure it cannot acquire before parking.
  43. */
  44. /**
  45. * waitStatus为0 或者 PROPAGATE
  46. * 直接将当前node前置节点设置为signal状态 表示前置节点释放锁后需要唤醒当前线程封装的节点
  47. */
  48. pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
  49. }
  50. return false;
  51. }

4.5、AQS#parkAndCheckInterrupt挂起当前线程唤醒后返回中断标志

如果当前线程符合挂起条件,即当前节点的前置节点是否为SIGNAL状态,那么就将当前线程挂起,执行完LockSupport.park(this) 后当前线程阻塞住,只有外部线程调用unPark(一般为前置节点释放锁了)或者中断当前线程才会被唤醒,最后返回中断标志,主要是为了拿锁成功后,再次设置中断标记,保证业务代码的中断响应执行。

  1. /**
  2. * 将当前线程挂起 且唤醒后返回当前线程的中断标记
  3. * 唤醒的可能情况
  4. * 1、正常唤醒 外部线程调用unpark
  5. * 2、其它线程中断当前线程
  6. * @return
  7. */
  8. private final boolean parkAndCheckInterrupt() {
  9. LockSupport.park(this);
  10. return Thread.interrupted();
  11. }

4.6、AQS.unparkSuccessor唤醒后继节点

唤醒最靠近头节点的状态不为CANCEL的节点,即队列中下一个等锁的线程。

但是要特别注意一种新节点入队未完全完成的队列情况
因为入队时是先将待入队节点指向尾结点,然后通过CAS将当前节点设置为尾结点,最后将旧尾结点的后继指向新尾结点,所以可能出现
a<=> b <- c的情况,所以要从尾结点向头节点遍历,找到最靠近头节点的状态不为CANCEL的节点

  1. /**
  2. * 唤醒当前节点的后继节点
  3. * @param node 当前节点
  4. */
  5. private void unparkSuccessor(Node node) {
  6. int ws = node.waitStatus;
  7. if (ws < 0){
  8. //把状态改为0 因为当期线程已经在执行唤醒后继节点的工作了
  9. node.compareAndSetWaitStatus(ws, 0);
  10. }
  11. Node s = node.next;
  12. /**
  13. * 条件1:s == null
  14. * 返回true
  15. * (1)当前节点为尾结点
  16. * (2)新节点入队未完全完成 当前队列状态为 a<=>b<-c的状态 ,因为入队时是先将待入队节点指向尾结点,
  17. * 然后通过CAS将当前节点设置为尾结点,最后将旧尾结点的后继指向新尾结点
  18. *
  19. * 条件2:s.waitStatus > 0
  20. * 返回true 后继节点为CANCEL状态
  21. */
  22. if (s == null || s.waitStatus > 0) {
  23. s = null;
  24. //因为存在a<=>b<-c队列的可能,所以从后往前找能找到所有的节点,寻找最靠近头节点的状态不为CANCEL的节点
  25. for (Node p = tail; p != node && p != null; p = p.prev)
  26. if (p.waitStatus <= 0)
  27. s = p;
  28. }
  29. //唤醒最靠近头节点的状态不为CANCEL的节点 即队列中下一个等锁的线程
  30. if (s != null)
  31. LockSupport.unpark(s.thread);
  32. }

4.7、AQS#doAcquireInterruptibly可中断加锁

大致逻辑和加锁一致,但是被唤醒后如果中断标记为true,直接抛出中断异常,外层捕获异常,执行取消逻辑,然后将异常继续往上层抛。

  1. /**
  2. * 可中断的加锁的核心逻辑
  3. * (1)若当前线程节点的前置节点为signal状态,直接挂起当前线程,否则将当前线程节点的前置节点调整为signal状态(循环多次调整,删除当前线程节点前的CANCEL状态节点,
  4. * 设置当前线程节点的前置节点为signal状态),然后挂起当前线程
  5. * (2)被唤醒后,判断中断标记
  6. * 如果是中断唤醒的,抛出中断异常,捕获后执行取消获取锁逻辑,并向上抛出异常;
  7. * 如果不是被中断唤醒的,尝试抢锁,如果抢锁失败, 继续挂起等待唤醒,直到抢锁成功,抢锁成功后要协助旧的头节点出队,最后返回中断标记
  8. *
  9. * @param arg
  10. * @throws InterruptedException
  11. */
  12. private void doAcquireInterruptibly(int arg)
  13. throws InterruptedException {
  14. final Node node = addWaiter(Node.EXCLUSIVE);
  15. try {
  16. for (;;) {
  17. //获取当前节点的上一个节点
  18. final Node p = node.predecessor();
  19. /**
  20. * 条件1: p == head
  21. * 返回true 当前节点的上一个节点为头节点 如果当前节点的前置节点是head节点,说明当前节点为head.next节点,拥有抢锁资格
  22. * 返回false 当前节点的上一个节点不是头节点
  23. *条件2: tryAcquire(arg)
  24. * 前置条件:当前节点的上一个节点为头节点
  25. * 返回true 抢锁成功
  26. * 返回false 抢锁失败
  27. */
  28. if (p == head && tryAcquire(arg)) {
  29. setHead(node);
  30. p.next = null; // help GC
  31. return;
  32. }
  33. /**
  34. * 运行到这里的可能情况:
  35. * 1、当前节点的前驱节点不是头节点
  36. * 2、当前节点的前驱节点是头节点,但是尝试抢锁失败(可能头节点线程还没有释放锁)
  37. */
  38. /**
  39. * 条件1:shouldParkAfterFailedAcquire 确保当前线程节点的前置节点为signal状态,才能被挂起
  40. * 返回true 当前线程节点的前置节点为signal状态,当前线程可以被挂起
  41. * 返回false 当前线程节点的前置节点不为signal状态,当前线程不能被挂起,此时继续走for循环,下一次会尝试将当前线程节点的
  42. * 前置节点设置signal状态,直到该方法返回true
  43. * 条件2:parkAndCheckInterrupt() 挂起当前线程被唤醒后返回中断状态
  44. * 返回true 线程被中断
  45. * 返回false 线程没有被中断
  46. *
  47. */
  48. if (shouldParkAfterFailedAcquire(p, node) &&
  49. parkAndCheckInterrupt())
  50. throw new InterruptedException();
  51. }
  52. } catch (Throwable t) {
  53. //拦截 中断异常
  54. cancelAcquire(node);
  55. throw t;
  56. }
  57. }

4.8、AQS#cancelAcquire取消锁请求

在响应中断的加锁逻辑中,如果被中断唤醒,需要执行取消逻辑,根据取消节点在队列的位置不同, 执行出队的策略也不同,一共分为三种情况
(1)当前节点为尾结点
直接将尾结点前置节点的next指针断开,将将尾结点前置节点设置为尾结点
(2)当前节点不是head.next节点,也不是尾结点
直接将当前节点的前置节点的next指针指向当前节点的下一个节点
(3)当前节点是head.next节点
直接唤醒当前节点的下一个节点
点击查看【processon】

  1. /**
  2. *取消获取锁 出队逻辑
  3. * @param node 取消获取锁的节点
  4. */
  5. private void cancelAcquire(Node node) {
  6. // Ignore if node doesn't exist
  7. if (node == null)
  8. return;
  9. node.thread = null;
  10. //获取当前节点的前置节点
  11. Node pred = node.prev;
  12. // 跳过被取消的节点 找到当前节点不为CANCEL状态的前置节点
  13. while (pred.waitStatus > 0)
  14. node.prev = pred = pred.prev;
  15. /**
  16. * 拿到当前节点最近一个不为CANCEL状态的前置节点的后继节点
  17. * (1)predNext就为当前节点
  18. * (2)predNext为一个被取消的节点
  19. */
  20. Node predNext = pred.next;
  21. //设置当期节点状态为被取消
  22. node.waitStatus = Node.CANCELLED;
  23. /**
  24. * 当前取消的节点所在队列的位置不同,执行出队的策略也不同,一共分为三种情况
  25. * (1)当前节点为尾结点
  26. * (2)当前节点不是head.next节点,也不是尾结点
  27. * (3)当前节点是head.next节点
  28. */
  29. /**
  30. * 条件1:node == tail 当前节点是否为队尾节点
  31. * 条件2: compareAndSetTail(node, pred) 如果当前节点为队尾节点尝试使用尾结点的前置节点通过CAS替换队尾节点
  32. */
  33. if (node == tail && compareAndSetTail(node, pred)) {
  34. //当前节点为尾结点并且使用尾结点的前置节点通过CAS替换队尾节点成功,设置前置节点的next指针为空,完成node出队
  35. pred.compareAndSetNext(predNext, null);
  36. } else {
  37. // If successor needs signal, try to set pred's next-link
  38. // so it will get one. Otherwise wake it up to propagate.
  39. int ws;
  40. /**
  41. * 条件1: pred != head
  42. * true 当前节点不是head.next节点
  43. * false 当前节点是head.next节点
  44. * 条件2:(ws = pred.waitStatus) == Node.SIGNAL ||
  45. * (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))
  46. * 保证pred的状态是SIGNAL状态
  47. * 条件2.1:(ws = pred.waitStatus) == Node.SIGNAL 前置节点的状态是否为SIGNAL
  48. * 前置条件:当前节点不是head.next节点
  49. * 条件2.2:ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL)
  50. * 前置条件:当前节点不是head.next节点并且前置节点状态不是SIGNAL状态
  51. * 条件2.2.1: ws <= 0
  52. * true 前置节点不是被取消状态
  53. * false 前置节点是被取消状态
  54. * 条件2.2.2: pred.compareAndSetWaitStatus(ws, Node.SIGNAL)
  55. * 前置条件:当前节点不是head.next节点并且前置节点状态不是SIGNAL状态和CANCEL状态
  56. * true CAS设置前置节点为SIGNAL状态成功
  57. * false CAS设置前置节点为SIGNAL状态失败
  58. * 条件3:pred.thread != null
  59. * 前置条件:当前节点不是head.next节点并且前置节点状态是SIGNAL状态
  60. * true 前置节点封装的线程对象不为空
  61. *
  62. *
  63. */
  64. if (pred != head &&
  65. ((ws = pred.waitStatus) == Node.SIGNAL ||
  66. (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
  67. pred.thread != null) {
  68. //当前节点的后继节点
  69. Node next = node.next;
  70. //当前节点的下一个节点没有被取消
  71. if (next != null && next.waitStatus <= 0){
  72. //将前置节点的next指向当前节点的next
  73. pred.compareAndSetNext(predNext, next);
  74. }
  75. } else {
  76. //当前node是head.next节点,直接唤醒后继节点
  77. unparkSuccessor(node);
  78. }
  79. node.next = node; // help GC
  80. }
  81. }

4.9、AQS#doAcquireNanos可中断和超时的获取锁

  1. /**
  2. * 可中断和超时的获取锁
  3. * @param arg
  4. * @param nanosTimeout 超时时间
  5. * @return
  6. * @throws InterruptedException
  7. */
  8. private boolean doAcquireNanos(int arg, long nanosTimeout)
  9. throws InterruptedException {
  10. if (nanosTimeout <= 0L)
  11. return false;
  12. //超时时间
  13. final long deadline = System.nanoTime() + nanosTimeout;
  14. //将当前线程封装成node然后入队
  15. final Node node = addWaiter(Node.EXCLUSIVE);
  16. try {
  17. for (;;) {
  18. //获取当前节点的前置节点
  19. final Node p = node.predecessor();
  20. //如果前置节点是头节点 则尝试抢锁
  21. if (p == head && tryAcquire(arg)) {
  22. //替换头节点
  23. setHead(node);
  24. //协助头节点出队
  25. p.next = null; // help GC
  26. return true;
  27. }
  28. //计算剩余时间
  29. nanosTimeout = deadline - System.nanoTime();
  30. if (nanosTimeout <= 0L) {
  31. //如果已经超时 直接取消获取锁 执行出队逻辑
  32. cancelAcquire(node);
  33. return false;
  34. }
  35. /**
  36. * shouldParkAfterFailedAcquire(p, node) 判断当前线程符合挂起条件 即当前节点的前置节点是否为SIGNAL状态
  37. *
  38. * nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD剩余时间是否超过1000ns
  39. *
  40. * 只有在当前线程符合挂起条件 并且 剩余时间超过1000ns时 才会挂起当前线程 ,超时时间不足1000ns 循环自旋
  41. */
  42. if (shouldParkAfterFailedAcquire(p, node) &&
  43. nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
  44. LockSupport.parkNanos(this, nanosTimeout);
  45. if (Thread.interrupted())
  46. throw new InterruptedException();
  47. }
  48. } catch (Throwable t) {
  49. //中断则取消锁请求
  50. cancelAcquire(node);
  51. throw t;
  52. }
  53. }