6.1 锁与队列的关系

  1. CLH锁的内部队列

image.png

  1. 分布式锁的内部队列

image.png

  1. AQS的内部队列

image.png

6.2 AQS的核心成员

6.2.1 状态标志位

  1. //同步状态,使用 volatile保证线程可见
  2. private volatile int state;

state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都可以得到最新值。AQS提供了getState()、setState()来获取和设置同步状态,具体如下:

  1. // 获取同步的状态
  2. protected final int getState() {
  3. return state;
  4. }
  5. // 设置同步的状态
  6. protected final void setState(int newState) {
  7. state = newState;
  8. }
  9. // 通过CAS设置同步的状态
  10. protected final boolean compareAndSetState(int expect, int update) {
  11. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  12. }
  1. AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量叫exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的get()和set()方法,具体如下:
  1. public abstract class AbstractOwnableSynchronizer
  2. implements java.io.Serializable {
  3. //表示当前占用该锁的线程
  4. private transient Thread exclusiveOwnerThread;
  5. // 省略get/set方法
  6. }

6.2.2 队列节点类

AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:

  1. static final class Node {
  2. /**节点等待状态值1:取消状态*/
  3. static final int CANCELLED = 1;
  4. /**节点等待状态值-1:标识后继线程处于等待状态*/
  5. static final int SIGNAL = -1;
  6. /**节点等待状态值-2:标识当前线程正在进行条件等待*/
  7. static final int CONDITION = -2;
  8. /**节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播*/
  9. static final int PROPAGATE = -3;
  10. //节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
  11. //普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)
  12. volatile int waitStatus;
  13. //节点所对应的线程,为抢锁线程或者条件等待线程
  14. volatile Thread thread;
  15. //前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
  16. volatile Node prev;
  17. //后继节点
  18. volatile Node next;
  19. //若当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上
  20. //此属性指向下一个条件等待节点,即其条件队列上的后继节点
  21. Node nextWaiter;
  22. ...
  23. }

6.2.3 FIFO双向同步队列

AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。

  1. /*首节点的引用*/
  2. private transient volatile Node head;
  3. /*尾节点的引用*/
  4. private transient volatile Node tail;

image.png

6.2.4 JUC显式锁与AQS的关系

6.2.5 ReentrantLock与AQS的组合关系

  1. ReentrantLock与AQS的组合关系

ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:

  1. static abstract class Sync extends AbstractQueuedSynchronizer {...}

ReentrantLock为了支持公平锁和非公平锁两种模式,为Sync又定义了两个子类,具体如下:

  1. final static class NonfairSync extends Sync {...}
  2. final static class FairSync extends Sync {...}
  1. ReentrantLock提供了两个构造器,具体如下:
  1. public ReentrantLock() { //默认的构造器
  2. sync = new NonfairSync(); //内部使用非公平同步器
  3. }
  4. public ReentrantLock(boolean fair) { //true 为公平锁,否则为非公平锁
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. }
  1. ReentrantLocklock()和unlock()的源码可以看到,它们只是分别调用了sync对象的lock()和release()方法。
  1. public void lock() { //抢占显式锁
  2. sync.lock();
  3. }
  4. public void unlock() { //释放显式锁
  5. sync.release(1);
  6. }
  1. 显式锁与AQS之间的组合关系

    6.3 AQS中的模板模式

    6.3.1 模板模式

    6.3.2 一个模板模式的参考实现

  2. 模板模式的参考实现代码 ```java package com.crazymakercircle.demo.lock; import com.crazymakercircle.util.Print; public class TemplateDemo {

    1. static abstract class AbstractAction
    2. {
    3. /**
    4. * 模板方法:算法骨架
    5. */
    6. public void tempMethod()
    7. {
    8. Print.cfo("模板方法的算法骨架被执行");
    9. beforeAction(); // 执行前的公共操作
    10. action(); // 调用钩子方法
    11. afterAction(); // 执行后的公共操作
    12. }
    13. /**
    14. * 执行前
    15. */
    16. protected void beforeAction()
    17. {
    18. Print.cfo("准备执行钩子方法");
    19. }
    20. /**
    21. * 钩子方法:这里定义为一个抽象方法
    22. */
    23. public abstract void action();
    24. /**
    25. * 执行后
    26. */
    27. private void afterAction()
    28. {
    29. Print.cfo("钩子方法执行完成");
    30. }
    31. }
    32. //子类A:提供了钩子方法实现
    33. static class ActionA extends AbstractAction
    34. {
    35. /**
    36. * 钩子方法的实现
    37. */
    38. @Override
    39. public void action()
    40. {
    41. Print.cfo("钩子方法的实现 ActionA.action() 被执行");
    42. }
    43. }
    44. //子类B:提供了钩子方法实现
    45. static class ActionB extends AbstractAction
    46. {
    47. /**
    48. * 钩子方法的实现
    49. */
    50. @Override
    51. public void action()
    52. {
    53. Print.cfo("钩子方法的实现 ActionB.action() 被执行");
    54. }
    55. }
    56. public static void main(String[] args)
    57. {
    58. AbstractAction action = null;
    59. //创建一个 ActionA 实例
    60. action= new ActionA();
    61. //执行基类的模板方法
    62. action.tempMethod();
    63. //创建一个 ActionB 实例
    64. action= new ActionB();
    65. //执行基类的模板方法
    66. action.tempMethod();
    67. }

    }

  1. <a name="jHzxv"></a>
  2. ## 6.3.3 AQS的模板流程
  3. - Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。
  4. - Share(共享锁):多个线程可同时占有锁资源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read锁。
  5. <a name="MhyL3"></a>
  6. ## 6.3.4 AQS中的钩子方法
  7. 自定义同步器时,AQS中需要重写的钩子方法大致如下:
  8. - tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false
  9. - tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false
  10. - tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  11. - tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
  12. - isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。
  13. 1. tryAcquire独占式获取锁
  14. ```java
  15. protected boolean tryAcquire(int arg) {
  16. throw new UnsupportedOperationException();
  17. }
  1. tryRelease独占式释放锁 ```java protected boolean tryRelease(long arg) {
    1. throw new UnsupportedOperationException();
    2. }
  1. 3. tryAcquireShared共享式获取
  2. ```java
  3. protected long tryAcquireShared(long arg) {
  4. throw new UnsupportedOperationException();
  5. }
  1. tryReleaseShared共享式释放 ```java protected boolean tryReleaseShared(long arg) {
    1. throw new UnsupportedOperationException();
    2. }
  1. 5. 查询是否处于独占模式
  2. ```java
  3. protected boolean isHeldExclusively() {
  4. throw new UnsupportedOperationException();
  5. }

6.4 通过AQS实现一把简单的独占锁

6.4.1 简单的独占锁的UML类图

image.png

6.4.2 简单的独占锁的实现

  1. package com.crazymakercircle.demo.lock.custom;
  2. // 省略import
  3. public class SimpleMockLock implements Lock
  4. {
  5. //同步器实例
  6. private final Sync sync = new Sync();
  7. // 自定义的内部类:同步器
  8. // 直接使用 AbstractQueuedSynchronizer.state 值表示锁的状态
  9. // AbstractQueuedSynchronizer.state=1 表示锁没有被占用
  10. // AbstractQueuedSynchronizer.state=0 表示锁没已经被占用
  11. private static class Sync extends AbstractQueuedSynchronizer
  12. {
  13. //钩子方法
  14. protected boolean tryAcquire(int arg)
  15. {
  16. //CAS更新状态值为1
  17. if (compareAndSetState(0, 1))
  18. {
  19. setExclusiveOwnerThread(Thread.currentThread());
  20. return true;
  21. }
  22. return false;
  23. }
  24. //钩子方法
  25. protected boolean tryRelease(int arg)
  26. {
  27. //如果当前线程不是占用锁的线程
  28. if (Thread.currentThread() != getExclusiveOwnerThread())
  29. {
  30. //抛出非法状态的异常
  31. throw new IllegalMonitorStateException();
  32. }
  33. //如果锁的状态为没有占用
  34. if (getState() == 0)
  35. {
  36. //抛出非法状态的异常
  37. throw new IllegalMonitorStateException();
  38. }
  39. //接下来不需要使用CAS操作,因为下面的操作不存在并发场景
  40. setExclusiveOwnerThread(null);
  41. //设置状态
  42. setState(0);
  43. return true;
  44. }
  45. }
  46. //显式锁的抢占方法
  47. @Override
  48. public void lock()
  49. {
  50. //委托给同步器的acquire()抢占方法
  51. sync.acquire(1);
  52. }
  53. //显式锁的释放方法
  54. @Override
  55. public void unlock()
  56. {
  57. //委托给同步器的release()释放方法
  58. sync.release(1);
  59. }
  60. // 省略其他未实现的方法
  61. }
  62. }

6.4.3 SimpleMockLock测试用例

  1. package com.crazymakercircle.demo.lock;
  2. // 省略import
  3. public class LockTest
  4. {
  5. @org.junit.Test
  6. public void testMockLock()
  7. {
  8. // 每个线程的执行轮数
  9. final int TURNS = 1000;
  10. // 线程数
  11. final int THREADS = 10;
  12. //线程池,用于多线程模拟测试
  13. ExecutorService pool = Executors.newFixedThreadPool(THREADS);
  14. //自定义的独占锁
  15. Lock lock = new SimpleMockLock();
  16. // 倒数闩
  17. CountDownLatch countDownLatch = new CountDownLatch(THREADS);
  18. long start = System.currentTimeMillis();
  19. //10个线程并发执行
  20. for (int i = 0; i < THREADS; i++)
  21. {
  22. pool.submit(() ->
  23. {
  24. try
  25. {
  26. //累加 1000 次
  27. for (int j = 0; j < TURNS; j++)
  28. {
  29. //传入锁,执行一次累加
  30. IncrementData.lockAndFastIncrease(lock);
  31. }
  32. Print.tco("本线程累加完成");
  33. } catch (Exception e)
  34. {
  35. e.printStackTrace();
  36. }
  37. //线程执行完成,倒数闩减少一次
  38. countDownLatch.countDown();
  39. });
  40. }
  41. // 省略等待并发执行完成、结果输出的代码
  42. }
  43. }

6.5 AQS锁抢占的原理

6.5.1 显式锁抢占的总体流程

image.png

6.5.2 AQS模板方法:acquire(arg)

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

6.5.3 钩子实现:tryAcquire(arg)

  1. private static class Sync extends AbstractQueuedSynchronizer
  2. {
  3. //钩子方法
  4. protected boolean tryAcquire(int arg)
  5. {
  6. //CAS更新状态值为1
  7. if (compareAndSetState(0, 1))
  8. {
  9. setExclusiveOwnerThread(Thread.currentThread());
  10. return true;
  11. }
  12. return false;
  13. }

6.5.4 直接入队:addWaiter

  1. private Node addWaiter(Node mode) {
  2. //创建新节点
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
  5. Node pred = tail;
  6. // 队列不为空的时候
  7. if (pred != null) {
  8. node.prev = pred;
  9. // 先尝试通过AQS方式修改尾节点为最新的节点
  10. // 如果修改成功,将节点加入队列的尾部
  11. if (compareAndSetTail(pred, node)) {
  12. pred.next = node;
  13. return node;
  14. }
  15. }
  16. //第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
  17. enq(node);
  18. return node;
  19. }
  1. static final class Node {
  2. /** 常量标识:标识当前的队列节点类型为共享型抢占 */
  3. static final Node SHARED = new Node();
  4. /** 常量标识:标识当前的队列节点类型为独占型抢占 */
  5. static final Node EXCLUSIVE = null;
  6. // 省略其他代码
  7. }

6.5.5 自旋入队:enq

  1. /**
  2. * 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作
  3. * 如果依然不存在,就把当前线程作为head节点
  4. * 插入节点后,调用acquireQueued()进行阻塞
  5. */
  6. private Node enq(final Node node) {
  7. for (;;) {
  8. Node t = tail;
  9. if (t == null) {
  10. //队列为空,初始化尾节点和头节点为新节点
  11. if (compareAndSetHead(new Node()))
  12. tail = head;
  13. } else {
  14. // 队列不为空,将新节点插入队列尾部
  15. node.prev = t;
  16. if (compareAndSetTail(t, node)) {
  17. t.next = node;
  18. return t;
  19. }
  20. }
  21. }
  22. }
  23. /**
  24. * CAS 操作head指针,仅仅被enq()调用
  25. */
  26. private final boolean compareAndSetHead(Node update) {
  27. return unsafe.compareAndSwapObject(this, headOffset, null, update);
  28. }
  29. /**
  30. * CAS 操作tail指针,仅仅被enq()调用
  31. */
  32. private final boolean compareAndSetTail(Node expect, Node update) {
  33. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  34. }

6.5.6 自旋抢占:acquireQueued()

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. // 自旋检查当前节点的前驱节点是否为头节点,才能获取锁
  6. for (;;) {
  7. // 获取节点的前驱节点
  8. final Node p = node.predecessor();
  9. // 节点中的线程循环地检查自己的前驱节点是否为 head 节点
  10. // 前驱节点是head时,进一步调用子类的tryAcquire(…)实现
  11. if (p == head && tryAcquire(arg)) {
  12. // tryAcquire()成功后,将当前节点设置为头节点,移除之前的头节点
  13. setHead(node);
  14. p.next = null; // help GC
  15. failed = false;
  16. return interrupted;
  17. }
  18. // 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
  19. // 如果需要挂起
  20. // 调用parkAndCheckInterrupt()方法挂起当前线程,直到被唤醒
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. interrupted = true; // 若两个操作都是true,则置true
  24. }
  25. } finally {
  26. //如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了)
  27. //那么取消节点在队列中的等待
  28. if (failed)
  29. //取消请求,将当前节点从队列中移除
  30. cancelAcquire(node);
  31. }
  32. }

6.5.7 挂起预判:shouldParkAfterFailedAcquire()

  1. private static boolean shouldParkAfterFailedAcquire(
  2. Node pred, Node node) {
  3. int ws = pred.waitStatus; // 获得前驱节点的状态
  4. if (ws == Node.SIGNAL) //如果前驱节点状态为SIGNAL(值为-1)就直接返回
  5. return true;
  6. if (ws > 0) { // 前驱节点以及取消CANCELLED(1)
  7. do {
  8. // 不断地循环,找到有效前驱节点,即非CANCELLED(值为1)类型节点
  9. // 将pred记录前驱的前驱
  10. pred = pred.prev;
  11. // 调整当前节点的prev指针,保持为前驱的前驱
  12. node.prev = pred;
  13. } while (pred.waitStatus > 0);
  14. // 调整前驱节点的next指针
  15. pred.next = node;
  16. } else {
  17. // 如果前驱状态不是CANCELLED,也不是SIGNAL,就设置为SIGNAL
  18. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  19. // 设置前驱状态之后,此方法返回值还是为false,表示线程不可用,被阻塞
  20. }
  21. return false;
  22. }

6.5.8 线程挂起:parkAndCheckInterrupt()

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this); // 调用park()使线程进入waiting状态
  3. return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断
  4. }

6.6 AQS的两个关键点:节点的入队和出队

6.6.1 节点的自旋入队

  1. private Node enq(final Node node) {
  2. for (;;) { //自旋入队
  3. Node t = tail;
  4. if (t == null) {
  5. //队列为空,初始化尾节点和头节点为新节点
  6. if (compareAndSetHead(new Node()))
  7. tail = head;
  8. } else {
  9. //如果队列不为空,将新节点插入队列尾部
  10. node.prev = t;
  11. if (compareAndSetTail(t, node)) {
  12. t.next = node;
  13. return t;
  14. }
  15. }
  16. }
  17. }

6.6.2 节点的出队

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. // 在前驱节点上自旋
  6. for (;;) {
  7. // 获取节点的前驱节点
  8. final Node p = node.predecessor();
  9. // (1)前驱节点是头节点
  10. // (2)通过子类的tryAcquire()钩子实现抢占成功
  11. if (p == head && tryAcquire(arg)) {
  12. // 将当前节点设置为头节点,之前的头节点出队
  13. setHead(node);
  14. p.next = null; // help GC
  15. failed = false;
  16. return interrupted;
  17. }
  18. // 省略park(无限期阻塞)线程的代码
  19. }
  20. } finally {
  21. // 省略其他
  22. }
  23. }

AQS释放锁时是如何唤醒后继线程的呢?AQS释放锁的核心代码如下:

  1. public final boolean release(long arg) {
  2. if (tryRelease(arg)) { //释放锁的钩子方法的实现
  3. Node h = head; //队列头节点
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h); //唤醒后继线程
  6. return true;
  7. }
  8. return false;
  9. }

unparkSuccessor的核心代码如下:

  1. private void unparkSuccessor(Node node) {
  2. // 省略不相关代码
  3. Node s = node.next; //后继节点
  4. // 省略不相关代码
  5. if (s != null)
  6. LockSupport.unpark(s.thread); //唤醒后继节点的线程
  7. }

6.7 AQS锁释放的原理

6.7.1 SimpleMockLock独占锁的释放流程

image.png

6.7.2 AQS模板方法:release()

  1. public final boolean release(long arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

6.7.3 钩子实现:tryRelease()

  1. //钩子方法
  2. protected boolean tryRelease(int arg)
  3. {
  4. //如果当前线程不是占用锁的线程
  5. if (Thread.currentThread() != getExclusiveOwnerThread())
  6. {
  7. //抛出非法状态的异常
  8. throw new IllegalMonitorStateException();
  9. }
  10. //如果锁的状态为没有占用
  11. if (getState() == 0)
  12. {
  13. //抛出非法状态的异常
  14. throw new IllegalMonitorStateException();
  15. }
  16. //接下来不需要使用CAS操作,因为下面的操作不存在并发场景
  17. setExclusiveOwnerThread(null);
  18. //设置状态
  19. setState(0);
  20. return true;
  21. }
  22. }

6.7.4 唤醒后继:unparkSuccessor()

  1. private void unparkSuccessor(Node node) {
  2. int ws = node.waitStatus; // 获得节点状态,释放锁的节点,也就是头节点
  3. //CANCELLED(1)、SIGNAL(-1)、CONDITION (-2)、PROPAGATE(-3)
  4. //若头节点状态小于0,则将其置为0,表示初始状态
  5. if (ws < 0)
  6. compareAndSetWaitStatus(node, ws, 0);
  7. Node s = node.next; // 找到后头的一个节点
  8. if (s == null || s.waitStatus > 0) {
  9. // 如果新节点已经被取消CANCELLED(1)
  10. s = null;
  11. // 从队列尾部开始,往前去找最前面的一个 waitStatus 小于0的节点
  12. for (Node t = tail; t != null && t != node; t = t.prev)
  13. if (t.waitStatus <= 0) s = t;
  14. }
  15. //唤醒后继节点对应的线程
  16. if (s != null)
  17. LockSupport.unpark(s.thread);
  18. }

6.8 ReentrantLock的抢锁流程

6.8.1 ReentrantLock非公平锁的抢占流程

image.png

6.8.2 非公平锁的同步器子类

ReentrantLock为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法lock()的源码如下:

  1. static final class NonfairSync extends Sync {
  2. //非公平锁抢占
  3. final void lock() {
  4. if (compareAndSetState(0, 1))
  5. setExclusiveOwnerThread(Thread.currentThread());
  6. else
  7. acquire(1);
  8. }
  9. // 省略其他
  10. }

6.8.3 非公平抢占的钩子方法:tryAcquire(arg)

  1. static final class NonfairSync extends Sync {
  2. //非公平锁抢占的钩子方法
  3. protected final boolean tryAcquire(int acquires) {
  4. return nonfairTryAcquire(acquires);
  5. }
  6. // 省略其他
  7. }
  8. abstract static class Sync extends AbstractQueuedSynchronizer {
  9. final boolean nonfairTryAcquire(int acquires) {
  10. final Thread current = Thread.currentThread();
  11. // 先直接获得锁的状态
  12. int c = getState();
  13. if (c == 0) {
  14. // 如果内部队列首节点的线程执行完了,它会将锁的state设置为0
  15. // 当前抢锁线程的下一步就是直接进行抢占,不管不顾
  16. // 发现state是空的,就直接拿来加锁使用,根本不考虑后面继承者的存在
  17. if (compareAndSetState(0, acquires)) {
  18. // 1. 利用CAS自旋方式判断当前state确实为0,然后设置成acquire(1)
  19. // 这是原子性的操作,可以保证线程安全
  20. setExclusiveOwnerThread(current);
  21. // 设置当前执行的线程,直接返回true
  22. return true;
  23. }
  24. }
  25. else if (current == getExclusiveOwnerThread()) {
  26. // 2.当前的线程和执行中的线程是同一个,也就意味着可重入操作
  27. int nextc = c + acquires;
  28. if (nextc < 0) // overflow
  29. throw new Error("Maximum lock count exceeded");
  30. setState(nextc);
  31. // 表示当前锁被1个线程重复获取了nextc次
  32. return true;
  33. }
  34. // 否则就返回false,表示没有成功获取当前锁,进入排队过程
  35. return false;
  36. }
  37. // 省略其他
  38. }

6.8.4 ReentrantLock公平锁的抢占流程

image.png

6.8.5 公平锁的同步器子类

  1. static final class FairSync extends Sync {
  2. //公平锁抢占的钩子方法
  3. final void lock() {
  4. acquire(1);
  5. }
  6. // 省略其他
  7. }

6.8.6 公平抢占的钩子方法:tryAcquire(arg)

  1. static final class FairSync extends Sync {
  2. //公平抢占的钩子方法
  3. protected final boolean tryAcquire(int acquires) {
  4. final Thread current = Thread.currentThread();
  5. int c = getState(); //锁状态
  6. if (c == 0) {
  7. if (!hasQueuedPredecessors() && //有后继节点就返回,足够讲义气
  8. compareAndSetState(0, acquires)) {
  9. setExclusiveOwnerThread(current);
  10. return true;
  11. }
  12. }
  13. else if (current == getExclusiveOwnerThread()) {
  14. int nextc = c + acquires;
  15. if (nextc < 0)
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc);
  18. return true;
  19. }
  20. return false;
  21. }
  22. }

6.8.7 是否有后继节点的判断

6.9 AQS条件队列

6.9.1 Condition基本原理

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. //记录该队列的头节点
  3. private transient Node firstWaiter;
  4. //记录该队列的尾节点
  5. private transient Node lastWaiter;
  6. }

image.png
在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以调用newCondition()创建两个等待队列,具体如下:

  1. private Lock lock = new ReentrantLock();
  2. //创建第一个等待队列
  3. private Condition firstCond = lock.newCondition();
  4. //创建第二个等待队列
  5. private Condition secondCond = lock.newCondition();

image.png

6.9.2 await()等待方法原理

image.png
在await()方法将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。await()方法的核心代码如下:

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter(); // step 1
  5. int savedState = fullyRelease(node); // step 2
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) { // step 3
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) // step 4
  13. && interruptMode != THROW_IE)
  14. interruptMode = REINTERRUPT;
  15. if (node.nextWaiter != null) //step 5
  16. unlinkCancelledWaiters();
  17. if (interruptMode != 0)
  18. reportInterruptAfterWait(interruptMode);
  19. }

创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()方法完成,该方法具体如下:

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // 如果尾节点取消,重新定位尾节点
  4. if (t != null && t.waitStatus != Node.CONDITION) {
  5. unlinkCancelledWaiters();
  6. t = lastWaiter;
  7. }
  8. // 创建一个新Node,作为等待节点
  9. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  10. // 将新Node加入等待队列
  11. if (t == null)
  12. firstWaiter = node;
  13. else
  14. t.nextWaiter = node;
  15. lastWaiter = node;
  16. return node;
  17. }

6.9.3 signal()唤醒方法原理

image.png

  1. //唤醒
  2. public final void signal() {
  3. //如果当前线程不是持有该锁的线程,就抛出异常
  4. if (!isHeldExclusively())
  5. throw new IllegalMonitorStateException();
  6. Node first = firstWaiter;
  7. if (first != null)
  8. doSignal(first); //唤醒头节点
  9. }
  10. //执行唤醒
  11. private void doSignal(Node first) {
  12. do {
  13. //出队的代码写的很巧妙,要看仔细
  14. //first出队,firstWaiter 头部指向下一个节点,自己的nextWaiter
  15. if ((firstWaiter = first.nextWaiter) == null)
  16. lastWaiter = null; //如果第二节点为空,则尾部也为空
  17. //将原来头部first的后继置空,help for GC
  18. first.nextWaiter = null;
  19. } while (!transferForSignal(first) &&
  20. (first = firstWaiter) != null);
  21. }
  22. //将被唤醒的节点转移到同步队列
  23. final boolean transferForSignal(Node node) {
  24. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  25. return false;
  26. Node p = enq(node); // step 1
  27. int ws = p.waitStatus;
  28. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  29. LockSupport.unpark(node.thread); // step 2:唤醒线程
  30. return true;
  31. }

6.10 AQS的实际应用