概览

AbstractQueuedSynchronizer 提供原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

使用设计模式——模板方法,子类只需要实现很少几个接口,就可以定一个锁同步类,完成独占锁,共享锁的逻辑。

参考美团点评技术团队编写的:从ReentrantLock的实现看AQS的原理及应用 加上自己的理解来加深对 AQS 的理解。

本文暂时只介绍 AQS 独占加锁与解锁过程实现

ReentrantLock 与 AQS 关联

根据源码可知,ReentrantLock 的非公平锁加锁,只用实现 tryAcquire 获取锁的逻辑,就可以完成非公平的独占锁获取。其中获取锁失败进行等待的逻辑 AQS 已经帮我们实现。
AbstractQueuedSynchronizer 源码学习基于 JDK 1.8 - 图1

1. 独占加锁 acquire 实现

  1. // AQS acquire 方法
  2. public final void acquire(int arg) {
  3. // 首先调用子类 tryAcquire 去获取锁,若获取锁失败,才执行后面的 addWaiter acquireQueued 逻辑
  4. if (!tryAcquire(arg) &&
  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt(); // 获取锁成功,当前线程中断
  7. }

主要方法:

  • tryAcuire,子类重写获取锁的逻辑
  • addWaiter,添加节点到队列末尾

    • enq ,入队以及初始化队列
  • acquireQueued,返回 node 在等待过程中是否被中断过

    • shouldParkAfterFailedAcquire,阻塞获取锁失败的线程,删除队列中状态为取消的节点
    • parkAndCheckInterrupt,真正的阻塞办法
    • cancelAcquire,获取锁过程中发生了异常

1.1 tryAcquire

子类复写 tryAcquire 方法,里面实现获取锁逻辑,获取成功返回 true,获取失败返回 false。

参考 ReentranLock 灯同步类的实现,主要逻辑是 setState ,并将当前线程标记为已获取到该锁

1.2 addWaiter

将节点插入到队列尾部,若队列为初始化,先进行初始化

  1. // 将 EXCLUSIVE 类型的 Node 添加到队列尾部
  2. private Node addWaiter(Node mode) {
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. Node pred = tail; // 尾节点
  6. if (pred != null) { // 尾节点不为空时,将 node 插入到队列尾部
  7. node.prev = pred; // 队列是双向的,将 node 的前置指针指向 pred
  8. if (compareAndSetTail(pred, node)) { // 这个方法就是将 tailOffset 的值与 pred 进行比较,若相同,将 tailOffset 的值设为 node,即将 tail 指向 node。
  9. pred.next = node; // pred.next 指向 node
  10. return node; // 返回刚刚添加的 node
  11. }
  12. }
  13. // 尾节点为空(队列中没有元素,需要初始化)
  14. // 或者 compareAndSetTail 失败(tailOffset 处的值不是 pred,被其它线程修改,导致 tail 指向的节点和 pred 指向的节点不同)
  15. enq(node); // 死循环方式 插入到队列中
  16. return node; // 返回刚刚添加的 node
  17. }
  18. // 插入节点到队列
  19. private Node enq(final Node node) {
  20. for (;;) {
  21. Node t = tail;
  22. // 尾指针为 null
  23. if (t == null) { // Must initialize
  24. if (compareAndSetHead(new Node())) // 队列初始化
  25. tail = head; // ,head 和 tail 指向同一处,进行下一次循环
  26. } else {
  27. // 队列中有元素
  28. node.prev = t; // node 的前置指针指向 tail
  29. if (compareAndSetTail(t, node)) { // 若 tailOffset 指向的值等于 t,将 tail 处的值改为 node
  30. t.next = node; // 队列是双向的,将后置指针指向 node
  31. return t; // 返回插入队列前 队列中的最后一个节点
  32. }
  33. }
  34. }
  35. }

1.3 acquireQueued

循环从队列中查询 node 节点能否获取锁,直到线程获取成功或者被中断

  1. // node 是刚刚插入队列结尾的节点,死循环去判断 node 是否获取到锁,返回是否被中断
  2. final boolean acquireQueued(final Node node, int arg) {
  3. // 获取失败标志位
  4. boolean failed = true;
  5. try {
  6. // 是否被中断
  7. boolean interrupted = false;
  8. // 死循环,直到获取锁成功
  9. for (;;) {
  10. // 刚刚插入节点 node 的前置节点
  11. final Node p = node.predecessor();
  12. if (p == head && tryAcquire(arg)) { // 若前置节点是头节点(头节点里面啥都没有),并且当前 Node 获取锁成功
  13. setHead(node); // head 指针后移指向 node,并将 node 属性置为 null
  14. p.next = null; // help GC
  15. failed = false;
  16. return interrupted;
  17. }
  18. // 上面条件不满足:p 不是头节点 或者 node 没有获取到锁
  19. // shouldParkAfterFailedAcquire 若锁获取失败,返回线程是否需要被阻塞
  20. // parkAndCheckInterrupt 阻塞当前线程(for 循环卡住),并返回当前线程是否被中断
  21. // 这个方法块的意思:循环判断 node 是否需要被阻塞,并进行阻塞直到被唤醒
  22. if (shouldParkAfterFailedAcquire(p, node) &&
  23. parkAndCheckInterrupt())
  24. // 被中断过
  25. interrupted = true;
  26. }
  27. } finally {
  28. // 出现不正常情况,取消获取锁
  29. if (failed)
  30. cancelAcquire(node); // 后面讲
  31. }
  32. }

1.3.1 shouldParkAfterFailedAcquire

  1. // waitStatus 状态列举:
  2. static final int CANCELLED = 1; // 取消状态
  3. static final int SIGNAL = -1; // 后继线程需要解锁
  4. static final int CONDITION = -2; // 等待某个条件的状态
  5. static final int PROPAGATE = -3; // 获取共享锁传播状态
  6. // 根据前驱节点 pred 判断 node 是否需要被阻塞
  7. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  8. //
  9. int ws = pred.waitStatus;
  10. // 前驱节点处于 唤醒状态,
  11. if (ws == Node.SIGNAL)
  12. return true; // 那么返回需要被阻塞,被唤醒下面说
  13. // 若前驱节点处于取消状态,移除队列中 node 前面处于取消状态的节点
  14. if (ws > 0) {
  15. do {
  16. // 往前遍历,直到找到队列中,节点不是 取消状态 的节点,将 node 的前置指向指向这个节点。
  17. node.prev = pred = pred.prev;
  18. } while (pred.waitStatus > 0);
  19. // 取消状态的节点,就从队列中移除了
  20. pred.next = node;
  21. } else {
  22. // 将前置节点的 waitStatus 设为 SIGNAL
  23. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  24. }
  25. // 返回不需要被阻塞
  26. return false;
  27. }

1.3.2 parkAndCheckInterrupt

  1. // 挂起当前线程,并返回当前线程的中断状态
  2. private final boolean parkAndCheckInterrupt() {
  3. LockSupport.park(this);
  4. return Thread.interrupted();
  5. }

1.3.3 cancelAcquire

  1. private void cancelAcquire(Node node) {
  2. // Ignore if node doesn't exist
  3. if (node == null)
  4. return;
  5. node.thread = null;
  6. Node pred = node.prev;
  7. while (pred.waitStatus > 0) // 往前遍历,跳过 waitStatus = CANCELLED 的节点
  8. node.prev = pred = pred.prev;
  9. Node predNext = pred.next; // 删除中间 waitStatus = CANCELLED 的节点,pred.next 指向 node
  10. // 将 node 状态改为 CANCELLED
  11. node.waitStatus = Node.CANCELLED;
  12. // 如果 node 是尾节点,直接删除末尾的 node
  13. if (node == tail && compareAndSetTail(node, pred)) {
  14. // pred 指向 predNext,predNext 为空节点
  15. compareAndSetNext(pred, predNext, null);
  16. } else {
  17. int ws;
  18. // 当 pred 不是头节点,并且 pred.waitStatus == SIGNAL 时
  19. // 或者 pred 不是头节点,并且 pred.waitStatus 不是 CANCELLED 状态,将 pred.waitStatus 改为 SIGNAL
  20. // 以上两种情况出现时,将 pred.next 指向 node.next。即删除 pred 指向 node 的指针,直接指向 node.next
  21. if (pred != head &&
  22. ((ws = pred.waitStatus) == Node.SIGNAL ||
  23. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  24. pred.thread != null) {
  25. Node next = node.next;
  26. if (next != null && next.waitStatus <= 0)
  27. compareAndSetNext(pred, predNext, next); //
  28. } else {
  29. // 上面条件不满足,即:
  30. // pred 是头节点
  31. // pred 状态为 CANCELLED
  32. // pred.thread == null
  33. // 以上三种情况出现时,唤醒 node 后面阻塞的线程
  34. unparkSuccessor(node);
  35. }
  36. node.next = node; // help GC
  37. }
  38. }
  39. // 唤醒 node 后面的线程
  40. private void unparkSuccessor(Node node) {
  41. int ws = node.waitStatus;
  42. if (ws < 0) // 不是 CANCELLED 状态时,把 node.waitStatus 设为 0
  43. compareAndSetWaitStatus(node, ws, 0);
  44. Node s = node.next;
  45. // 若 node 的后继节点为空,或者后继节点时 CANCELLED
  46. if (s == null || s.waitStatus > 0) {
  47. s = null;
  48. // 从尾节点开始往前遍历,找到队列中从左往右第一个不是 CANCELLED 状态的节点
  49. for (Node t = tail; t != null && t != node; t = t.prev)
  50. if (t.waitStatus <= 0)
  51. s = t;
  52. }
  53. if (s != null)
  54. // 唤醒该节点
  55. LockSupport.unpark(s.thread);
  56. }
  • 从尾节点开始往前遍历,找到第一个非 CANCELLED 节点的原因

addWaiter 方法节点入队 node.prev = pred; compareAndSetTail(pred, node) 不是原子操作,所以 pre 指针是完整的

其次,删除 CANCELLED 节点时,先删除的时 NEXT,Prev 没有改,所以可以完整遍历整个队列

1.4 acquire 方法总结

acquire 方法的总体思路:

  1. tryAcquire 子类实现获取锁逻辑,获取成功,方法结束
  2. 获取失败,通过 addWaiter 加入到队列结尾,并初始化未初始化的队列
  3. acquireQueued 循环判断 node 节点是否能获取到锁,里面做了几件事:

    • 当 node 节点前置节点是 head 并且再次尝试获取锁成功,直接返回
    • 判断获取失败的 node 节点是否需要被阻塞,里面会去删除队列中 waitstatus 为 CANCELLED 状态的节点,并返回线程的中断状态,若被阻塞,acquireQueued 中的 for 循环将被卡住,等待被唤醒
    • 获取锁期间发生异常,将队列中的 node 删除
  4. 跳出 acquireQueued 循环的条件是 node 前置节点是 head 并且获取到锁,否则进行阻塞等待
    AbstractQueuedSynchronizer 源码学习基于 JDK 1.8 - 图2

2. 独占解锁实现

释放锁,调用 子类重写的 tryRelease 方法,若解锁成功,将唤醒 head 后面的 node

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. // 当头指针已经初始化
  5. // waitStatus == 0,表示后继节点正在运行中,不需要唤醒
  6. // waitStatus < 0,表示节点状态不是 CANCELLED,那么需要被唤醒
  7. if (h != null && h.waitStatus != 0)
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }

2.1 tryRelease

ReentranLock tryRelease 释放锁实现,公平锁和非公平锁都一样

  1. protected final boolean tryRelease(int releases) {
  2. // 减少可重入次数
  3. int c = getState() - releases;
  4. // 当前线程不持有该锁,抛出异常
  5. if (Thread.currentThread() != getExclusiveOwnerThread())
  6. throw new IllegalMonitorStateException();
  7. boolean free = false;
  8. // 持有锁全部释放
  9. if (c == 0) {
  10. free = true;
  11. // 将当前锁的所占线程置为 null
  12. setExclusiveOwnerThread(null);
  13. }
  14. // 更新 state
  15. setState(c);
  16. return free;

2.2 unparkSuccessor
unparkSuccessor 在上面 1.3.3 有介绍

3. 可中断加锁实现

逻辑基本与 acquire 一致,唯一不同的地方就是阻塞等待获取锁过程中,如果被中断,直接抛出异常。

  1. public final void acquireInterruptibly(int arg)
  2. throws InterruptedException {
  3. // 如果当前线程被中断,直接抛出
  4. if (Thread.interrupted())
  5. throw new InterruptedException();
  6. // 子类获取锁失败
  7. if (!tryAcquire(arg))
  8. // 执行具体的可中断的加锁逻辑
  9. doAcquireInterruptibly(arg);
  10. }
  11. // 这个方法和 acquireQueued 基本一样,唯一不同的就是等待过程中,线程被中断直接抛出(parkAndCheckInterrupt 返回 true)
  12. private void doAcquireInterruptibly(int arg)
  13. throws InterruptedException {
  14. // 添加节点到队列尾部
  15. final Node node = addWaiter(Node.EXCLUSIVE);
  16. boolean failed = true;
  17. try {
  18. for (;;) {
  19. final Node p = node.predecessor();
  20. if (p == head && tryAcquire(arg)) {
  21. setHead(node);
  22. p.next = null; // help GC
  23. failed = false;
  24. return;
  25. }
  26. if (shouldParkAfterFailedAcquire(p, node) &&
  27. parkAndCheckInterrupt())
  28. // 与 acquireQueued 不同点
  29. throw new InterruptedException();
  30. }
  31. } finally {
  32. if (failed)
  33. cancelAcquire(node);
  34. }
  35. }

4. 超时等待式获取锁

  1. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. return tryAcquire(arg) ||
  6. doAcquireNanos(arg, nanosTimeout); // 子类获取锁成功 或者 doAcquireNanos 获取成功
  7. }
  8. // 一个时间阈值,若时间小于该值,不阻塞当前线程而是继续获取,提高响应速度
  9. static final long spinForTimeoutThreshold = 1000L;
  10. private boolean doAcquireNanos(int arg, long nanosTimeout)
  11. throws InterruptedException {
  12. if (nanosTimeout <= 0L)
  13. return false;
  14. // 超时时间点
  15. final long deadline = System.nanoTime() + nanosTimeout;
  16. // 添加到队列尾部
  17. final Node node = addWaiter(Node.EXCLUSIVE);
  18. boolean failed = true;
  19. try {
  20. for (;;) {
  21. final Node p = node.predecessor();
  22. // 获取锁成功
  23. if (p == head && tryAcquire(arg)) {
  24. setHead(node);
  25. p.next = null; // help GC
  26. failed = false;
  27. return true;
  28. }
  29. // 计算与截止时间的差距
  30. nanosTimeout = deadline - System.nanoTime();
  31. // 已经超时,直接返回 false
  32. if (nanosTimeout <= 0L)
  33. return false;
  34. // 阻塞获取锁并且差距要大于 1000,
  35. if (shouldParkAfterFailedAcquire(p, node) &&
  36. nanosTimeout > spinForTimeoutThreshold)
  37. // 指定阻塞指定时间
  38. LockSupport.parkNanos(this, nanosTimeout);
  39. if (Thread.interrupted())
  40. throw new InterruptedException();
  41. }
  42. } finally {
  43. if (failed)
  44. cancelAcquire(node);
  45. }
  46. }