2. AQS

2.1 AQS原理

2.1.1 概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

2.2.2 特点

  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively ```java // 获取锁的姿势 // 如果获取锁失败 if (!tryAcquire(arg)) { // 入队, 可以选择阻塞当前线程 park unpark }

// 释放锁的姿势 // 如果释放锁成功 if (tryRelease(arg)) { // 让阻塞线程恢复运行 }

  1. <a name="nycyv"></a>
  2. ### 2.2.3 自定义锁
  3. 下面实现一个不可重入的阻塞式锁:使用 AbstractQueuedSynchronizer 自定义一个同步器来实现自定义锁,代码如下:
  4. ```java
  5. @Slf4j(topic = "c.Test2")
  6. public class Test2 {
  7. public static void main(String[] args) {
  8. MyLock lock = new MyLock();
  9. new Thread(()->{
  10. lock.lock();
  11. try {
  12. Thread.sleep(1000);
  13. log.debug("locking...");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } finally {
  17. lock.unlock();
  18. log.debug("unlocking....");
  19. }
  20. }, "t1").start();
  21. new Thread(()->{
  22. lock.lock();
  23. try {
  24. log.debug("locking...");
  25. } finally {
  26. lock.unlock();
  27. log.debug("unlocking....");
  28. }
  29. }, "t2").start();
  30. }
  31. }
  32. // 自定义锁,不可重复锁
  33. class MyLock implements Lock {
  34. // 独占锁 ,同步器类
  35. class MySync extends AbstractQueuedSynchronizer{
  36. @Override // 尝试获得锁 0无锁 1加上锁
  37. protected boolean tryAcquire(int arg) {
  38. if (compareAndSetState(0 ,1)){
  39. // 加上了锁,并设置owner为当前线程
  40. setExclusiveOwnerThread(Thread.currentThread());
  41. }
  42. return true;
  43. }
  44. @Override //尝试释放锁
  45. protected boolean tryRelease(int arg) {
  46. setExclusiveOwnerThread(null);
  47. setState(0); // 这一步要在后面,state是volatile的保证前面属性的可见性
  48. return true;
  49. }
  50. @Override //是否持有独占锁
  51. protected boolean isHeldExclusively() {
  52. return getState() == 1;
  53. }
  54. public Condition newCondition(){
  55. return new ConditionObject();
  56. }
  57. }
  58. private MySync mySync = new MySync();
  59. @Override //加锁(不成功进入等待队列)
  60. public void lock() {
  61. mySync.acquire(1);
  62. }
  63. @Override // 加锁, 可打断
  64. public void lockInterruptibly() throws InterruptedException {
  65. mySync.acquireInterruptibly(1);
  66. }
  67. @Override // 尝试加锁,一次
  68. public boolean tryLock() {
  69. return mySync.tryAcquire(1);
  70. }
  71. @Override // 尝试加锁,带超时
  72. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  73. return mySync.tryAcquireNanos(1, unit.toNanos(time));
  74. }
  75. @Override // 解锁
  76. public void unlock() {
  77. mySync.release(1);
  78. }
  79. @Override // 创建条件变量
  80. public Condition newCondition() {
  81. return mySync.newCondition();
  82. }
  1. 12:34:21.191 [t2] DEBUG c.Test2 - locking...
  2. 12:34:21.193 [t2] DEBUG c.Test2 - unlocking....
  3. 12:34:22.189 [t1] DEBUG c.Test2 - locking...
  4. 12:34:22.189 [t1] DEBUG c.Test2 - unlocking....

2.2 ReentrantLock原理

ReentrantLock 提供了抽象的Sync(继承了AQS),并实现了两个同步器,实现公平锁(FairSync)和非公平锁(NonfairSync),默认是非公平锁,实现了Lock接口
image.png

2.2.1 构造器

带参数和不带参数的构造器,不带参默认为非公平的

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }
  1. public ReentrantLock(boolean fair) {
  2. sync = fair ? new FairSync() : new NonfairSync();
  3. }

2.2.2 加锁过程

NonfairSync 继承自 Sync 继承自AQS

  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = 7316153563782823691L;
  3. /**
  4. * Performs lock. Try immediate barge, backing up to normal
  5. * acquire on failure.
  6. */
  7. final void lock() {
  8. if (compareAndSetState(0, 1))
  9. setExclusiveOwnerThread(Thread.currentThread());
  10. else
  11. acquire(1);
  12. }
  13. protected final boolean tryAcquire(int acquires) {
  14. return nonfairTryAcquire(acquires);
  15. }
  16. }

没有竞争时:
image.png
有竞争时:调用acquire方法

  1. public final void acquire(int arg) {
  2. // 没有重新加锁成功 且 将其加入到等待队列
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. selfInterrupt();
  6. }

image.png

  • lock方法中CAS 尝试将 state 由 0 改为 1,结果失败lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
  • 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
    • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    • Node 的创建是懒惰的
    • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

image.png

acquireQueued方法:
当前线程进入 acquire方法的 acquireQueued 逻辑

  • acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  • 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败
  • 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false(意思就是你尝试了好几次都没获得锁,就进入阻塞,让前面那个节点唤醒你)

image.png

  • shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  • 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
  • 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)

image.png
再从次有多个线程经历上述过程竞争失败,变成这个样子7 共享模型之AQS - 图7
源码:

  1. // Sync 继承自 AQS
  2. static final class NonfairSync extends Sync {
  3. private static final long serialVersionUID = 7316153563782823691L;
  4. // 加锁实现
  5. final void lock() {
  6. // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
  7. if (compareAndSetState(0, 1))
  8. setExclusiveOwnerThread(Thread.currentThread());
  9. else
  10. // 如果尝试失败,进入 ㈠
  11. acquire(1);
  12. }
  13. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处 尝试加锁失败的过程
  14. public final void acquire(int arg) {
  15. // ㈡ tryAcquire
  16. if (
  17. // 再次尝试获取锁
  18. !tryAcquire(arg) &&
  19. // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
  20. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  21. ) {
  22. selfInterrupt();
  23. }
  24. }
  25. // ㈡ 进入 ㈢
  26. protected final boolean tryAcquire(int acquires) {
  27. return nonfairTryAcquire(acquires);
  28. }
  29. // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
  30. final boolean nonfairTryAcquire(int acquires) {
  31. final Thread current = Thread.currentThread();
  32. int c = getState();
  33. // 如果还没有获得锁
  34. if (c == 0) {
  35. // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
  36. if (compareAndSetState(0, acquires)) {
  37. setExclusiveOwnerThread(current);
  38. return true;
  39. }
  40. }
  41. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
  42. else if (current == getExclusiveOwnerThread()) {
  43. // state++
  44. int nextc = c + acquires;
  45. if (nextc < 0) // overflow
  46. throw new Error("Maximum lock count exceeded");
  47. setState(nextc);
  48. return true;
  49. }
  50. // 获取失败, 回到调用处
  51. return false;
  52. }
  53. // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处 将该节点放到等待队列中
  54. private Node addWaiter(Node mode) {
  55. // 将当前线程关联到一个 Node 对象上, 模式为独占模式,新建的Node的waitstatus默认为0,因为waitstatus是成员变量,默认被初始化为0
  56. Node node = new Node(Thread.currentThread(), mode);
  57. // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
  58. Node pred = tail;
  59. if (pred != null) {
  60. node.prev = pred;
  61. if (compareAndSetTail(pred, node)) {
  62. // 双向链表
  63. pred.next = node;
  64. return node;
  65. }
  66. }
  67. //如果tail为null,尝试将 Node 加入 AQS, 进入 ㈥
  68. enq(node);
  69. return node;
  70. }
  71. // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
  72. private Node enq(final Node node) {
  73. for (;;) {
  74. Node t = tail;
  75. if (t == null) {
  76. // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
  77. if (compareAndSetHead(new Node())) {
  78. tail = head;
  79. }
  80. } else {
  81. // cas 尝试将 Node 对象加入 AQS 队列尾部
  82. node.prev = t;
  83. if (compareAndSetTail(t, node)) {
  84. t.next = node;
  85. return t;
  86. }
  87. }
  88. }
  89. }
  90. // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处 对等待队列进行处理
  91. final boolean acquireQueued(final Node node, int arg) {
  92. boolean failed = true;
  93. try {
  94. boolean interrupted = false;
  95. for (;;) {
  96. final Node p = node.predecessor();
  97. // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
  98. if (p == head && tryAcquire(arg)) {
  99. // 获取成功, 设置自己(当前线程对应的 node)为 head
  100. setHead(node);
  101. // 上一个节点 help GC
  102. p.next = null;
  103. failed = false;
  104. // 返回中断标记 false
  105. return interrupted;
  106. }
  107. if (
  108. // 判断是否应当 park, 进入 ㈦
  109. shouldParkAfterFailedAcquire(p, node) &&
  110. // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
  111. parkAndCheckInterrupt()
  112. ) {
  113. interrupted = true;
  114. }
  115. }
  116. } finally {
  117. if (failed)
  118. cancelAcquire(node);
  119. }
  120. }
  121. // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处 是否需要在获取失败时被阻塞
  122. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  123. // 获取上一个节点的状态
  124. int ws = pred.waitStatus;
  125. if (ws == Node.SIGNAL) {
  126. // 上一个节点都在阻塞, 那么自己也阻塞好了
  127. return true;
  128. }
  129. // > 0 表示取消状态
  130. if (ws > 0) {
  131. // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
  132. do {
  133. node.prev = pred = pred.prev;
  134. } while (pred.waitStatus > 0);
  135. pred.next = node;
  136. } else {
  137. // 这次还没有阻塞
  138. // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
  139. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  140. }
  141. return false;
  142. }
  143. // ㈧ 阻塞当前线程
  144. private final boolean parkAndCheckInterrupt() {
  145. LockSupport.park(this);
  146. return Thread.interrupted();
  147. }
  148. }

2.2.3 释放锁流程

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

image.png
如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程:

  • unparkSuccessor 中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
  • 回到 Thread-1 的 acquireQueued 流程

image.png
如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
image.png

  1. Thread-4 被设置为 exclusiveOwnerThread,state = 1
  2. Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞 ```java // Sync 继承自 AQS static final class NonfairSync extends Sync { // 解锁实现 public void unlock() {

    1. sync.release(1);

    }

    // AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean release(int arg) {

    1. // 尝试释放锁, 进入 ㈠
    2. if (tryRelease(arg)) {
    3. // 队列头节点 unpark
    4. Node h = head;
    5. if (
    6. // 队列不为 null
    7. h != null &&
    8. // waitStatus == Node.SIGNAL 才需要 unpark
    9. h.waitStatus != 0
    10. ) {
    11. // unpark AQS 中等待的线程, 进入 ㈡
    12. unparkSuccessor(h);
    13. }
    14. return true;
    15. }
    16. return false;

    }

    // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryRelease(int releases) {

    1. // state--
    2. int c = getState() - releases;
    3. if (Thread.currentThread() != getExclusiveOwnerThread())
    4. throw new IllegalMonitorStateException();
    5. boolean free = false;
    6. // 支持锁重入, 只有 state 减为 0, 才释放成功
    7. if (c == 0) {
    8. free = true;
    9. setExclusiveOwnerThread(null);
    10. }
    11. setState(c);
    12. return free;

    }

    // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处 private void unparkSuccessor(Node node) {

    1. // 如果状态为 Node.SIGNAL 尝试重置状态为 0, 如果线程获取到了锁那么后来头结点会被抛弃掉
    2. // 不成功也可以
    3. int ws = node.waitStatus;
    4. if (ws < 0) {
    5. compareAndSetWaitStatus(node, ws, 0);
    6. }
    7. // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
    8. Node s = node.next;
    9. // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
    10. if (s == null || s.waitStatus > 0) {
    11. s = null;
    12. for (Node t = tail; t != null && t != node; t = t.prev)
    13. if (t.waitStatus <= 0)
    14. s = t;
    15. }
    16. if (s != null)
    17. LockSupport.unpark(s.thread);

    } }

  1. <a name="hkJVf"></a>
  2. ### 2.2.4 锁重入原理
  3. 如果是同一个线程,则将state++;释放后,会将state--。如果state为0,才会释放锁。
  4. ```java
  5. static final class NonfairSync extends Sync {
  6. // ...
  7. // Sync 继承过来的方法, 方便阅读, 放在此处 不公平的获取锁的方式
  8. final boolean nonfairTryAcquire(int acquires) {
  9. final Thread current = Thread.currentThread();
  10. int c = getState();
  11. if (c == 0) {
  12. // 如果c为0,表示没有线程加锁,那么该线程进行加锁
  13. if (compareAndSetState(0, acquires)) {
  14. // 设置锁头信息 owner
  15. setExclusiveOwnerThread(current);
  16. return true;
  17. }
  18. }
  19. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入,更新state
  20. else if (current == getExclusiveOwnerThread()) {
  21. // state++
  22. int nextc = c + acquires;
  23. if (nextc < 0) // overflow
  24. throw new Error("Maximum lock count exceeded");
  25. setState(nextc);
  26. return true;
  27. }
  28. return false;
  29. }
  30. // Sync 继承过来的方法, 方便阅读, 放在此处 释放锁
  31. protected final boolean tryRelease(int releases) {
  32. // state--
  33. int c = getState() - releases;
  34. if (Thread.currentThread() != getExclusiveOwnerThread())
  35. throw new IllegalMonitorStateException();
  36. // c不为0,还是当前线程的锁,返回释放失败。
  37. boolean free = false;
  38. // 支持锁重入, 只有 state 减为 0, 才释放成功
  39. if (c == 0) {
  40. // 返回释放成功,并将owner置为null
  41. free = true;
  42. setExclusiveOwnerThread(null);
  43. }
  44. setState(c);
  45. return free;
  46. }
  47. }

2.2.5 可打断原理

  • 不可打断模式:

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。

  1. // Sync 继承自 AQS
  2. static final class NonfairSync extends Sync {
  3. // ...
  4. private final boolean parkAndCheckInterrupt() {
  5. // 如果打断标记已经是 true, 则 park 会失效
  6. LockSupport.park(this);
  7. // interrupted 会清除打断标记
  8. return Thread.interrupted();
  9. }
  10. final boolean acquireQueued(final Node node, int arg) {
  11. boolean failed = true;
  12. try {
  13. boolean interrupted = false;
  14. for (;;) {
  15. final Node p = node.predecessor();
  16. if (p == head && tryAcquire(arg)) {
  17. setHead(node);
  18. p.next = null;
  19. failed = false;
  20. // 还是需要获得锁后, 才能返回打断状态
  21. return interrupted;
  22. }
  23. if (
  24. shouldParkAfterFailedAcquire(p, node) &&
  25. parkAndCheckInterrupt()
  26. ) {
  27. // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
  28. interrupted = true;
  29. }
  30. }
  31. } finally {
  32. if (failed)
  33. cancelAcquire(node);
  34. }
  35. }
  36. public final void acquire(int arg) {
  37. if (
  38. !tryAcquire(arg) &&
  39. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  40. ) {
  41. // 如果打断状态为 true
  42. selfInterrupt();
  43. }
  44. }
  45. static void selfInterrupt() {
  46. // 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错
  47. Thread.currentThread().interrupt();
  48. }
  49. }
  50. }
  • 可打断模式: ```java static final class NonfairSync extends Sync { public final void acquireInterruptibly(int arg) throws InterruptedException {

    1. if (Thread.interrupted())
    2. throw new InterruptedException();
    3. // 如果没有获得到锁, 进入 ㈠
    4. if (!tryAcquire(arg))
    5. doAcquireInterruptibly(arg);

    }

    // ㈠ 可打断的获取锁流程 private void doAcquireInterruptibly(int arg) throws InterruptedException {

    1. final Node node = addWaiter(Node.EXCLUSIVE);
    2. boolean failed = true;
    3. try {
    4. for (;;) {
    5. final Node p = node.predecessor();
    6. if (p == head && tryAcquire(arg)) {
    7. setHead(node);
    8. p.next = null; // help GC
    9. failed = false;
    10. return;
    11. }
    12. if (shouldParkAfterFailedAcquire(p, node) &&
    13. parkAndCheckInterrupt()) {
    14. // 在 park 过程中如果被 interrupt 会进入此
    15. // 这时候抛出异常, 而不会再次进入 for (;;)
    16. throw new InterruptedException();
    17. }
    18. }
    19. } finally {
    20. if (failed)
    21. cancelAcquire(node);
    22. }

    } }

  1. 区别在于不可打断模式是将打断标记置为true,再次进入for循环(在AQS队列中);可打断模式直接抛出异常,不会进入for循环。
  2. <a name="qzarC"></a>
  3. ### 2.2.6 公平锁原理
  4. 非公平锁主要区别在于 tryAcquire 方法的实现:在加锁之前会判断AQS队列有没有前驱结点
  5. ```java
  6. static final class FairSync extends Sync {
  7. private static final long serialVersionUID = -3000897897090466540L;
  8. final void lock() {
  9. acquire(1);
  10. }
  11. // AQS 继承过来的方法, 方便阅读, 放在此处
  12. public final void acquire(int arg) {
  13. if (
  14. !tryAcquire(arg) &&
  15. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  16. ) {
  17. selfInterrupt();
  18. }
  19. }
  20. // 与非公平锁主要区别在于 tryAcquire 方法的实现
  21. protected final boolean tryAcquire(int acquires) {
  22. final Thread current = Thread.currentThread();
  23. int c = getState();
  24. if (c == 0) {
  25. // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
  26. if (!hasQueuedPredecessors() &&
  27. compareAndSetState(0, acquires)) {
  28. setExclusiveOwnerThread(current);
  29. return true;
  30. }
  31. }
  32. else if (current == getExclusiveOwnerThread()) {
  33. int nextc = c + acquires;
  34. if (nextc < 0)
  35. throw new Error("Maximum lock count exceeded");
  36. setState(nextc);
  37. return true;
  38. }
  39. return false;
  40. }
  41. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
  42. public final boolean hasQueuedPredecessors() {
  43. Node t = tail;
  44. Node h = head;
  45. Node s;
  46. // h != t 时表示队列中有 Node
  47. return h != t &&
  48. (
  49. // (s = h.next) == null 表示队列中还有没有老二
  50. (s = h.next) == null || // 或者队列中老二线程不是此线程
  51. s.thread != Thread.currentThread()
  52. );
  53. }
  54. }

2.2.7 条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

  • await流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部7 共享模型之AQS - 图11
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
image.png
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
image.png
park 阻塞 Thread-0
image.png

  • signal流程

假设 Thread-1 要来唤醒 Thread-0
image.png
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
image.png
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1
image.png
Thread-1 释放锁,进入 unlock 流程。

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. private static final long serialVersionUID = 1173984872572414699L;
  3. // 第一个等待节点
  4. private transient Node firstWaiter;
  5. // 最后一个等待节点
  6. private transient Node lastWaiter;
  7. public ConditionObject() { }
  8. // ㈠ 添加一个 Node 至等待队列
  9. private Node addConditionWaiter() {
  10. Node t = lastWaiter;
  11. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  12. if (t != null && t.waitStatus != Node.CONDITION) {
  13. unlinkCancelledWaiters();
  14. t = lastWaiter;
  15. }
  16. // 创建一个关联当前线程的新 Node, 添加至队列尾部
  17. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  18. if (t == null)
  19. firstWaiter = node;
  20. else
  21. t.nextWaiter = node;
  22. lastWaiter = node;
  23. return node;
  24. }
  25. // 唤醒 - 将没取消的第一个节点转移至 AQS 队列
  26. private void doSignal(Node first) {
  27. do {
  28. // 已经是尾节点了
  29. if ( (firstWaiter = first.nextWaiter) == null) {
  30. lastWaiter = null;
  31. }
  32. first.nextWaiter = null;
  33. } while (
  34. // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
  35. !transferForSignal(first) &&
  36. // 队列还有节点
  37. (first = firstWaiter) != null
  38. );
  39. }
  40. // 外部类方法, 方便阅读, 放在此处
  41. // ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
  42. final boolean transferForSignal(Node node) {
  43. // 设置当前node状态为0(因为处在队列末尾),如果状态已经不是 Node.CONDITION, 说明被取消了
  44. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  45. return false;
  46. // 加入 AQS 队列尾部
  47. Node p = enq(node);
  48. int ws = p.waitStatus;
  49. if (
  50. // 插入节点的上一个节点被取消
  51. ws > 0 ||
  52. // 插入节点的上一个节点不能设置状态为 Node.SIGNAL
  53. !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
  54. ) {
  55. // unpark 取消阻塞, 让线程重新同步状态
  56. LockSupport.unpark(node.thread);
  57. }
  58. return true;
  59. }
  60. // 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
  61. private void doSignalAll(Node first) {
  62. lastWaiter = firstWaiter = null;
  63. do {
  64. Node next = first.nextWaiter;
  65. first.nextWaiter = null;
  66. transferForSignal(first);
  67. first = next;
  68. } while (first != null);
  69. }
  70. // ㈡
  71. private void unlinkCancelledWaiters() {
  72. // ...
  73. }
  74. // 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
  75. public final void signal() {
  76. // 如果没有持有锁,会抛出异常
  77. if (!isHeldExclusively())
  78. throw new IllegalMonitorStateException();
  79. Node first = firstWaiter;
  80. if (first != null)
  81. doSignal(first);
  82. }
  83. // 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
  84. public final void signalAll() {
  85. if (!isHeldExclusively())
  86. throw new IllegalMonitorStateException();
  87. Node first = firstWaiter;
  88. if (first != null)
  89. doSignalAll(first);
  90. }
  91. // 不可打断等待 - 直到被唤醒
  92. public final void awaitUninterruptibly() {
  93. // 添加一个 Node 至等待队列, 见 ㈠
  94. Node node = addConditionWaiter();
  95. // 释放节点持有的锁, 见 ㈣
  96. int savedState = fullyRelease(node);
  97. boolean interrupted = false;
  98. // 如果该节点还没有转移至 AQS 队列, 阻塞
  99. while (!isOnSyncQueue(node)) {
  100. // park 阻塞
  101. LockSupport.park(this);
  102. // 如果被打断, 仅设置打断状态
  103. if (Thread.interrupted())
  104. interrupted = true;
  105. }
  106. // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
  107. if (acquireQueued(node, savedState) || interrupted)
  108. selfInterrupt();
  109. }
  110. // 外部类方法, 方便阅读, 放在此处
  111. // ㈣ 因为某线程可能重入,需要将 state 全部释放,获取state,然后把它全部减掉,以全部释放
  112. final int fullyRelease(Node node) {
  113. boolean failed = true;
  114. try {
  115. int savedState = getState();
  116. // 唤醒等待队列队列中的下一个节点
  117. if (release(savedState)) {
  118. failed = false;
  119. return savedState;
  120. } else {
  121. throw new IllegalMonitorStateException();
  122. }
  123. } finally {
  124. if (failed)
  125. node.waitStatus = Node.CANCELLED;
  126. }
  127. }
  128. // 打断模式 - 在退出等待时重新设置打断状态
  129. private static final int REINTERRUPT = 1;
  130. // 打断模式 - 在退出等待时抛出异常
  131. private static final int THROW_IE = -1;
  132. // 判断打断模式
  133. private int checkInterruptWhileWaiting(Node node) {
  134. return Thread.interrupted() ?
  135. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  136. 0;
  137. }
  138. // ㈤ 应用打断模式
  139. private void reportInterruptAfterWait(int interruptMode)
  140. throws InterruptedException {
  141. if (interruptMode == THROW_IE)
  142. throw new InterruptedException();
  143. else if (interruptMode == REINTERRUPT)
  144. selfInterrupt();
  145. }
  146. // 等待 - 直到被唤醒或打断
  147. public final void await() throws InterruptedException {
  148. if (Thread.interrupted()) {
  149. throw new InterruptedException();
  150. }
  151. // 添加一个 Node 至等待队列, 见 ㈠
  152. Node node = addConditionWaiter();
  153. // 释放节点持有的锁
  154. int savedState = fullyRelease(node);
  155. int interruptMode = 0;
  156. // 如果该节点还没有转移至 AQS 队列, 阻塞
  157. while (!isOnSyncQueue(node)) {
  158. // park 阻塞
  159. LockSupport.park(this);
  160. // 如果被打断, 退出等待队列
  161. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  162. break;
  163. }
  164. // 退出等待队列后, 还需要获得 AQS 队列的锁
  165. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  166. interruptMode = REINTERRUPT;
  167. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  168. if (node.nextWaiter != null)
  169. unlinkCancelledWaiters();
  170. // 应用打断模式, 见 ㈤
  171. if (interruptMode != 0)
  172. reportInterruptAfterWait(interruptMode);
  173. }
  174. // 等待 - 直到被唤醒或打断或超时
  175. public final long awaitNanos(long nanosTimeout) throws InterruptedException {
  176. if (Thread.interrupted()) {
  177. throw new InterruptedException();
  178. }
  179. // 添加一个 Node 至等待队列, 见 ㈠
  180. Node node = addConditionWaiter();
  181. // 释放节点持有的锁
  182. int savedState = fullyRelease(node);
  183. // 获得最后期限
  184. final long deadline = System.nanoTime() + nanosTimeout;
  185. int interruptMode = 0;
  186. // 如果该节点还没有转移至 AQS 队列, 阻塞
  187. while (!isOnSyncQueue(node)) {
  188. // 已超时, 退出等待队列
  189. if (nanosTimeout <= 0L) {
  190. transferAfterCancelledWait(node);
  191. break;
  192. }
  193. // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
  194. if (nanosTimeout >= spinForTimeoutThreshold)
  195. LockSupport.parkNanos(this, nanosTimeout);
  196. // 如果被打断, 退出等待队列
  197. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  198. break;
  199. nanosTimeout = deadline - System.nanoTime();
  200. }
  201. // 退出等待队列后, 还需要获得 AQS 队列的锁
  202. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  203. interruptMode = REINTERRUPT;
  204. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  205. if (node.nextWaiter != null)
  206. unlinkCancelledWaiters();
  207. // 应用打断模式, 见 ㈤
  208. if (interruptMode != 0)
  209. reportInterruptAfterWait(interruptMode);
  210. return deadline - System.nanoTime();
  211. }
  212. // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
  213. public final boolean awaitUntil(Date deadline) throws InterruptedException {
  214. // ...
  215. }
  216. // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
  217. public final boolean await(long time, TimeUnit unit) throws InterruptedException {
  218. // ...
  219. }
  220. // 工具方法 省略 ...
  221. }

2.3 读写锁

2.3.1 ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!
提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法 。

读-读测试:

  1. @Slf4j(topic = "c.Test3")
  2. public class Test3 {
  3. public static void main(String[] args) {
  4. DataContainer d = new DataContainer();
  5. new Thread(()->{
  6. d.read();
  7. }, "t1").start();
  8. new Thread(()->{
  9. d.read();
  10. }, "t2").start();
  11. }
  12. }
  13. // 创建读写锁
  14. @Slf4j(topic = "c.DataContainer")
  15. class DataContainer {
  16. private Object data;
  17. private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
  18. private ReentrantReadWriteLock.ReadLock r = rw.readLock();
  19. private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
  20. public Object read(){
  21. log.debug("获取读锁");
  22. r.lock();
  23. try {
  24. log.debug("读取");
  25. Thread.sleep(1000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } finally {
  29. log.debug("释放读锁");
  30. r.unlock();
  31. }
  32. return data;
  33. }
  34. public void write(){
  35. log.debug("获取写锁");
  36. w.lock();
  37. try {
  38. log.debug("写入");
  39. }finally {
  40. log.debug("释放写锁");
  41. w.unlock();
  42. }
  43. }
  1. 17:48:25.705 [t2] DEBUG c.DataContainer - 获取读锁
  2. 17:48:25.705 [t1] DEBUG c.DataContainer - 获取读锁
  3. 17:48:25.708 [t2] DEBUG c.DataContainer - 读取
  4. 17:48:25.708 [t1] DEBUG c.DataContainer - 读取
  5. 17:48:26.712 [t1] DEBUG c.DataContainer - 释放读锁
  6. 17:48:26.712 [t2] DEBUG c.DataContainer - 释放读锁

读是并发的

读写实例

  1. public static void main(String[] args) throws InterruptedException {
  2. DataContainer d = new DataContainer();
  3. new Thread(() -> {
  4. d.read();
  5. }, "t1").start();
  6. Thread.sleep(100);
  7. new Thread(() -> {
  8. d.write();
  9. }, "t2").start();
  10. }
  1. 17:50:56.087 [t1] DEBUG c.DataContainer - 获取读锁
  2. 17:50:56.090 [t1] DEBUG c.DataContainer - 读取
  3. 17:50:56.191 [t2] DEBUG c.DataContainer - 获取写锁
  4. 17:50:57.095 [t1] DEBUG c.DataContainer - 释放读锁
  5. 17:50:57.095 [t2] DEBUG c.DataContainer - 写入
  6. 17:50:57.095 [t2] DEBUG c.DataContainer - 释放写锁

可以看到读与写是互斥的,只有读锁先释放,才能拿到写锁

写-写

  1. @Slf4j(topic = "c.Test3")
  2. public class Test3 {
  3. public static void main(String[] args) throws InterruptedException {
  4. DataContainer d = new DataContainer();
  5. new Thread(() -> {
  6. d.write();
  7. }, "t1").start();
  8. Thread.sleep(100);
  9. new Thread(() -> {
  10. d.write();
  11. }, "t2").start();
  12. }
  13. }
  14. // 创建读写锁
  15. @Slf4j(topic = "c.DataContainer")
  16. class DataContainer {
  17. private Object data;
  18. private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
  19. private ReentrantReadWriteLock.ReadLock r = rw.readLock();
  20. private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
  21. public Object read() {
  22. log.debug("获取读锁");
  23. r.lock();
  24. try {
  25. log.debug("读取");
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } finally {
  30. log.debug("释放读锁");
  31. r.unlock();
  32. }
  33. return data;
  34. }
  35. public void write() {
  36. log.debug("获取写锁");
  37. w.lock();
  38. try {
  39. log.debug("写入");
  40. Thread.sleep(1000);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. } finally {
  44. log.debug("释放写锁");
  45. w.unlock();
  46. }
  47. }
  48. }
  1. 18:06:20.475 [t1] DEBUG c.DataContainer - 获取写锁
  2. 18:06:20.480 [t1] DEBUG c.DataContainer - 写入
  3. 18:06:20.575 [t2] DEBUG c.DataContainer - 获取写锁
  4. 18:06:21.485 [t1] DEBUG c.DataContainer - 释放写锁
  5. 18:06:21.486 [t2] DEBUG c.DataContainer - 写入
  6. 18:06:22.490 [t2] DEBUG c.DataContainer - 释放写锁

写与写之间也是互斥的。

2.3.2 读写锁注意事项

  1. 读锁不支持条件变量
  2. 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待 ```java r.lock(); try {
    1. // ...
    2. w.lock();
    3. try {
    4. // ...
    5. } finally{
    6. w.unlock();
    7. }
    } finally{
    1. r.unlock();
    }
  1. 3. 重入时降级支持:即持有写锁的情况下去获取读锁
  2. ```java
  3. class CachedData {
  4. Object data;
  5. // 是否有效,如果失效,需要重新计算 data
  6. volatile boolean cacheValid;
  7. final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  8. void processCachedData() {
  9. rwl.readLock().lock();
  10. if (!cacheValid) {
  11. // 获取写锁前必须释放读锁
  12. rwl.readLock().unlock();
  13. rwl.writeLock().lock();
  14. try {
  15. // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
  16. if (!cacheValid) {
  17. data = ...
  18. cacheValid = true;
  19. }
  20. // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
  21. rwl.readLock().lock();
  22. } finally {
  23. rwl.writeLock().unlock();
  24. }
  25. }
  26. // 自己用完数据, 释放读锁
  27. try {
  28. use(data);
  29. } finally {
  30. rwl.readLock().unlock();
  31. }
  32. }
  33. }

2.3.3 应用之缓存

缓存更新策略:更新时,是先清缓存还是先更新数据库?
先清除缓存操作如下:
image.png
先更新数据库操作如下:
image.png
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询:这种情况的出现几率非常小:
image.png

  1. /**
  2. * ReentrantReadWriteLock 读写锁解决 缓存与数据库一致性问题
  3. */
  4. public class Code_13_ReadWriteCacheTest {
  5. public static void main(String[] args) {
  6. GeneriCacheDao<Object> generiCacheDao = new GeneriCacheDao<>();
  7. Object[] objects = new Object[2];
  8. generiCacheDao.queryOne(Object.class,"Test",objects);
  9. generiCacheDao.queryOne(Object.class,"Test",objects);
  10. generiCacheDao.queryOne(Object.class,"Test",objects);
  11. generiCacheDao.queryOne(Object.class,"Test",objects);
  12. System.out.println(generiCacheDao.map);
  13. generiCacheDao.update("Test",objects);
  14. System.out.println(generiCacheDao.map);
  15. }
  16. }
  17. class GeneriCacheDao<T> extends GenericDao {
  18. HashMap<SqlPair, T> map = new HashMap<>();
  19. ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  20. GenericDao genericDao = new GenericDao();
  21. @Override
  22. public int update(String sql, Object... params){
  23. lock.writeLock().lock();
  24. SqlPair sqlPair = new SqlPair(sql, params);
  25. try {
  26. // 先查询数据库再更新缓存,但是这里加了锁,谁先谁后都没关系
  27. int update = genericDao.update(sql, params);
  28. map.clear();
  29. return update;
  30. } finally {
  31. lock.writeLock().unlock();
  32. }
  33. }
  34. @Override
  35. public T queryOne(Class beanClass, String sql, Object... params){
  36. SqlPair key = new SqlPair(sql, params);
  37. // 加读锁, 防止其它线程对缓存更改
  38. lock.readLock().lock();
  39. try {
  40. T t = map.get(key);
  41. if (t != null){
  42. return t;
  43. }
  44. } finally {
  45. lock.readLock().unlock();
  46. }
  47. // 加写锁, 防止其它线程对缓存读取和更改
  48. lock.writeLock().lock();
  49. // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
  50. // 为防止重复查询数据库, 再次验证
  51. try {
  52. T value = map.get(key);
  53. if (value == null){
  54. value = (T) genericDao.queryOne(beanClass, sql, params);
  55. map.put(key, value);
  56. }
  57. return value;
  58. } finally {
  59. lock.writeLock().unlock();
  60. }
  61. }
  62. class SqlPair{
  63. private String sql;
  64. private Object[] params;
  65. public SqlPair(String sql, Object[] params) {
  66. this.sql = sql;
  67. this.params = params;
  68. }
  69. @Override
  70. public boolean equals(Object o) {
  71. if (this == o) return true;
  72. if (o == null || getClass() != o.getClass()) return false;
  73. SqlPair sqlMap = (SqlPair) o;
  74. return Objects.equals(sql, sqlMap.sql) &&
  75. Arrays.equals(params, sqlMap.params);
  76. }
  77. @Override
  78. public int hashCode() {
  79. int result = Objects.hash(sql);
  80. result = 31 * result + Arrays.hashCode(params);
  81. return result;
  82. }
  83. }
  84. }
  85. class GenericDao<T>{
  86. public int update(String sql, Object... params){
  87. return 1;
  88. }
  89. public T queryOne(Class<T> beanClass, String sql, Object... params){
  90. System.out.println("查询数据库中");
  91. return (T) new Object();
  92. }
  93. }

2.3.4 读写锁原理

图解流程

读写锁用的是同一个 Sync 同步器,因此等待队列、state 等也是同一个
下面执行:t1 w.lock,t2 r.lock 情况

  1. t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

image.png

  1. t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败。tryAcquireShared 返回值表示
    • -1 表示失败
    • 0 表示成功,但后继节点不会继续唤醒
    • 正数表示成功,而且数值是还有几个后继节点需要唤醒,我们这里的读写锁返回 1

image.png

  1. 这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

image.png

  1. t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
  2. 如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park。

image.png
又继续执行:t3 r.lock,t4 w.lock。这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
image.png
继续执行 t1 w.unlock
这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子
image.png
接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行,图中的t2从黑色变成了蓝色(注意这里只是恢复运行而已,并没有获取到锁!) 这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加
image.png
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
image.png
事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行.(读锁是共享的,不是独占的,多个线程都能读)
image.png
这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加一
image.png
这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
image.png
再继续执行t2 r.unlock,t3 r.unlock 。t2进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
image.png
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即
image.png
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;; ) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束
image.png

源码分析

  • 写锁上锁流程 ```java static final class NonfairSync extends Sync { // … 省略无关代码

    // 外部类 WriteLock 方法, 方便阅读, 放在此处 public void lock() {

    1. sync.acquire(1);

    }

    // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquire(int arg) {

    1. if (
    2. // 尝试获得写锁失败
    3. !tryAcquire(arg) &&
    4. // 将当前线程关联到一个 Node 对象上, 模式为独占模式
    5. // 进入 AQS 队列阻塞
    6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    7. ) {
    8. selfInterrupt();
    9. }

    }

    // Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryAcquire(int acquires) {

    1. // 获得低 16 位, 代表写锁的 state 计数
    2. Thread current = Thread.currentThread();
    3. int c = getState();
    4. int w = exclusiveCount(c);
    5. if (c != 0) {
    6. if (
    7. // c != 0 and w == 0 表示有读锁返回错误,读锁不支持锁升级, 或者
    8. w == 0 ||
    9. // c != 0 and w != 0 表示有写锁,如果 exclusiveOwnerThread 不是自己
    10. current != getExclusiveOwnerThread()
    11. ) {
    12. // 获得锁失败
    13. return false;
    14. }
    15. // 写锁计数超过低 16 位, 报异常
    16. if (w + exclusiveCount(acquires) > MAX_COUNT)
    17. throw new Error("Maximum lock count exceeded");
    18. // 写锁重入, 获得锁成功
    19. setState(c + acquires);
    20. return true;
    21. }
    22. if (
    23. // 判断写锁是否该阻塞这里返回false, 或者
    24. writerShouldBlock() ||
    25. // 尝试更改计数失败
    26. !compareAndSetState(c, c + acquires)
    27. ) {
    28. // 获得锁失败
    29. return false;
    30. }
    31. // 获得锁成功
    32. setExclusiveOwnerThread(current);
    33. return true;

    }

    // 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞 final boolean writerShouldBlock() {

    1. return false;

    } }

  1. - 写锁释放过程
  2. ```java
  3. static final class NonfairSync extends Sync {
  4. // ... 省略无关代码
  5. // WriteLock 方法, 方便阅读, 放在此处
  6. public void unlock() {
  7. sync.release(1);
  8. }
  9. // AQS 继承过来的方法, 方便阅读, 放在此处
  10. public final boolean release(int arg) {
  11. // 尝试释放写锁成功
  12. if (tryRelease(arg)) {
  13. // unpark AQS 中等待的线程
  14. Node h = head;
  15. if (h != null && h.waitStatus != 0)
  16. unparkSuccessor(h);
  17. return true;
  18. }
  19. return false;
  20. }
  21. // Sync 继承过来的方法, 方便阅读, 放在此处
  22. protected final boolean tryRelease(int releases) {
  23. if (!isHeldExclusively())
  24. throw new IllegalMonitorStateException();
  25. int nextc = getState() - releases;
  26. // 因为可重入的原因, 写锁计数为 0, 才算释放成功
  27. boolean free = exclusiveCount(nextc) == 0;
  28. if (free) {
  29. setExclusiveOwnerThread(null);
  30. }
  31. setState(nextc);
  32. return free;
  33. }
  34. }
  • 读锁上锁过程 ```java static final class NonfairSync extends Sync {

    // ReadLock 方法, 方便阅读, 放在此处 public void lock() {

    1. sync.acquireShared(1);

    }

    // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquireShared(int arg) {

    1. // tryAcquireShared 返回负数, 表示获取读锁失败
    2. if (tryAcquireShared(arg) < 0) {
    3. doAcquireShared(arg);
    4. }

    }

    // Sync 继承过来的方法, 方便阅读, 放在此处 protected final int tryAcquireShared(int unused) {

    1. Thread current = Thread.currentThread();
    2. int c = getState();
    3. // 如果是其它线程持有写锁, 获取读锁失败
    4. if (
    5. exclusiveCount(c) != 0 &&
    6. getExclusiveOwnerThread() != current
    7. ) {
    8. return -1;
    9. }
    10. int r = sharedCount(c);
    11. if (
    12. // 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且
    13. !readerShouldBlock() &&
    14. // 小于读锁计数, 并且
    15. r < MAX_COUNT &&
    16. // 尝试增加计数成功
    17. compareAndSetState(c, c + SHARED_UNIT)
    18. ) {
    19. // ... 省略不重要的代码
    20. return 1;
    21. }
    22. return fullTryAcquireShared(current);

    }

    // 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁 // true 则该阻塞, false 则不阻塞 final boolean readerShouldBlock() {

    1. return apparentlyFirstQueuedIsExclusive();

    }

    // AQS 继承过来的方法, 方便阅读, 放在此处 // 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞 final int fullTryAcquireShared(Thread current) {

    1. HoldCounter rh = null;
    2. for (;;) {
    3. int c = getState();
    4. if (exclusiveCount(c) != 0) {
    5. if (getExclusiveOwnerThread() != current)
    6. return -1;
    7. } else if (readerShouldBlock()) {
    8. // ... 省略不重要的代码
    9. }
    10. if (sharedCount(c) == MAX_COUNT)
    11. throw new Error("Maximum lock count exceeded");
    12. if (compareAndSetState(c, c + SHARED_UNIT)) {
    13. // ... 省略不重要的代码
    14. return 1;
    15. }
    16. }

    }

    // AQS 继承过来的方法, 方便阅读, 放在此处 private void doAcquireShared(int arg) {

    1. // 将当前线程关联到一个 Node 对象上, 模式为共享模式
    2. final Node node = addWaiter(Node.SHARED);
    3. boolean failed = true;
    4. try {
    5. boolean interrupted = false;
    6. for (;;) {
    7. final Node p = node.predecessor();
    8. if (p == head) {
    9. // 再一次尝试获取读锁
    10. int r = tryAcquireShared(arg);
    11. // 成功
    12. if (r >= 0) {
    13. // ㈠
    14. // r 表示可用资源数, 在这里总是 1 允许传播
    15. //(唤醒 AQS 中下一个 Share 节点)
    16. setHeadAndPropagate(node, r);
    17. p.next = null; // help GC
    18. if (interrupted)
    19. selfInterrupt();
    20. failed = false;
    21. return;
    22. }
    23. }
    24. if (
    25. // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
    26. shouldParkAfterFailedAcquire(p, node) &&
    27. // park 当前线程
    28. parkAndCheckInterrupt()
    29. ) {
    30. interrupted = true;
    31. }
    32. }
    33. } finally {
    34. if (failed)
    35. cancelAcquire(node);
    36. }

    }

    // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处 private void setHeadAndPropagate(Node node, int propagate) {

    1. Node h = head; // Record old head for check below
    2. // 设置自己为 head
    3. setHead(node);
    4. // propagate 表示有共享资源(例如共享读锁或信号量)
    5. // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
    6. // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
    7. if (propagate > 0 || h == null || h.waitStatus < 0 ||
    8. (h = head) == null || h.waitStatus < 0) {
    9. Node s = node.next;
    10. // 如果是最后一个节点或者是等待共享读锁的节点
    11. if (s == null || s.isShared()) {
    12. // 进入 ㈡
    13. doReleaseShared();
    14. }
    15. }

    }

    // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处 private void doReleaseShared() {

    1. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    2. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析,参考这里:http://www.tianxiaobo.com/2018/05/01/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-%E7%8B%AC%E5%8D%A0-%E5%85%B1%E4%BA%AB%E6%A8%A1%E5%BC%8F/#5propagate-%E7%8A%B6%E6%80%81%E5%AD%98%E5%9C%A8%E7%9A%84%E6%84%8F%E4%B9%89
    3. for (;;) {
    4. Node h = head;
    5. // 队列还有节点
    6. if (h != null && h != tail) {
    7. int ws = h.waitStatus;
    8. if (ws == Node.SIGNAL) {
    9. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    10. continue; // loop to recheck cases
    11. // 下一个节点 unpark 如果成功获取读锁
    12. // 并且下下个节点还是 shared, 继续 doReleaseShared
    13. unparkSuccessor(h);
    14. }
    15. else if (ws == 0 &&
    16. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    17. continue; // loop on failed CAS
    18. }
    19. if (h == head) // loop if head changed
    20. break;
    21. }

    } }

  1. - 读锁释放过程:
  2. ```java
  3. static final class NonfairSync extends Sync {
  4. // ReadLock 方法, 方便阅读, 放在此处
  5. public void unlock() {
  6. sync.releaseShared(1);
  7. }
  8. // AQS 继承过来的方法, 方便阅读, 放在此处
  9. public final boolean releaseShared(int arg) {
  10. if (tryReleaseShared(arg)) {
  11. doReleaseShared();
  12. return true;
  13. }
  14. return false;
  15. }
  16. // Sync 继承过来的方法, 方便阅读, 放在此处
  17. protected final boolean tryReleaseShared(int unused) {
  18. // ... 省略不重要的代码
  19. for (;;) {
  20. int c = getState();
  21. int nextc = c - SHARED_UNIT;
  22. if (compareAndSetState(c, nextc)) {
  23. // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
  24. // 计数为 0 才是真正释放
  25. return nextc == 0;
  26. }
  27. }
  28. }
  29. // AQS 继承过来的方法, 方便阅读, 放在此处
  30. private void doReleaseShared() {
  31. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  32. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  33. for (;;) {
  34. Node h = head;
  35. if (h != null && h != tail) {
  36. int ws = h.waitStatus;
  37. // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
  38. // 防止 unparkSuccessor 被多次执行
  39. if (ws == Node.SIGNAL) {
  40. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  41. continue; // loop to recheck cases
  42. unparkSuccessor(h);
  43. }
  44. // 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析
  45. else if (ws == 0 &&
  46. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  47. continue; // loop on failed CAS
  48. }
  49. if (h == head) // loop if head changed
  50. break;
  51. }
  52. }
  53. }

2.3.5 StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用。StampedLock 不支持条件变量;StampedLock 不支持可重入

  • 加解读锁 ```java long stamp = lock.readLock(); lock.unlockRead(stamp);
  1. - 加解写锁
  2. ```java
  3. long stamp = lock.writeLock();
  4. lock.unlockWrite(stamp);
  • 乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。 ```java long stamp = lock.tryOptimisticRead(); // 验戳 if(!lock.validate(stamp)){ // 锁升级 }
  1. - 代码示例:
  2. 提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。
  3. ```java
  4. public class Code_14_StampedLockTest {
  5. public static void main(String[] args) throws InterruptedException {
  6. StampedLockDataContainer dataContainer = new StampedLockDataContainer(1);
  7. Thread t1 = new Thread(() -> {
  8. try {
  9. System.out.println(dataContainer.read(1));
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }, "t1");
  14. t1.start();
  15. TimeUnit.MILLISECONDS.sleep(500);
  16. Thread t2 = new Thread(() -> {
  17. // try {
  18. // dataContainer.read(0);
  19. // } catch (InterruptedException e) {
  20. // e.printStackTrace();
  21. // }
  22. dataContainer.write(10);
  23. }, "t2");
  24. t2.start();
  25. }
  26. }
  27. @Slf4j(topic = "c.StampedLockDataContainer")
  28. class StampedLockDataContainer {
  29. private int data;
  30. private StampedLock stampedLock = new StampedLock();
  31. public StampedLockDataContainer(int data) {
  32. this.data = data;
  33. }
  34. public int read(int readTime) throws InterruptedException {
  35. long stamp = stampedLock.tryOptimisticRead();
  36. log.info("optimistic read locking ...{}", stamp);
  37. Thread.sleep(readTime * 1000);
  38. if(stampedLock.validate(stamp)) {
  39. log.info("read finish... {}", stamp);
  40. return data;
  41. }
  42. // 锁升级 - 读锁
  43. log.info("update to read lock ...");
  44. try {
  45. stamp = stampedLock.readLock();
  46. log.info("read lock {}", stamp);
  47. Thread.sleep(readTime * 1000);
  48. log.info("read finish ... {}", stamp);
  49. return data;
  50. } finally {
  51. stampedLock.unlockRead(stamp);
  52. }
  53. }
  54. public void write(int newData) {
  55. long stamp = stampedLock.writeLock();
  56. try {
  57. log.info("write lock {}", stamp);
  58. this.data = newData;
  59. try {
  60. TimeUnit.SECONDS.sleep(1);
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. log.info("write finish ... {}", stamp);
  65. log.info("write newData ... {}", this.data);
  66. } finally {
  67. stampedLock.unlockWrite(stamp);
  68. }
  69. }
  70. }

2.4 Semaphore信号量

2.4.1 基本使用

信号量,用来限制能同时访问共享资源的线程上限。

  1. @Slf4j(topic = "c.Test4")
  2. public class Test4 {
  3. public static void main(String[] args) {
  4. // 创建 semaphore对象
  5. // 同时运行的上限是3个
  6. Semaphore semaphore = new Semaphore(3);
  7. // 创建10个线程同时运行
  8. for (int i = 0; i < 10; i++) {
  9. new Thread(()->{
  10. // 获取许可
  11. try {
  12. semaphore.acquire();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. // 运行
  17. log.debug("running");
  18. try {
  19. Thread.sleep(1000);
  20. log.debug("end...");
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }finally {
  24. // 释放许可
  25. semaphore.release();
  26. }
  27. }).start();
  28. }
  29. }
  30. }
  1. 16:56:59.721 [Thread-0] DEBUG c.Test4 - running
  2. 16:56:59.721 [Thread-2] DEBUG c.Test4 - running
  3. 16:56:59.721 [Thread-1] DEBUG c.Test4 - running
  4. 16:57:00.730 [Thread-1] DEBUG c.Test4 - end...
  5. 16:57:00.731 [Thread-2] DEBUG c.Test4 - end...
  6. 16:57:00.731 [Thread-0] DEBUG c.Test4 - end...
  7. 16:57:00.731 [Thread-3] DEBUG c.Test4 - running
  8. 16:57:00.731 [Thread-5] DEBUG c.Test4 - running
  9. 16:57:00.732 [Thread-4] DEBUG c.Test4 - running
  10. 16:57:01.736 [Thread-5] DEBUG c.Test4 - end...
  11. 16:57:01.736 [Thread-4] DEBUG c.Test4 - end...
  12. 16:57:01.736 [Thread-3] DEBUG c.Test4 - end...
  13. 16:57:01.736 [Thread-6] DEBUG c.Test4 - running
  14. 16:57:01.736 [Thread-7] DEBUG c.Test4 - running
  15. 16:57:01.736 [Thread-8] DEBUG c.Test4 - running
  16. 16:57:02.740 [Thread-8] DEBUG c.Test4 - end...
  17. 16:57:02.740 [Thread-7] DEBUG c.Test4 - end...
  18. 16:57:02.740 [Thread-6] DEBUG c.Test4 - end...
  19. 16:57:02.741 [Thread-9] DEBUG c.Test4 - running
  20. 16:57:03.744 [Thread-9] DEBUG c.Test4 - end...

2.4.2 图解流程

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源。
image.png
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
image.png
这时 Thread-4 释放了 permits,状态如下
image.png
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
image.png

2.4.3 源码

  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = -2694183684443567898L;
  3. NonfairSync(int permits) {
  4. // permits 即 state
  5. super(permits);
  6. }
  7. // Semaphore 方法, 方便阅读, 放在此处
  8. public void acquire() throws InterruptedException {
  9. sync.acquireSharedInterruptibly(1);
  10. }
  11. // AQS 继承过来的方法, 方便阅读, 放在此处
  12. public final void acquireSharedInterruptibly(int arg)
  13. throws InterruptedException {
  14. if (Thread.interrupted())
  15. throw new InterruptedException();
  16. if (tryAcquireShared(arg) < 0)
  17. doAcquireSharedInterruptibly(arg);
  18. }
  19. // 尝试获得共享锁
  20. protected int tryAcquireShared(int acquires) {
  21. return nonfairTryAcquireShared(acquires);
  22. }
  23. // Sync 继承过来的方法, 方便阅读, 放在此处
  24. final int nonfairTryAcquireShared(int acquires) {
  25. for (;;) {
  26. int available = getState();
  27. int remaining = available - acquires;
  28. if (
  29. // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
  30. remaining < 0 ||
  31. // 如果 cas 重试成功, 返回正数, 表示获取成功
  32. compareAndSetState(available, remaining)
  33. ) {
  34. return remaining;
  35. }
  36. }
  37. }
  38. // AQS 继承过来的方法, 方便阅读, 放在此处
  39. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  40. final Node node = addWaiter(Node.SHARED);
  41. boolean failed = true;
  42. try {
  43. for (;;) {
  44. final Node p = node.predecessor();
  45. if (p == head) {
  46. // 再次尝试获取许可
  47. int r = tryAcquireShared(arg);
  48. if (r >= 0) {
  49. // 成功后本线程出队(AQS), 所在 Node设置为 head
  50. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  51. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  52. // r 表示可用资源数, 为 0 则不会继续传播
  53. setHeadAndPropagate(node, r);
  54. p.next = null; // help GC
  55. failed = false;
  56. return;
  57. }
  58. }
  59. // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
  60. if (shouldParkAfterFailedAcquire(p, node) &&
  61. parkAndCheckInterrupt())
  62. throw new InterruptedException();
  63. }
  64. } finally {
  65. if (failed)
  66. cancelAcquire(node);
  67. }
  68. }
  69. // Semaphore 方法, 方便阅读, 放在此处
  70. public void release() {
  71. sync.releaseShared(1);
  72. }
  73. // AQS 继承过来的方法, 方便阅读, 放在此处
  74. public final boolean releaseShared(int arg) {
  75. if (tryReleaseShared(arg)) {
  76. doReleaseShared();
  77. return true;
  78. }
  79. return false;
  80. }
  81. // Sync 继承过来的方法, 方便阅读, 放在此处
  82. protected final boolean tryReleaseShared(int releases) {
  83. for (;;) {
  84. int current = getState();
  85. int next = current + releases;
  86. if (next < current) // overflow
  87. throw new Error("Maximum permit count exceeded");
  88. if (compareAndSetState(current, next))
  89. return true;
  90. }
  91. }
  92. }

2.5 CountdownLatch 倒计时锁

  • CountDownLatch 允许多线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
  • CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown方法时,其实使用了 tryReleaseShared 方法以CAS 的操作来减少 state ,直至 state 为 0 就代表所有的线程都调用了countDown方法。当调用 await 方法的时候,如果 state 不为0,就代表仍然有线程没有调用 countDown 方法,那么就把已经调用过 countDown 的线程都放入阻塞队列 Park ,并自旋 CAS 判断 state == 0,直至最后一个线程调用了 countDown ,使得 state == 0,于是阻塞的线程便判断成功,全部往下执行。
  • 用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。

    1. @Slf4j(topic = "c.test5")
    2. public class Test5 {
    3. public static void main(String[] args) throws InterruptedException {
    4. CountDownLatch latch = new CountDownLatch(3);
    5. new Thread(()->{
    6. log.debug("begin..");
    7. try {
    8. Thread.sleep(1000);
    9. latch.countDown();
    10. log.debug("end...,{}",latch.getCount());
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. }).start();
    15. new Thread(()->{
    16. log.debug("begin..");
    17. try {
    18. Thread.sleep(2000);
    19. latch.countDown();
    20. log.debug("end...,{}",latch.getCount());
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. }).start();
    25. new Thread(()->{
    26. log.debug("begin..");
    27. try {
    28. Thread.sleep(1500);
    29. latch.countDown();
    30. log.debug("end...,{}",latch.getCount());
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. }).start();
    35. log.debug("等待...");
    36. latch.await();
    37. log.debug("等待结束");
    38. }
    39. }
    1. 17:31:13.539 [Thread-1] DEBUG c.test5 - begin..
    2. 17:31:13.539 [Thread-2] DEBUG c.test5 - begin..
    3. 17:31:13.539 [main] DEBUG c.test5 - 等待...
    4. 17:31:13.539 [Thread-0] DEBUG c.test5 - begin..
    5. 17:31:14.546 [Thread-0] DEBUG c.test5 - end...,2
    6. 17:31:15.045 [Thread-2] DEBUG c.test5 - end...,1
    7. 17:31:15.547 [Thread-1] DEBUG c.test5 - end...,0
    8. 17:31:15.547 [main] DEBUG c.test5 - 等待结束

    每一个线程会countDown一次,最终到0为止

    使用线程池

  1. @Slf4j(topic = "c.test5")
  2. public class Test5 {
  3. public static void main(String[] args) throws InterruptedException {
  4. CountDownLatch latch = new CountDownLatch(3);
  5. ExecutorService service = Executors.newFixedThreadPool(4);
  6. service.submit(()->{
  7. log.debug("begin..");
  8. try {
  9. Thread.sleep(1000);
  10. latch.countDown();
  11. log.debug("end...,{}",latch.getCount());
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. });
  16. service.submit(()->{
  17. log.debug("begin..");
  18. try {
  19. Thread.sleep(2000);
  20. latch.countDown();
  21. log.debug("end...,{}",latch.getCount());
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. });
  26. service.submit(()->{
  27. log.debug("begin..");
  28. try {
  29. Thread.sleep(1500);
  30. latch.countDown();
  31. log.debug("end...,{}",latch.getCount());
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. });
  36. service.submit(()->{
  37. log.debug("等待...");
  38. try {
  39. latch.await();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. log.debug("等待结束");
  44. });
  45. }
  46. }
  1. 17:34:34.681 [pool-1-thread-3] DEBUG c.test5 - begin..
  2. 17:34:34.681 [pool-1-thread-1] DEBUG c.test5 - begin..
  3. 17:34:34.681 [pool-1-thread-2] DEBUG c.test5 - begin..
  4. 17:34:34.681 [pool-1-thread-4] DEBUG c.test5 - 等待...
  5. 17:34:35.690 [pool-1-thread-1] DEBUG c.test5 - end...,2
  6. 17:34:36.190 [pool-1-thread-3] DEBUG c.test5 - end...,1
  7. 17:34:36.690 [pool-1-thread-2] DEBUG c.test5 - end...,0
  8. 17:34:36.690 [pool-1-thread-4] DEBUG c.test5 - 等待结束

2.6 CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟 CountdownLatch 一样,但这个可以重用。

  1. public static void main(String[] args) {
  2. ExecutorService executorService = Executors.newFixedThreadPool(2);
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
  4. log.info("task2 finish ...");
  5. });
  6. for(int i = 0; i < 3; i++) {
  7. executorService.submit(() -> {
  8. log.info("task1 begin ...");
  9. try {
  10. Thread.sleep(1000);
  11. cyclicBarrier.await();
  12. } catch (InterruptedException | BrokenBarrierException e) {
  13. e.printStackTrace();
  14. }
  15. });
  16. executorService.submit(() -> {
  17. log.info("task2 begin ...");
  18. try {
  19. Thread.sleep(2000);
  20. cyclicBarrier.await();
  21. } catch (InterruptedException | BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. }
  26. executorService.shutdown();
  27. }