全程高能,源码解读,拜读Doug Lea前辈的并发理念和玄妙设计。

分析并不完全透彻,只能大概梳理出大致流程,有时间再深入吧。

(我只是Javadoc的搬运工罢了。)

image-20210502211334391.png

7.1 AbstractQueuedSynchronizer

7.1.1 AQS概述

AbstractQueuedSynchronizer(抽象同步队列),是阻塞式锁和相关的同步器工具的框架。

  • 使用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁。
    • getState:获取state状态
    • setState:设置state状态
    • compareAndSetState:乐观锁机制设置state状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于FIFO的等待队列,在其内部通过节点headtail记录对手和队尾元素,队列的元素类型为Node
    • SHARED:用来标记该线程是是否是获取共享资源时被阻塞挂起后放入AQS队列的
    • EXCLUSIVE:用来标记线程是获取独占资源时被挂起后放入AQS队列的
    • waitStatus:记录当前线程等待状态
      • 1:CANCELLED,线程结点已释放(超时、中断),已取消的节点不会再阻塞
      • -1:SIGNAL,该线程的后续线程需要阻塞,即只要前驱结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程
      • -2:CONDITION,该线程在condition队列中阻塞
      • -3:PROPAGATE,释放共享资源时需要通知其他节点
    • prev记录当前节点的前驱结点,next记录当前阶段的后继结点
  • 内部类ConditionObject,用来结合锁实现线程同步,是条件变量,每个条件变量对应一个条件队列(单向链表队列),用来存放调用条件变量的await方法后被阻塞的线程,队列的头、尾元素分别是firstWaiterlastWaiter

7.1.2 获取与释放资源

对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。

  • 独占方式:void acquire(int arg) void acquireInterruptibly(int arg) boolean release(int arg)
  • 共享方式:void acquireShared(int arg) void acquireSharedInterruptibly(int arg) boolean releaseShared(int arg)

使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记到是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者exclusiveOwnerThread为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值state从1变为2,即设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。

对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

7.1.2.1 独占方式

(1)当一个线程调用acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVENode节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

(2)当一个线程调用release(int arg)方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

需要注意的是,AQS类并没有提供可用的tryAcquiretryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquiretryRelease需要由具体的子类来实现。子类在实现tryAcquiretryRelease时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。子类还需要定义,在调用acquirerelease方法时state状态值的增减代表什么含义。

7.1.2.2 共享方式

(1)当线程调用acquireShared(int arg)获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态state的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHAREDNode节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

(2)当一个线程调用releaseShared(int arg)时会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

同样需要注意的是,AQS类并没有提供可用的tryAcquireSharedtryReleaseShared方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquiretryRelease需要由具体的子类来实现。子类在实现tryAcquireSharedtryReleaseShared时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。

7.1.2.3 入队操作

当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列。

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail; // (1)
  4. if (t == null) { // 初始化
  5. if (compareAndSetHead(new Node())) // (2)
  6. tail = head;
  7. } else {
  8. node.prev = t; // (3)
  9. if (compareAndSetTail(t, node)) { // (4)
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

image-20210503145933217.png

在第一次循环中,在队列尾部插入元素时,队列状态如图(default)所示,队列头尾节点都指向null;

执行代码(1)后节点t指向了尾部节点,此时队列状态如图(I)所示;

执行代码(2),使用CAS设置一个哨兵节点为头结点,如果设置成功,则让尾部节点也指向哨兵节点,此时队列状态如图(II)所示。

image-20210503151528555.png

第一次循环之后只插入了一个哨兵节点,还需要插入node节点。

在第二次循环中,执行代码(1)后节点t指向了哨兵节点,此时队列状态如图(III)所示;

执行代码(3),设置node的前驱结点为哨兵节点,此时队列状态如图(IV)所示;

执行代码(4),通过CAS设置node节点为尾部节点,如果设置成功,则队列状态如图(V)所示;

然后设置哨兵节点的后继结点为node结点,此时队列状态如图(VI)所示。

7.1.2.4 补充

基于AQS实现的锁除了需要重写上面方法以外,还需要重写isHeldExclusively方法,来判断是被当前线程独占还是被共享。

此外,与acquirerelease对应的都有一个带有Interruptibly关键字的函数。

区别如下:

不带interruptibly关键字的方法的意思是不对中断进行响应,也就是线程在调用不带interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。

而带interruptibly关键字的方法要对中断进行响应,也就是线程在调用带interruptibly关键字的方法获取资源时或者获取资源失败而被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException异常而返回。

7.2 ReentrantLock

7.2.1 概述与应用

ReentrantLock是可重入的独占锁,同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列中。

示例1:基于AQS实现的不可重入的独占锁

  1. import java.io.Serializable;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.Lock;
  6. /**
  7. * @author KHighness
  8. * @since 2021-05-02
  9. * @apiNote 不可冲入独占锁
  10. */
  11. public class ParaKLock implements Lock, Serializable {
  12. private static final long serialVersionUID = 7379874578553124018L;
  13. // 独占锁同步器
  14. private static class ParaKSync extends AbstractQueuedSynchronizer {
  15. private static final long serialVersionUID = 6278547169976524823L;
  16. @Override // 是否锁已经被持有
  17. protected boolean isHeldExclusively() {
  18. return getState() == 1;
  19. }
  20. @Override // 如果state为0,则尝试获取锁
  21. protected boolean tryAcquire(int arg) {
  22. if (Thread.currentThread() == this.getExclusiveOwnerThread())
  23. throw new IllegalMonitorStateException("不支持锁重入");
  24. if (compareAndSetState(0, 1)) {
  25. // 设置独占者线程
  26. setExclusiveOwnerThread(Thread.currentThread());
  27. return true;
  28. }
  29. return false;
  30. }
  31. @Override // 尝释放锁,设置state = 0
  32. protected boolean tryRelease(int arg) {
  33. assert arg == 1;
  34. if (getState() == 0)
  35. throw new IllegalMonitorStateException("当前state不为0");
  36. setExclusiveOwnerThread(null);
  37. setState(0);
  38. return true;
  39. }
  40. // 提供条件变量接口
  41. Condition newCondition() {
  42. return new ConditionObject();
  43. }
  44. }
  45. private final ParaKSync sync = new ParaKSync();
  46. @Override // 一直尝试,失败进入AQS队列
  47. public void lock() {
  48. sync.acquire(1);
  49. }
  50. @Override // 尝试一次
  51. public boolean tryLock() {
  52. return sync.tryAcquire(1);
  53. }
  54. @Override // 超时尝试
  55. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  56. return sync.tryAcquireNanos(1, unit.toNanos(time));
  57. }
  58. @Override // 解锁
  59. public void unlock() {
  60. sync.release(1);
  61. }
  62. // 是否被占有
  63. public boolean isLocked() {
  64. return sync.isHeldExclusively();
  65. }
  66. @Override // 可被打断式加锁
  67. public void lockInterruptibly() throws InterruptedException {
  68. sync.acquireInterruptibly(1);
  69. }
  70. @Override
  71. public Condition newCondition() {
  72. return sync.newCondition();
  73. }
  74. }

示例2:基于上面自定义独占锁实现生产-消费模型

  1. import lombok.extern.slf4j.Slf4j;
  2. import java.util.Queue;
  3. import java.util.Random;
  4. import java.util.concurrent.LinkedBlockingDeque;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.Condition;
  7. /**
  8. * @author KHighness
  9. * @since 2021-05-03
  10. * @apiNote 生产-消费模型
  11. */
  12. @Slf4j(topic = "ProducerConsumerModel")
  13. public class ProducerConsumerModel {
  14. /** 不可重入独占锁 */
  15. private final static ParaKLock LOCK = new ParaKLock();
  16. /** 已满:生产者条件变量 */
  17. private final static Condition FULL = LOCK.newCondition();
  18. /** 已空:消费者者条件变量 */
  19. private final static Condition EMPTY = LOCK.newCondition();
  20. /** 产品队列 */
  21. private final static Queue<Character> QUEUE = new LinkedBlockingDeque<>();
  22. /** 最大容量 */
  23. private final static int QUEUE_MAX_SIZE = 5;
  24. /** 随机变量 */
  25. private final static Random RANDOM = new Random();
  26. /**
  27. * 生成随机数字/字母字符
  28. * ascii char
  29. * 48-57 0~9
  30. * 65-90 A~Z
  31. * 97-122 a~z
  32. * @return ascii码字符
  33. */
  34. private static Character randomChar() {
  35. int choice = RANDOM.nextInt(3);
  36. if (choice == 0)
  37. return (char) (RANDOM.nextInt(10) + 48);
  38. else if (choice == 1)
  39. return (char) (RANDOM.nextInt(26) + 65);
  40. else
  41. return (char) (RANDOM.nextInt(26) + 97);
  42. }
  43. /**
  44. * 生产者线程
  45. */
  46. static class Producer extends Thread {
  47. public Producer() {
  48. super.setName("Producer");
  49. }
  50. @Override
  51. public void run() {
  52. while (true) {
  53. // 获取独占锁
  54. LOCK.lock();
  55. try {
  56. // (1) 如果队列已满,等待
  57. while (QUEUE.size() == QUEUE_MAX_SIZE) {
  58. FULL.await();
  59. }
  60. // (2) 生产一个元素到队列
  61. Character character = randomChar();
  62. TimeUnit.SECONDS.sleep(1);
  63. log.debug("生产 => [{}]", character);
  64. QUEUE.add(character);
  65. // (3) 唤醒消费者线程
  66. EMPTY.signalAll();
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. } finally {
  70. // 释放独占锁
  71. LOCK.unlock();
  72. }
  73. }
  74. }
  75. }
  76. /**
  77. * 消费者线程
  78. */
  79. static class Consumer extends Thread {
  80. public Consumer() {
  81. super.setName("Consumer");
  82. }
  83. @Override
  84. public void run() {
  85. while (true) {
  86. // 获取独占锁
  87. LOCK.lock();
  88. try {
  89. // (1) 如果队列为空,等待
  90. while (QUEUE.isEmpty()) {
  91. EMPTY.await();
  92. }
  93. // (2) 消费一个元素
  94. Character character = QUEUE.poll();
  95. TimeUnit.SECONDS.sleep(1);
  96. log.debug("消费 => [{}]", character);
  97. // (3) 唤醒生产者线程
  98. FULL.signalAll();
  99. } catch (InterruptedException e) {
  100. e.printStackTrace();
  101. } finally {
  102. LOCK.unlock();
  103. }
  104. }
  105. }
  106. }
  107. public static void main(String[] args) {
  108. Producer producer = new Producer();
  109. Consumer consumer = new Consumer();
  110. producer.start();
  111. consumer.start();
  112. }
  113. }

7.2.2 非公平锁实现原理

先结合ReentrantLock的内部类SyncNonfailSyncAbstractQueuedSynchronizer中的方法进行分析。

对于Sync中的抽象方法lock,公平锁与非公平锁分别有不同的实现:

  1. abstract void lock();

非公平锁NonfairSync的实现如下:

  1. // (一)
  2. final void lock() {
  3. if (compareAndSetState(0, 1))
  4. // 直接CAS,而不去阻塞队列中检查,体现非公平性
  5. // 成功则设置独占线程为当前线程
  6. setExclusiveOwnerThread(Thread.currentThread());
  7. else
  8. acquire(1); // 这个方法在AQS中已经实现好了
  9. }
  10. // (三)
  11. protected final boolean tryAcquire(int acquires) {
  12. return nonfairTryAcquire(acquires); // 这个方法在Sync中已经实现好了
  13. }

Sync中的nonfairTryAcquire方法:

  1. // (四)
  2. final boolean nonfairTryAcquire(int acquires) {
  3. // 获取当前进入方法的线程
  4. final Thread current = Thread.currentThread();
  5. // 获取state状态值
  6. int c = getState();
  7. // 如果还未获得锁
  8. if (c == 0) {
  9. // 不去AQS队列检查,直接尝试用CAS获得锁,体现了不公平性
  10. if (compareAndSetState(0, acquires)) {
  11. // 成功获取到锁,官宣独占
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. // 如果已经获得了锁并且当前线程是独占线程,表示发生了锁重入
  17. else if (current == getExclusiveOwnerThread()) {
  18. // state++
  19. int nextc = c + acquires;
  20. if (nextc < 0) // overflow
  21. throw new Error("Maximum lock count exceeded");
  22. setState(nextc);
  23. return true;
  24. }
  25. // 获取失败,回到调用处
  26. return false;
  27. }

AbstractQueuedSynchronizer中涉及到的主要方法:

  1. // (二)
  2. // 以独占模式获取,忽略中断。通过至少调用一次tryAcquire返回成功。
  3. // 否则线程会排队,可能会反复阻塞和解除阻塞,调用tryAcquire直到成功。
  4. public final void acquire(int arg) {
  5. if (!tryAcquire(arg) &&
  6. // 尝试直接获取锁,看名字就知道,只是试一试
  7. // 有可能直接成功了,就不需要进队列排队了
  8. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  9. // Java编译器会优化,&&前面已经为false,后面的条件不再进行判断
  10. // 走到这一步说明,获取锁失败,然后执行addWaiter方法,节点设置为独占模式
  11. // 节点添加完毕后,执行acquireQueued方法,这个方法才是最终的BOSS
  12. // 这个方法一直循环到返回结果为true,进入if块内的代码执行
  13. selfInterrupt();
  14. }
  15. // (五)
  16. // 添加等待节点至队列末尾
  17. private Node addWaiter(Node mode) {
  18. // 创建新节点node,模式为EXCLUSIVE(独占)
  19. Node node = new Node(Thread.currentThread(), mode);
  20. // 获取尾部节点
  21. Node pred = tail;
  22. // 尾部节点非空
  23. if (pred != null) {
  24. // 先将node的前驱节点设置为当前尾部节点
  25. node.prev = pred;
  26. // 通过CAS操作将node节点设置为tail尾部节点
  27. if (compareAndSetTail(pred, node)) {
  28. // 如果CAS成功,则将原tail节点的后继结点设置为当前尾部节点node
  29. pred.next = node;
  30. return node;
  31. }
  32. }
  33. // 尾部节点为空,说明没有哨兵节点
  34. // 这个方法在7.1.2.3已经详述过
  35. // 需要创建新的哨兵节点,然后插入当前节点
  36. enq(node);
  37. return node;
  38. }
  39. // (六)
  40. // 在队列中循环尝试获取锁
  41. final boolean acquireQueued(final Node node, int arg) {
  42. boolean failed = true;
  43. try {
  44. // 线程获取锁的过程中是否产生了中断
  45. boolean interrupted = false;
  46. for (;;) {
  47. // 获取当前节点的前驱节点
  48. final Node p = node.predecessor();
  49. // 如果前驱节点为head哨兵节点,说明当前节点为第二节点
  50. // 拥有尝试获取锁的资格
  51. // 获取锁成功则进入if代码块,剔除当前节点
  52. if (p == head && tryAcquire(arg)) {
  53. // 将当前节点设置为头节点,属性thread和prev都设置为null
  54. // 直接使用setHead而非compareAndSetHead,没有使用CAS
  55. // 是因为此时已经成功获取到独占锁
  56. setHead(node);
  57. // 辅助GC
  58. p.next = null;
  59. failed = false;
  60. return interrupted;
  61. }
  62. // 执行到这里,只有两种情况
  63. // (1)不符合获取锁的前提,即当前节点不是第二节点
  64. // (2)tryAcquire失败,即没有抢占到锁
  65. if (shouldParkAfterFailedAcquire(p, node) &&
  66. parkAndCheckInterrupt())
  67. interrupted = true; // 产生了中断,设置后永久保留
  68. }
  69. } finally {
  70. // 当tryAcquire为抛出异常的时候,failed为true
  71. if (failed)
  72. cancelAcquire(node);
  73. }
  74. }
  75. // (七)
  76. // 获取失败后应该挂起
  77. // 寻找node的有效前驱,并且将有效前驱状态设置为SIGNAL,返回true代表马上可以阻塞
  78. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  79. // 前驱节点的等待状态
  80. int ws = pred.waitStatus;
  81. // 前驱节点的状态为SIGNAL,状态正常,当前线程需要挂起,自己的唤醒依赖于前驱节点。
  82. // 如果前驱节点变成了head,并且head的代表线程释放了锁,就会根据这个SIGNAL唤醒自己。
  83. if (ws == Node.SIGNAL)
  84. // 返回true,则进入parakAndCheckInterrupt挂起当前线程
  85. return true;
  86. // 前驱节点的状态为CANCELLED,说明前驱节点因为超时或者响应了中断而取消排队。
  87. // 因此需要跨越这些CANCELLED结点,直到找到一个状态<0的节点做自己的前驱。
  88. if (ws > 0) {
  89. do {
  90. node.prev = pred = pred.prev;
  91. } while (pred.waitStatus > 0);
  92. pred.next = node;
  93. }
  94. // 前驱节点的状态为0,通过CAS将其状态设置为SIGNAL(-1)
  95. else {
  96. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  97. }
  98. // 返回false则不进入parakAndChecKInterrupt继续在acquireQueeud中循环
  99. // 再次进入这个方法时,便会从第一个if分支返回true
  100. return false;
  101. }
  102. // (八)
  103. // 挂起并检查中断状态
  104. private final boolean parkAndCheckInterrupt() {
  105. // 挂起当前线程
  106. // 唤醒方式:unpark/interrupt/return
  107. LockSupport.park(this);
  108. // 返回中断状态,并且清除中断标记
  109. return Thread.interrupted();
  110. }
  111. // (九)
  112. // 补充中断标记
  113. static void selfInterrupt() {
  114. Thread.currentThread().interrupt();
  115. }

非公平抢锁过程总结:

调用ReentrantLocklock方法,会执行AQS的acquire方法,先执行NonFairSynctryAcquire方法尝试获取锁,失败则执行AQS的acquireQueued方法在队列中循环获取锁。

tryAcquire:检查state状态值,为0说明还未有线程获取锁,那么自己就直接尝试通过CAS操作将state设置为1,成功则官宣自己独占;不为0且当前线程是独占线程,表示锁重入,那么state+1。以上两种情况成功都返回true,从而结束抢锁过程,其他情况都代表获取失败,返回false

获取失败后,先执行addWaiter方法将当前线程封装为节点添加到队列末尾,然后进入acquireQueued方法,循环获取直到成功,执行selfInterrupt方法。

acquireQueued:设置一个标志interrupted = false,代表线程获取锁的过程中是否产生了中断,第一个if分支,判断当前节点的前驱节点是否为哨兵节点,是则拥有获取锁的资格,执行tryAcquire尝试获取锁,成功则将前驱节点剔除,并且返回interrupted;往下执行,第二个if分支,说明当前节点不是第二节点,没有获取锁的资格,或者是tryAcquire失败,没有抢占到锁,执行shouldParkAfterFailedAcquireparkAndCheckInterrupt,执行成功后将interrupted设置为trueshouldParkAfterFailedAcquire返回true则说明获取锁的过程中产生了中断,但是这个中断标记在parkAndCheckInterrupt中清除了,所以需要填补中断标记。

shouldParkAfterFailedAcquire:顾名思义,获取失败后应该挂起,如果当前节点的前驱节点的状态为SIGNAL,那么前驱节点释放锁后会通知自己,自己就可以安心阻塞,并返回true;如果当前节点的前驱节点的状态为CANCELLED即取消排队,那么需要寻找当前节点的有效前驱,即跳过状态为CANCELLED的节点,找到一个状态为SIGNAL的节点;否则当前节点的前驱节点的状态为0,则通过CAS操作将前驱节点的状态设置为SIGNAL,后两种都会返回false,那么会在acquireQueued的for循环中进行下一轮,第二次会按照第一种情况返回true。返回true后,执行parkAndCheckInterrupt

parkAndCheckInterrupt:先调用LockSupport.park挂起当前线程(有三种唤醒方式:①unpark ②interrupt ③return),那么当前线程等待唤醒(被唤醒的时候会进入acquireQueued的第一个if分支),后调用Thread.interrupted方法返回并重置中断状态。

这里为什么需要清除中断状态呢? 知乎: https://www.zhihu.com/question/399039232/answer/1260843911

如果不清除中断状态,也就是返回Thread.currentThread.isInterrupt()方法,那么当前线程挂起后依然是中断状态,那么下一轮循环的时候LockSupport.lock()方法将会失效,使得当前线程持续运行,在acquireQueued()的for循环中空转,导致CPU100%。

下面开始图示详解:

没有竞争时

image-20210502211712585.png

第一个竞争出现时

image-20210502211848516.png

Thread-1执行了

  1. CASC尝试将state0改为1,结果失败
  2. 进入tryAcquire逻辑,这时state已经是1,结果仍然失败
  3. 接下来进入addWaiter逻辑,构造Node队列

    • 图中黄色三角表示该NodewaitStatus状态,其中0为默认正常状态
    • Node的创建是懒惰的
    • 其中第一个Node称为Dummy(哑元)或哨兵,用来占位,并不关联线程


    image-20210502213906982.png

进入acquireQueued逻辑

  1. acquireQueued会在一个死循环中不断尝试获得锁,失败后进入park阻塞
  2. 如果自己是紧邻着head(排第二位),那么再次tryAcquire尝试获取锁,当然这时state仍为1,失败
  3. 进入shouldParkAfterFailedAcquire逻辑,将前驱node,即headwaitStatus改为-1,这次返回false

image-20210502220340804.png

  1. shouldParkAfterFailedAcquire执行完毕回到acquireQueued,再次tryAcquire尝试获取锁,当然这时state仍然为1,失败
  2. 当再次进入shouldParkAfterFailedAcquire时,这时因为其前驱nodewaitStatus已经是-1,这次返回true
  3. 进入parkAndCheckInterrupt,Thread-1 park(灰色表示)

image-20210502220921530.png

再次有多个线程经历上述过程竞争失败,变成这个样子

image-20210502221051585.png

Thread-0释放锁

  • 设置exclusiveOwnerThreadnull
  • state = 0

image-20210502221412738.png

当前队列不为null,并且headwaitStatus,进入unparkSuccessor流程

找到队列中离head最近的一个node(没取消的),unpark恢复其运行,本例中即为Thread-1。

回到Thread-1的acquireQueued流程

image-20210502222848420.png

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread为Thread-1,state = 1
  • head指向刚刚Thread-1所在的Node,该node清空Thread
  • 原本的head因为从链表断开,而可被垃圾回收

如果这时候有其他线程来竞争(非公平的体现),例如这时有Thrtead-4来了

image-20210502224147622.png

如果不巧又被Thread-4占了先

  • Thread-4被设置为exclusiveOwnerThreadstate = 1
  • Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞

7.2.3 公平锁实现原理

ReentrantLock-FairSync覆盖方法tryAcquire

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. // 先检查AQS队列中是否有前驱节点,没有才去竞争
  6. if (!hasQueuedPredecessors() &&
  7. compareAndSetState(0, acquires)) {
  8. setExclusiveOwnerThread(current);
  9. return true;
  10. }
  11. }
  12. else if (current == getExclusiveOwnerThread()) {
  13. int nextc = c + acquires;
  14. if (nextc < 0)
  15. throw new Error("Maximum lock count exceeded");
  16. setState(nextc);
  17. return true;
  18. }
  19. return false;
  20. }

AbstractQueuedSynchronizer中涉及到的方法:

  1. // 查询是否有线程等待获取的时间超过当前线程,即查询等待队列中的线程
  2. public final boolean hasQueuedPredecessors() {
  3. Node t = tail; // 尾节点要么为空,要么为等待
  4. Node h = head; // 头节点要么为空,要么为哨兵节点
  5. Node s;
  6. return h != t && // 头尾节点不同说明队列中有其他节点
  7. ((s = h.next) == null || //表示队列中没有第二节点
  8. s.thread != Thread.currentThread()); // 或者队列中第二线程不是此线程
  9. }

7.2.4 可重入原理

ReentrantLock的内部类Sync

  1. // 获取锁
  2. final boolean nonfairTryAcquire(int acquires) {
  3. // 获取当前线程
  4. final Thread current = Thread.currentThread();
  5. // 获取state状态值
  6. int c = getState();
  7. // 如果未获得锁
  8. if (c == 0) {
  9. // CAS成功后设置独占线程为当前线程即可
  10. if (compareAndSetState(0, acquires)) {
  11. setExclusiveOwnerThread(current);
  12. return true;
  13. }
  14. }
  15. // 如果已经获得锁,当前线程是独占线程,表示发生了锁重入
  16. else if (current == getExclusiveOwnerThread()) {
  17. // state在当前基础上加acquires
  18. int nextc = c + acquires;
  19. if (nextc < 0) // overflow
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. // 如果已经获得锁,当前线程非独占线程,因为是独占锁,其他线程只能阻塞
  25. return false;
  26. }
  27. // 释放锁
  28. protected final boolean tryRelease(int releases) {
  29. int c = getState() - releases;
  30. // 当前线程非独占线程则抛出异常
  31. if (Thread.currentThread() != getExclusiveOwnerThread())
  32. throw new IllegalMonitorStateException();
  33. boolean free = false;
  34. // 当state减为0时锁才释放成功
  35. if (c == 0) {
  36. free = true;
  37. setExclusiveOwnerThread(null);
  38. }
  39. setState(c);
  40. return free;
  41. }

7.2.5 可打断原理

不可打断模式:

  1. // 线程无法获得锁时,进入这个方法
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. setHead(node);
  10. p.next = null; // help GC
  11. failed = false;
  12. // 还是需要获得锁后,才能返回打断状态
  13. return interrupted;
  14. }
  15. if (shouldParkAfterFailedAcquire(p, node) &&
  16. parkAndCheckInterrupt())
  17. // 如果是因为interrupt被唤醒,返回打断状态为true
  18. interrupted = true;
  19. }
  20. } finally {
  21. if (failed)
  22. cancelAcquire(node);
  23. }
  24. }
  25. private final boolean parkAndCheckInterrupt() {
  26. // 如果打断标记已经是true,则park会失效
  27. LockSupport.park(this);
  28. // interrupted会清除打断标记
  29. return Thread.interrupted();
  30. }
  31. // 重新产生一次中断
  32. static void selfInterrupt() {
  33. Thread.currentThread().interrupt();
  34. }

可打断模式:

  1. public final void acquireInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // 如果没有获得到锁,进入
  6. if (!tryAcquire(arg))
  7. doAcquireInterruptibly(arg);
  8. }
  9. // 可打断锁的获取流程
  10. private void doAcquireInterruptibly(int arg)
  11. throws InterruptedException {
  12. final Node node = addWaiter(Node.EXCLUSIVE);
  13. boolean failed = true;
  14. try {
  15. for (;;) {
  16. final Node p = node.predecessor();
  17. if (p == head && tryAcquire(arg)) {
  18. setHead(node);
  19. p.next = null; // help GC
  20. failed = false;
  21. return;
  22. }
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. parkAndCheckInterrupt())
  25. // 在park过程中如果被interrupt会进入此
  26. // 这时候抛出异常,而不会再次进入for循环
  27. throw new InterruptedException();
  28. }
  29. } finally {
  30. if (failed)
  31. cancelAcquire(node);
  32. }
  33. }

7.2.6 条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject

内部属性:

  1. // 成员变量
  2. private transient Node firstWaiter; // 首节点
  3. private transient Node lastWaiter; // 尾节点
  4. // 常量
  5. private static final int REINTERRUPT = 1; // 从等待状态切换为中断状态
  6. private static final int THROW_IE = -1; // 抛出异常标识

await流程

  1. // 向队列中添加节点并返回
  2. private Node addConditionWaiter()
  3. // 释放节点持有的锁
  4. final int fullyRelease(Node node)
  5. // 判断节点是否在同步队列中
  6. final boolean isOnSyncQueue(Node node)
  7. // 检查线程是否中断,如果是则终止Condition状态并加入到同步队列
  8. private int checkInterruptWhileWaiting(Node node)
  9. // 操作节点去申请锁
  10. final boolean acquireQueued(final Node node, int arg)
  11. // 清理等待队列中无效节点
  12. private void unlinkCancelledWaiters()
  13. // 处理线程中断情况
  14. private void reportInterruptAfterWait(int interruptMode)

开始Thread-0持有锁,调用await,进入ConditionObjectaddConditionWaiter流程。

创建新的Node状态为-2(Node.CONDITION),关联Thread-0,加入等待队列尾部。

image-20210504184051341.png

接下来进入AQS的fullRelease流程,释放同步器上的锁

image-20210505112113307.png

unpark AQS队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread-1竞争成功

image-20210505112236553.png

park阻塞Thread-0

image-20210505112338808.png

signal流程

  1. // 唤醒线程入口
  2. public final void signal()
  3. // 判定当前线程是否持有锁
  4. protected boolean isHeldExclusively()
  5. // 唤醒first节点
  6. private void doSignal(Node first)
  7. // 转换节点状态
  8. final boolean transferForSignal(Node node)

假设Thread-1要来唤醒Thread-0

image-20210505115153129.png

进入ConditionObjectdoSignal流程,取得等待队列中第一个Node,Thread-0所在Node

image-20210505120611070.png

执行transferForSignal流程,将该Node加入到AQS队列尾部,将Thread-0的waitStatus改为0,Thread-3的waitStatus改为-1

image-20210505121200958.png

Thread-1释放锁,进入unlock流程。

7. 3 ReentrantReadWriteLock

7.3.1 概述与应用

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。

  • 读锁不支持条件变量
  • 重入时升级不支持:持有读锁的情况下去获取写锁,会导致写锁永久等待
  • 重入时降级支持:持有写锁的情况下去获取读锁

示例1:数据容器

  1. import lombok.extern.slf4j.Slf4j;
  2. import java.util.Random;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.locks.ReentrantReadWriteLock;
  5. /**
  6. * @author KHighness
  7. * @since 2021-05-05
  8. */
  9. @Slf4j(topic = "DataContainer")
  10. class DataContainer {
  11. private Object data;
  12. private final static Random RANDOM = new Random();
  13. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  14. private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
  15. private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  16. private static Character randomChar() {
  17. int choice = RANDOM.nextInt(3);
  18. if (choice == 0)
  19. return (char) (RANDOM.nextInt(10) + 48);
  20. else if (choice == 1)
  21. return (char) (RANDOM.nextInt(26) + 65);
  22. else
  23. return (char) (RANDOM.nextInt(26) + 97);
  24. }
  25. public void read() {
  26. log.debug("获取读锁...");
  27. readLock.lock();
  28. try {
  29. log.debug("读取数据 => [{}]", data);
  30. } finally {
  31. log.debug("释放读锁...");
  32. readLock.unlock();
  33. }
  34. }
  35. public void write() {
  36. log.debug("获取写锁...");
  37. writeLock.lock();
  38. try {
  39. log.debug("写入数据 => [{}]", data = randomChar());
  40. } finally {
  41. log.debug("释放写锁...");
  42. writeLock.unlock();
  43. }
  44. }
  45. }

示例2:文档中的应用示例

  1. import java.util.Random;
  2. import java.util.concurrent.locks.ReentrantReadWriteLock;
  3. /**
  4. * @author KHighness
  5. * @since 2021-05-05
  6. */
  7. class CachedData {
  8. Object data;
  9. // 是否有效,如果失效,需要重新计算data
  10. volatile boolean cacheValid;
  11. final static Random RANDOM = new Random();
  12. final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  13. void processCachedData() {
  14. lock.readLock().lock();
  15. if (!cacheValid) {
  16. // 不支持锁升级,获取写锁钱必须释放读锁
  17. lock.readLock().unlock();
  18. lock.writeLock().lock();
  19. try {
  20. // 双重检查,判断是否其他线程已经获取了读锁更新缓存,避免重复更新
  21. if (!cacheValid) {
  22. data = write();
  23. cacheValid = true;
  24. }
  25. // 降为读锁,释放写锁,让其他线程读取缓存
  26. lock.readLock().lock();
  27. } finally {
  28. lock.writeLock().unlock();
  29. }
  30. }
  31. // 使用数据,释放读锁
  32. try {
  33. use(data);
  34. } finally {
  35. lock.readLock().unlock();
  36. }
  37. }
  38. }

示例3:Redis的Java缓存

  1. import lombok.extern.slf4j.Slf4j;
  2. import redis.clients.jedis.Jedis;
  3. import java.lang.reflect.Field;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import java.util.Objects;
  7. import java.util.concurrent.locks.ReentrantReadWriteLock;
  8. /**
  9. * @author KHighness
  10. * @since 2021-05-05
  11. */
  12. @Slf4j(topic = "GenericRedisDemo")
  13. public class GenericRedisDemo {
  14. public static void main(String[] args) {
  15. GenericCached cached = new GenericCached();
  16. cached.update(new User(1, "KHighness", "parakovo@gmail.com"));
  17. cached.update(new User(2, "RubbishK", "rubbish@gmail.com"));
  18. cached.update(new User(3, "FlowerK", "flower@gmail.com"));
  19. log.debug(cached.query(User.class, 1).toString());
  20. log.debug(cached.query(User.class, 1).toString());
  21. log.debug(cached.query(User.class, 2).toString());
  22. }
  23. }
  24. class User {
  25. private Integer id;
  26. private String name;
  27. private String email;
  28. public User(Integer id, String name, String email) {
  29. this.id = id;
  30. this.name = name;
  31. this.email = email;
  32. }
  33. }
  34. interface Generic {
  35. String update(Object obj);
  36. Map<String, String> query(Class<?> clazz, Integer id);
  37. }
  38. class GenericRedis implements Generic {
  39. private final Jedis jedis = new Jedis("127.0.0.1", 6379);
  40. public String update(Object obj) {
  41. Class<?> clazz = obj.getClass();
  42. Field[] declaredFields = clazz.getDeclaredFields();
  43. Map<String, String> map = new HashMap<>();
  44. Integer id = null;
  45. try {
  46. Field idField = clazz.getDeclaredField("id");
  47. idField.setAccessible(true);
  48. id = (Integer) idField.get(obj);
  49. } catch (NoSuchFieldException | IllegalAccessException e) {
  50. e.printStackTrace();
  51. }
  52. for (Field field : declaredFields) {
  53. field.setAccessible(true);
  54. try {
  55. map.put(field.getName(), field.get(obj).toString());
  56. } catch (IllegalAccessException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. return jedis.hmset(clazz.getSimpleName() + ":" + id, map);
  61. }
  62. public Map<String, String> query(Class<?> clazz, Integer id) {
  63. return jedis.hgetAll(clazz.getSimpleName() + ":" + id);
  64. }
  65. }
  66. // 装饰器模式,强化
  67. @Slf4j(topic = "GenericCached")
  68. class GenericCached implements Generic {
  69. private final GenericRedis redis = new GenericRedis();
  70. private final Map<Key, Map<String, String>> cache = new HashMap<>();
  71. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  72. static class Key {
  73. Class<?> clazz;
  74. Integer id;
  75. public Key(Class<?> clazz, Integer id) {
  76. this.clazz = clazz;
  77. this.id = id;
  78. }
  79. @Override
  80. public boolean equals(Object o) {
  81. if (this == o) return true;
  82. if (o == null || getClass() != o.getClass()) return false;
  83. Key key = (Key) o;
  84. return Objects.equals(clazz, key.clazz) && Objects.equals(id, key.id);
  85. }
  86. @Override
  87. public int hashCode() {
  88. return Objects.hash(clazz, id);
  89. }
  90. @Override
  91. public String toString() {
  92. return "Key[" + "clazz=" + clazz + ", id=" + id + ']';
  93. }
  94. }
  95. // 先更新redis再清除缓存,需要加写锁
  96. @Override
  97. public String update(Object obj) {
  98. String res = null;
  99. lock.writeLock().lock();
  100. try {
  101. // 更新redis
  102. res = redis.update(obj);
  103. // 清除缓存
  104. Field idField = obj.getClass().getDeclaredField("id");
  105. idField.setAccessible(true);
  106. Key key = new Key(obj.getClass(), (int) idField.get(obj));
  107. if (cache.remove(key) != null) {
  108. log.debug("清除缓存: {}", key);
  109. }
  110. } catch (IllegalAccessException | NoSuchFieldException e) {
  111. e.printStackTrace();
  112. } finally {
  113. lock.writeLock().unlock();
  114. }
  115. return res;
  116. }
  117. // 先查询缓存,需要加读锁;
  118. // 没有则查询redis添加到缓存,需要加写锁
  119. @Override
  120. public Map<String, String> query(Class<?> clazz, Integer id) {
  121. Key key = new Key(clazz, id);
  122. Map<String, String> res = null;
  123. lock.readLock().lock();
  124. // 查询缓存
  125. try {
  126. if (cache.containsKey(key)) {
  127. log.debug("缓存查询: {}", res = cache.get(key));
  128. return res;
  129. }
  130. } finally {
  131. lock.readLock().unlock();
  132. }
  133. lock.writeLock().lock();
  134. // 查询数据库,添加到缓存
  135. try {
  136. // 双重检查
  137. if (cache.containsKey(key)) {
  138. log.debug("缓存查询: {}", res = cache.get(key));
  139. return res;
  140. }
  141. cache.put(key, res = redis.query(clazz, id));
  142. log.debug("添加缓存: {}", key);
  143. } finally {
  144. lock.writeLock().unlock();
  145. }
  146. return res;
  147. }
  148. }

7.3.2 内部类Sync概述

读写锁的内部维护了一个ReadLock和一个WriteLock,它们依赖Sync实现具体功能,Sync继承自AQS,并且也提供了公平锁和非公平锁的实现。

AQS中只维护了一个state状态,那么如何表示读和写两种状态呢?

Sync巧妙地使用state的高16位表示读获取到读锁的次数,低16位表示获取到写锁的线程的可重入次数。

  1. static final int SHARED_SHIFT = 16;
  2. // 共享锁(读锁)状态单位值65536
  3. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  4. // 共享锁线程最大个数65535
  5. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  6. // 排它锁(写锁)掩码
  7. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  8. /** 返回读锁线程数 */
  9. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  10. /** 返回写锁可重入个数 */
  11. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

Sync的成员:

  • firstReader:记录第一个获取到读锁的线程
  • firstReaderHoldCount:记录第一个获取到读锁的线程获取读锁的可重入次数
  • cacheHoldCounter:记录最后一个获取读锁的线程获取读锁的可重入次数
  • readHolds:存放除去第一个获取锁线程外的其他线程获取读锁的可重入次数。
  1. static final class HoldCounter {
  2. int count = 0;
  3. // 线程ID
  4. final long tid = getThreadId(Thread.currentThread());
  5. }
  6. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  7. public HoldCounter initialValue() {
  8. return new HoldCounter();
  9. }
  10. }

7.3.3 写锁的获取与释放

ReentrantReadWriteLock中写锁使用WriteLock实现。

获取

写锁是个独占锁,某时只有一个线程可以获取该锁。

如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。

如果当前已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。

另外,写锁是可重入锁,再次获取只是简单地把可重入次数加1后直接返回。

(1)WriteLocklock方法:

  1. public void lock() {
  2. sync.acquire(1); // 继承自AQS
  3. }

(2)AbstractQueuedSynchronizeracquire方法:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) && // Sync重写
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

(3)Sync重写的tryAcquire方法:

  1. protected final boolean tryAcquire(int acquires) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. int w = exclusiveCount(c);
  5. // (1) c != 0说明读锁或者写锁已经被某线程获取
  6. if (c != 0) {
  7. // (2) w = 0说明已经有线程获取了读锁,w != 0并且当前线程不是写锁拥有者,则返回false
  8. if (w == 0 || current != getExclusiveOwnerThread())
  9. return false;
  10. // (3) 说明当前线程获取了写锁,判断可重入次数
  11. if (w + exclusiveCount(acquires) > MAX_COUNT)
  12. throw new Error("Maximum lock count exceeded");
  13. // (4)设置可重入次数(+1)
  14. setState(c + acquires);
  15. return true;
  16. }
  17. // (5) c == 0说明第一个写线程获取写锁
  18. if (writerShouldBlock() ||
  19. !compareAndSetState(c, c + acquires))
  20. return false;
  21. setExclusiveOwnerThread(current);
  22. return true;
  23. }

(4)writerShouldBlock方法:

  • 公平锁NonfairSync实现:return false,即抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回true。
  • 非公平锁FairSync的实现:return hashQueuedPredecessors(),判断当前线程节点是否有前驱结点,有则放弃获取写锁的权限,直接返回false。

释放

如果当前线程持有该所,unlock会让该线程对该线程持有的AQS状态值减1,如果减1后当前状态值为0则当前线程会释放该锁,否则仅仅减1而已。

如果当前线程没有持有该锁而调用unlock抛出IllegalMonitorStateException异常。

(1)WriteLockunlock方法:

  1. public void unlock() {
  2. sync.release(1); // 继承自AQS
  3. }

(2)AbstractQueuedSynchronizerrelease方法:

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) { // Sync重写
  3. Node h = head;
  4. // 激活阻塞队列中的一个线程
  5. if (h != null && h.waitStatus != 0)
  6. unparkSuccessor(h);
  7. return true;
  8. }
  9. return false;
  10. }

(3)Sync重写的tryRelease方法:

  1. protected final boolean tryRelease(int releases) {
  2. // (1) 检查当前线程是否为锁拥有者
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. // (2) 获取可重入值,这里没有考虑高16位,因为获取写锁时读锁状态值肯定为0
  6. int nextc = getState() - releases;
  7. boolean free = exclusiveCount(nextc) == 0;
  8. // (3) 如果写锁可重入值为0则释放锁,否则只是简单更新状态值
  9. if (free)
  10. setExclusiveOwnerThread(null);
  11. setState(nextc);
  12. return free;
  13. }

7.3.4 读锁的获取与释放

ReentrantReadWriteLock中读锁使用ReadLock实现。

获取

如果当前没有线程持有写锁,则当前线程可以获取锁,AQS的状态值state的高16位的值会加1,然后方法返回。

如果其他一个线程持有锁,则当前线程会阻塞。

(1)Readlock方法:

  1. public void lock() {
  2. sync.acquireShared(1); // 继承自AQS
  3. }

(2)AbstractQueuedSynchronizeracquireShared方法:

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0) // Sync重写
  3. doAcquireShared(arg);
  4. }

(3)Sync重写的tryAcquireShared方法:

  1. protected final int tryAcquireShared(int unused) {
  2. // (1) 获取当前线程
  3. Thread current = Thread.currentThread();
  4. // (2) 获取当前状态值
  5. int c = getState();
  6. // (3) 判断当前写锁是否被占用
  7. if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
  8. return -1;
  9. // (4) 获取读锁计数
  10. int r = sharedCount(c);
  11. // (5) 尝试获取锁,多个度线程只有一个会成功,失败的进入fullTryAcquireShared进行重试
  12. if (!readerShouldBlock() &&
  13. r < MAX_COUNT &&
  14. compareAndSetState(c, c + SHARED_UNIT)) {
  15. // (6) 如果当前线程是第一个获取锁的线程
  16. if (r == 0) {
  17. firstReader = current;
  18. firstReaderHoldCount = 1;
  19. }
  20. // (7) 如果当前线程是第一个获取读锁的线程
  21. else if (firstReader == current) {
  22. firstReaderHoldCount++;
  23. } else {
  24. // (8) 记录最后一个获取读锁的线程或者记录其他线程读锁的可重入个数
  25. HoldCounter rh = cachedHoldCounter;
  26. if (rh == null || rh.tid != getThreadId(current))
  27. cachedHoldCounter = rh = readHolds.get();
  28. else if (rh.count == 0)
  29. readHolds.set(rh);
  30. rh.count++;
  31. }
  32. return 1;
  33. }
  34. // (9) 自旋获取
  35. return fullTryAcquireShared(current);
  36. }

(4)readerShouldBlock方法:

  • 公平锁NonfairStnc实现:return apparentlyFirstQueuedIsExclusive(),实现如下:
    上述代码的作用是,如果队列里面存在一个元素,咋判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否到达了最大值,最后执行CAS操作将AQS状态值的高16位值加1。

    1. final boolean apparentlyFirstQueuedIsExclusive() {
    2. Node h, s;
    3. return (h = head) != null &&
    4. (s = h.next) != null &&
    5. !s.isShared() &&
    6. s.thread != null;
    7. }
  • 非公平锁FairSync的实现:return hashQueuedPredecessors(),判断当前线程节点是否有前驱结点,有则放弃获取读锁的权限。

(5)SyncfullTryAcquireShared方法:

  1. final int fullTryAcquireShared(Thread current) {
  2. HoldCounter rh = null;
  3. for (;;) {
  4. int c = getState();
  5. if (exclusiveCount(c) != 0) {
  6. if (getExclusiveOwnerThread() != current)
  7. return -1;
  8. // else we hold the exclusive lock; blocking here
  9. // would cause deadlock.
  10. } else if (readerShouldBlock()) {
  11. // Make sure we're not acquiring read lock reentrantly
  12. if (firstReader == current) {
  13. // assert firstReaderHoldCount > 0;
  14. } else {
  15. if (rh == null) {
  16. rh = cachedHoldCounter;
  17. if (rh == null || rh.tid != getThreadId(current)) {
  18. rh = readHolds.get();
  19. if (rh.count == 0)
  20. readHolds.remove();
  21. }
  22. }
  23. if (rh.count == 0)
  24. return -1;
  25. }
  26. }
  27. if (sharedCount(c) == MAX_COUNT)
  28. throw new Error("Maximum lock count exceeded");
  29. if (compareAndSetState(c, c + SHARED_UNIT)) {
  30. if (sharedCount(c) == 0) {
  31. firstReader = current;
  32. firstReaderHoldCount = 1;
  33. } else if (firstReader == current) {
  34. firstReaderHoldCount++;
  35. } else {
  36. if (rh == null)
  37. rh = cachedHoldCounter;
  38. if (rh == null || rh.tid != getThreadId(current))
  39. rh = readHolds.get();
  40. else if (rh.count == 0)
  41. readHolds.set(rh);
  42. rh.count++;
  43. cachedHoldCounter = rh; // cache for release
  44. }
  45. return 1;
  46. }
  47. }
  48. }

释放

(1)ReadLockunlock方法:

  1. public void unlock() {
  2. sync.releaseShared(1); // 继承自AQS
  3. }

(2)AbstractQueuedSynchronizerreleaseShared方法:

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

(3)SynctryReleaseShared方法:

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. // (1) 判断当前线程是否为第一个获取读锁的线程
  4. if (firstReader == current) {
  5. // 可保证firstReaderHoldCount > 0
  6. // 可重入次数为1,清空读锁第一线程
  7. if (firstReaderHoldCount == 1)
  8. firstReader = null;
  9. // 否则,可重入次数减一
  10. else
  11. firstReaderHoldCount--;
  12. }
  13. // (2) 那么维护最后一个获取读锁的线程的可重入次数
  14. else {
  15. // 最后一个获取读锁的线程的可重入次数
  16. HoldCounter rh = cachedHoldCounter;
  17. // 初始化ch
  18. if (rh == null || rh.tid != getThreadId(current))
  19. rh = readHolds.get();
  20. int count = rh.count;
  21. // 如果可重入次数小于等于1,说明当前线程没有持有读锁,ch是get()成的
  22. if (count <= 1) {
  23. readHolds.remove();
  24. if (count <= 0)
  25. throw unmatchedUnlockException();
  26. }
  27. // 执行到这里,说明当前线程持有读锁中,可重入次数减一
  28. --rh.count;
  29. }
  30. // (3) 修改同步状态,循环直到自己的读计数-1,CAS更新成功
  31. for (;;) {
  32. int c = getState();
  33. int nextc = c - SHARED_UNIT;
  34. if (compareAndSetState(c, nextc))
  35. // 状态值=0释放成功
  36. return nextc == 0;
  37. }
  38. }

7.4 StampedLock

StampedLock是JDK1.8新增的一个锁,该锁提供了三种模式的读写控制,但是不支持条件变量和锁重入。

当调用获取锁的系列函数时,会返回一个long型的变量,称为戳记(stamp),代表了锁的状态。其中try系列获取锁的函数,当获取失败后会返回为0的stamp值。当调用释放锁和转换锁的方法时需要传入获取锁时返回的stamp值。

StampedLock提供了三种读写模式的锁:

  1. 写锁writeLock:是一个独占锁,任意时刻只有一个线程可以获取到该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这类似于ReentrantReadWriteLock的写锁(不同的是这里的写锁是不可重入锁);当目前没有线程持有读锁或者写锁时才可以获取到该锁。请求该锁成功后会返回一个stamp变量用来表示该锁的版本,当解释该锁时需要调用unlockWrite方法并传递获取锁时的stamp参数。并且它提供了非阻塞的tryWriteLock方法。

    1. long stamp = lock.writeLock();
    2. lock.unlockWrite(stamp);
  2. 悲观读锁readLock:是一个共享锁,在没有线程获取独占写锁的情况下,多个线程可以同时获取该锁。如果已经有线程持有写锁,则其他线程请求获取该锁会被阻塞,这类似于ReentrantReadWriteLock的读锁(不同的是这里的读锁是不可重入锁)。这里说的悲观是指具体操作数据前会悲观地认为其他线程可以要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑。请求该所成功后会返回一个stamp变量用来表示该锁的版本,当释放该锁时需要调用unlockRead方法并传递stamp参数。并且它提供了非阻塞的tryReadLock方法。

    1. long stamp = lock.readLock();
    2. lock.unlockRead(stamp);
  3. 乐观读锁tryOptimisticRead:它是相对于悲观锁来说的,在操作数据前并没有通过CAS设置锁的状态,仅仅通过位运算测试。如果当前没有线程持有写锁,则简单地返回一个非0的stamp值。获取该stamp后在具体操作数据前还需要调用validate方法验证该stamp是否已经不可用,也就是看当调用tryOptimisticRead返回stamp后到当前时间期间是否有其他线程持有了写锁,如果是则validate会返回0,否则就可以使用该stamp版本的锁对数据进行操作。由于tryOptimisticRead并没有使用CAS设置锁状态,所以不需要显式地释放该锁。该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及CAS操作,所以效率上会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要复制一份要操作的变量到方法栈,并且在操作数据时可能其他写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。

    1. long stamp = lock.tryOptimisticRead();
    2. if (!lock.validate(stamp)) {
    3. // 锁升级
    4. }

示例:笛卡尔二维点

  1. import java.util.concurrent.locks.StampedLock;
  2. /**
  3. * @author KHighness
  4. * @since 2021-05-05
  5. */
  6. public class Point {
  7. /**
  8. * 横坐标
  9. */
  10. private double x;
  11. /**
  12. * 纵坐标
  13. */
  14. private double y;
  15. /**
  16. * 锁实例
  17. */
  18. private final StampedLock lock = new StampedLock();
  19. /**
  20. * 移动点到新位置
  21. * <p>使用排它锁-写锁</p>
  22. * @param deltaX 横坐标变化值
  23. * @param deltaY 纵坐标变化值
  24. */
  25. void move(double deltaX, int deltaY) {
  26. long stamp = lock.writeLock();
  27. try {
  28. x += deltaX;
  29. y += deltaY;
  30. } finally {
  31. lock.unlockWrite(stamp);
  32. }
  33. }
  34. /**
  35. * 计算当前位置到原点的距离
  36. * <p>使用乐观锁读</p>
  37. * @return 距离原点的距离
  38. */
  39. double distanceFromOrigin() {
  40. long stamp = lock.tryOptimisticRead();
  41. double currentX = x, currentY = y;
  42. // 检查获取stamp后,锁是否被其他写线程排他性抢占
  43. if (!lock.validate(stamp)) {
  44. // 如果被抢占则进行锁升级,获取悲观读锁
  45. stamp = lock.readLock();
  46. try {
  47. currentX = x;
  48. currentY = y;
  49. } finally {
  50. lock.unlockRead(stamp);
  51. }
  52. }
  53. return Math.sqrt(currentX * currentX + currentY * currentY);
  54. }
  55. /**
  56. * 如果当前位置在原点则移动至新位置
  57. * <p>使用悲观锁获取读锁,并尝试转换为写锁</p>
  58. * @param newX 新位置的横坐标
  59. * @param newY 新位置的纵坐标
  60. */
  61. void moveIfAtOrigin(double newX, double newY) {
  62. // 这里可以用乐观锁读替换
  63. long stamp = lock.readLock();
  64. try {
  65. while (x == 0.0 && y == 0.0) {
  66. // 尝试将读锁升级为写锁
  67. long ws = lock.tryConvertToWriteLock(stamp);
  68. // 升级成功,则更换戳记,更新坐标,退出循环
  69. if (ws != 0L) {
  70. stamp = ws;
  71. x = newX;
  72. y = newY;
  73. break;
  74. }
  75. // 升级失败,则释放读锁,显式获取写锁,循环重试
  76. else {
  77. lock.unlockRead(stamp);
  78. stamp = lock.writeLock();
  79. }
  80. }
  81. } finally {
  82. // 释放锁
  83. lock.unlockWrite(stamp);
  84. }
  85. }
  86. }