JUC

AQS原理

起源

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。

概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。

特点:

  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁的姿势

  1. // 如果获取锁失败
  2. if (!tryAcquire(arg)) {
  3. // 入队, 可以选择阻塞当前线程 park unpark
  4. }

释放锁的姿势

  1. // 如果释放锁成功
  2. if (tryRelease(arg)) {
  3. // 让阻塞线程恢复运行
  4. }

目标

AQS 要实现的功能目标

  • 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
  • 获取锁超时机制
  • 通过打断取消机制
  • 独占机制及共享机制
  • 条件不满足时的等待机制

实现不可重入锁

  1. @Slf4j(topic = "c.MyLock")
  2. public class MyLock implements Lock {
  3. public static void main(String[] args) {
  4. MyLock lock = new MyLock();
  5. new Thread(() -> {
  6. lock.lock();
  7. try {
  8. log.debug("locking...");
  9. try {
  10. Thread.sleep(1000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. } finally {
  15. log.debug("unlocking...");
  16. lock.unlock();
  17. }
  18. }, "t1").start();
  19. new Thread(() -> {
  20. lock.lock();
  21. try {
  22. log.debug("locking...");
  23. } finally {
  24. log.debug("unlocking...");
  25. lock.unlock();
  26. }
  27. }, "t2").start();
  28. }
  29. // 独占锁
  30. class MySync extends AbstractQueuedSynchronizer {
  31. @Override
  32. protected boolean tryAcquire(int acquires) {
  33. if (acquires == 1) {
  34. if (compareAndSetState(0, 1)) {
  35. setExclusiveOwnerThread(Thread.currentThread());
  36. return true;
  37. }
  38. }
  39. return false;
  40. }
  41. @Override
  42. protected boolean tryRelease(int acquires) {
  43. if (acquires == 1) {
  44. if (getState() == 0) {
  45. throw new IllegalMonitorStateException();
  46. }
  47. setExclusiveOwnerThread(null);
  48. setState(0);
  49. return true;
  50. }
  51. return false;
  52. }
  53. protected Condition newCondition() {
  54. return new ConditionObject();
  55. }
  56. @Override
  57. protected boolean isHeldExclusively() {
  58. return getState() == 1;
  59. }
  60. }
  61. private MySync mySync = new MySync();
  62. @Override // 加锁,不成功会进入等待队列
  63. public void lock() {
  64. mySync.acquire(1);
  65. }
  66. @Override // 加锁,可打断
  67. public void lockInterruptibly() throws InterruptedException {
  68. mySync.acquireInterruptibly(1);
  69. }
  70. @Override // 尝试加锁(一次)
  71. public boolean tryLock() {
  72. return mySync.tryAcquire(1);
  73. }
  74. @Override // 尝试加锁,带超时
  75. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  76. return mySync.tryAcquireNanos(1, unit.toNanos(time));
  77. }
  78. @Override // 解锁
  79. public void unlock() {
  80. mySync.release(1);
  81. }
  82. @Override // 创建条件变量
  83. public Condition newCondition() {
  84. return mySync.newCondition();
  85. }
  86. }

设计

AQS 的基本思想其实很简单

获取锁的逻辑

  1. while(state 状态不允许获取) {
  2. if(队列中还没有此线程) {
  3. 入队并阻塞
  4. }
  5. }
  6. 当前线程出队

释放锁的逻辑

  1. if(state 状态允许了) {
  2. 恢复阻塞的线程(s)
  3. }

要点

  • 原子维护 state 状态
  • 阻塞及恢复线程
  • 维护队列

state 设计

  • state 使用 volatile 配合 cas 保证其修改时的原子性
  • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

阻塞恢复设计

  • 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到
  • 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题
  • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
  • park 线程还可以通过 interrupt 打断

队列设计

  • 使用了 FIFO 先入先出队列,并不支持优先级队列
  • 设计时借鉴了 CLH 队列,它是一种单向无锁队列

JUC - 图1

队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态。

入队伪代码,只需要考虑 tail 赋值的原子性

  1. do {
  2. // 原来的 tail
  3. Node prev = tail;
  4. // 用 cas 在原来 tail 的基础上改为 node
  5. } while(tail.compareAndSet(prev, node))

出队伪代码

  1. // prev 是上一个节点
  2. while((Node prev=node.prev).state != 唤醒状态) {
  3. }
  4. // 设置头节点
  5. head = node;

CLH 好处:

  • 无锁,使用自旋
  • 快速,无阻塞

AQS 在一些方面改进了 CLH

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. // 队列中还没有元素 tail 为 null
  5. if (t == null) {
  6. // 将 head 从 null -> dummy
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else {
  10. // 将 node 的 prev 设置为原来的 tail
  11. node.prev = t;
  12. // 将 tail 从原来的 tail 设置为 node
  13. if (compareAndSetTail(t, node)) {
  14. // 原来 tail 的 next 设置为 node
  15. t.next = node;
  16. return t;
  17. }
  18. }
  19. }
  20. }

主要用到AQS的并发工具类

JUC - 图2

ReentrantLock原理

JUC - 图3

非公平锁实现原理

加锁解锁流程

先从构造器开始看,默认为非公平锁实现

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }

NonfairSync 继承自 AQS,没有竞争时

JUC - 图4

第一个竞争出现时

JUC - 图5

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列
    • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    • Node 的创建是懒惰的
    • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

JUC - 图6

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false(-1表示可以唤醒下一个节点)

JUC - 图7

  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
  3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

JUC - 图8

再次有多个线程经历上述过程竞争失败,变成这个样子,都在等待

JUC - 图9

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

JUC - 图10

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程

找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

回到 Thread-1 的 acquireQueued 流程

JUC - 图11

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

JUC - 图12

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

加锁源码

  1. // Sync 继承自 AQS
  2. static final class NonfairSync extends Sync {
  3. private static final long serialVersionUID = 7316153563782823691L;
  4. // 加锁实现
  5. final void lock() {
  6. // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
  7. if (compareAndSetState(0, 1))
  8. setExclusiveOwnerThread(Thread.currentThread());
  9. else
  10. // 如果尝试失败,进入 ㈠
  11. acquire(1);
  12. }
  13. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
  14. public final void acquire(int arg) {
  15. // ㈡ tryAcquire
  16. if (
  17. !tryAcquire(arg) &&
  18. // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
  19. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  20. ) {
  21. selfInterrupt();
  22. }
  23. }
  24. // ㈡ 进入 ㈢
  25. protected final boolean tryAcquire(int acquires) {
  26. return nonfairTryAcquire(acquires);
  27. }
  28. // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
  29. final boolean nonfairTryAcquire(int acquires) {
  30. final Thread current = Thread.currentThread();
  31. int c = getState();
  32. // 如果还没有获得锁
  33. if (c == 0) {
  34. // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
  35. if (compareAndSetState(0, acquires)) {
  36. setExclusiveOwnerThread(current);
  37. return true;
  38. }
  39. }
  40. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
  41. else if (current == getExclusiveOwnerThread()) {
  42. // state++
  43. int nextc = c + acquires;
  44. if (nextc < 0) // overflow
  45. throw new Error("Maximum lock count exceeded");
  46. setState(nextc);
  47. return true;
  48. }
  49. // 获取失败, 回到调用处
  50. return false;
  51. }
  52. // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
  53. private Node addWaiter(Node mode) {
  54. // 将当前线程关联到一个 Node 对象上, 模式为独占模式
  55. Node node = new Node(Thread.currentThread(), mode);
  56. // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
  57. Node pred = tail;
  58. if (pred != null) {
  59. node.prev = pred;
  60. if (compareAndSetTail(pred, node)) {
  61. // 双向链表
  62. pred.next = node;
  63. return node;
  64. }
  65. }
  66. // 尝试将 Node 加入 AQS, 进入 ㈥
  67. enq(node);
  68. return node;
  69. }
  70. // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
  71. private Node enq(final Node node) {
  72. for (;;) {
  73. Node t = tail;
  74. if (t == null) {
  75. // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
  76. if (compareAndSetHead(new Node())) {
  77. tail = head;
  78. }
  79. } else {
  80. // cas 尝试将 Node 对象加入 AQS 队列尾部
  81. node.prev = t;
  82. if (compareAndSetTail(t, node)) {
  83. t.next = node;
  84. return t;
  85. }
  86. }
  87. }
  88. }
  89. // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
  90. final boolean acquireQueued(final Node node, int arg) {
  91. boolean failed = true;
  92. try {
  93. boolean interrupted = false;
  94. for (;;) {
  95. final Node p = node.predecessor();
  96. // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
  97. if (p == head && tryAcquire(arg)) {
  98. // 获取成功, 设置自己(当前线程对应的 node)为 head
  99. setHead(node);
  100. // 上一个节点 help GC
  101. p.next = null;
  102. failed = false;
  103. // 返回中断标记 false
  104. return interrupted;
  105. }
  106. if (
  107. // 判断是否应当 park, 进入 ㈦
  108. shouldParkAfterFailedAcquire(p, node) &&
  109. // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
  110. parkAndCheckInterrupt()
  111. ) {
  112. interrupted = true;
  113. }
  114. }
  115. } finally {
  116. if (failed)
  117. cancelAcquire(node);
  118. }
  119. }
  120. // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
  121. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  122. // 获取上一个节点的状态
  123. int ws = pred.waitStatus;
  124. if (ws == Node.SIGNAL) {
  125. // 上一个节点都在阻塞, 那么自己也阻塞好了
  126. return true;
  127. }
  128. // > 0 表示取消状态
  129. if (ws > 0) {
  130. // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
  131. do {
  132. node.prev = pred = pred.prev;
  133. } while (pred.waitStatus > 0);
  134. pred.next = node;
  135. } else {
  136. // 这次还没有阻塞
  137. // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
  138. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  139. }
  140. return false;
  141. }
  142. // ㈧ 阻塞当前线程
  143. private final boolean parkAndCheckInterrupt() {
  144. LockSupport.park(this);
  145. return Thread.interrupted();
  146. }
  147. }

注意

是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定。

解锁源码

  1. // Sync 继承自 AQS
  2. static final class NonfairSync extends Sync {
  3. // 解锁实现
  4. public void unlock() {
  5. sync.release(1);
  6. }
  7. // AQS 继承过来的方法, 方便阅读, 放在此处
  8. public final boolean release(int arg) {
  9. // 尝试释放锁, 进入 ㈠
  10. if (tryRelease(arg)) {
  11. // 队列头节点 unpark
  12. Node h = head;
  13. if (
  14. // 队列不为 null
  15. h != null &&
  16. // waitStatus == Node.SIGNAL 才需要 unpark
  17. h.waitStatus != 0
  18. ) {
  19. // unpark AQS 中等待的线程, 进入 ㈡
  20. unparkSuccessor(h);
  21. }
  22. return true;
  23. }
  24. return false;
  25. }
  26. // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
  27. protected final boolean tryRelease(int releases) {
  28. // state--
  29. int c = getState() - releases;
  30. if (Thread.currentThread() != getExclusiveOwnerThread())
  31. throw new IllegalMonitorStateException();
  32. boolean free = false;
  33. // 支持锁重入, 只有 state 减为 0, 才释放成功
  34. if (c == 0) {
  35. free = true;
  36. setExclusiveOwnerThread(null);
  37. }
  38. setState(c);
  39. return free;
  40. }
  41. // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
  42. private void unparkSuccessor(Node node) {
  43. // 如果状态为 Node.SIGNAL 尝试重置状态为 0
  44. // 不成功也可以
  45. int ws = node.waitStatus;
  46. if (ws < 0) {
  47. compareAndSetWaitStatus(node, ws, 0);
  48. }
  49. // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
  50. Node s = node.next;
  51. // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
  52. if (s == null || s.waitStatus > 0) {
  53. s = null;
  54. for (Node t = tail; t != null && t != node; t = t.prev)
  55. if (t.waitStatus <= 0)
  56. s = t;
  57. }
  58. if (s != null)
  59. LockSupport.unpark(s.thread);
  60. }
  61. }

可重入原理

  1. static final class NonfairSync extends Sync {
  2. // ...
  3. // Sync 继承过来的方法, 方便阅读, 放在此处
  4. final boolean nonfairTryAcquire(int acquires) {
  5. final Thread current = Thread.currentThread();
  6. int c = getState();
  7. if (c == 0) {
  8. if (compareAndSetState(0, acquires)) {
  9. setExclusiveOwnerThread(current);
  10. return true;
  11. }
  12. }
  13. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
  14. else if (current == getExclusiveOwnerThread()) {
  15. // state++
  16. int nextc = c + acquires;
  17. if (nextc < 0) // overflow
  18. throw new Error("Maximum lock count exceeded");
  19. setState(nextc);
  20. return true;
  21. }
  22. return false;
  23. }
  24. // Sync 继承过来的方法, 方便阅读, 放在此处
  25. protected final boolean tryRelease(int releases) {
  26. // state--
  27. int c = getState() - releases;
  28. if (Thread.currentThread() != getExclusiveOwnerThread())
  29. throw new IllegalMonitorStateException();
  30. boolean free = false;
  31. // 支持锁重入, 只有 state 减为 0, 才释放成功
  32. if (c == 0) {
  33. free = true;
  34. setExclusiveOwnerThread(null);
  35. }
  36. setState(c);
  37. return free;
  38. }
  39. }

可打断原理

不可打断模式

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

  1. // Sync 继承自 AQS
  2. static final class NonfairSync extends Sync {
  3. // ...
  4. private final boolean parkAndCheckInterrupt() {
  5. // 如果打断标记已经是 true, 则 park 会失效
  6. LockSupport.park(this);
  7. // interrupted 会清除打断标记
  8. return Thread.interrupted();
  9. }
  10. final boolean acquireQueued(final Node node, int arg) {
  11. boolean failed = true;
  12. try {
  13. boolean interrupted = false;
  14. for (; ; ) {
  15. final Node p = node.predecessor();
  16. if (p == head && tryAcquire(arg)) {
  17. setHead(node);
  18. p.next = null;
  19. failed = false;
  20. // 还是需要获得锁后, 才能返回打断状态
  21. return interrupted;
  22. }
  23. if (
  24. shouldParkAfterFailedAcquire(p, node) &&
  25. parkAndCheckInterrupt()
  26. ) {
  27. // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
  28. interrupted = true;
  29. }
  30. }
  31. } finally {
  32. if (failed)
  33. cancelAcquire(node);
  34. }
  35. }
  36. public final void acquire(int arg) {
  37. if (
  38. !tryAcquire(arg) &&
  39. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  40. ) {
  41. // 如果打断状态为 true
  42. selfInterrupt();
  43. }
  44. }
  45. static void selfInterrupt() {
  46. // 重新产生一次中断
  47. Thread.currentThread().interrupt();
  48. }
  49. }

不可打断模式
  1. static final class NonfairSync extends Sync {
  2. public final void acquireInterruptibly(int arg) throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // 如果没有获得到锁, 进入 ㈠
  6. if (!tryAcquire(arg))
  7. doAcquireInterruptibly(arg);
  8. }
  9. // ㈠ 可打断的获取锁流程
  10. private void doAcquireInterruptibly(int arg) throws InterruptedException {
  11. final Node node = addWaiter(Node.EXCLUSIVE);
  12. boolean failed = true;
  13. try {
  14. for (;;) {
  15. final Node p = node.predecessor();
  16. if (p == head && tryAcquire(arg)) {
  17. setHead(node);
  18. p.next = null; // help GC
  19. failed = false;
  20. return;
  21. }
  22. if (shouldParkAfterFailedAcquire(p, node) &&
  23. parkAndCheckInterrupt()) {
  24. // 在 park 过程中如果被 interrupt 会进入此
  25. // 这时候抛出异常, 而不会再次进入 for (;;)
  26. throw new InterruptedException();
  27. }
  28. }
  29. } finally {
  30. if (failed)
  31. cancelAcquire(node);
  32. }
  33. }
  34. }

公平锁实现原理

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. final void lock() {
  4. acquire(1);
  5. }
  6. // AQS 继承过来的方法, 方便阅读, 放在此处
  7. public final void acquire(int arg) {
  8. if (
  9. !tryAcquire(arg) &&
  10. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  11. ) {
  12. selfInterrupt();
  13. }
  14. }
  15. // 与非公平锁主要区别在于 tryAcquire 方法的实现
  16. protected final boolean tryAcquire(int acquires) {
  17. final Thread current = Thread.currentThread();
  18. int c = getState();
  19. if (c == 0) {
  20. // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
  21. if (!hasQueuedPredecessors() &&
  22. compareAndSetState(0, acquires)) {
  23. setExclusiveOwnerThread(current);
  24. return true;
  25. }
  26. } else if (current == getExclusiveOwnerThread()) {
  27. int nextc = c + acquires;
  28. if (nextc < 0)
  29. throw new Error("Maximum lock count exceeded");
  30. setState(nextc);
  31. return true;
  32. }
  33. return false;
  34. }
  35. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
  36. public final boolean hasQueuedPredecessors() {
  37. Node t = tail;
  38. Node h = head;
  39. Node s;
  40. // h != t 时表示队列中有 Node
  41. return h != t &&
  42. (
  43. // (s = h.next) == null 表示队列中还有没有老二
  44. (s = h.next) == null ||
  45. // 或者队列中老二线程不是此线程
  46. s.thread != Thread.currentThread()
  47. );
  48. }
  49. }

条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

await流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程

创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

JUC - 图13

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

JUC - 图14

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

JUC - 图15

park 阻塞 Thread-0

JUC - 图16

signal流程

假设 Thread-1 要来唤醒 Thread-0

JUC - 图17

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

JUC - 图18

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1

JUC - 图19

Thread-1 释放锁,进入 unlock 流程,略

源码

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. private static final long serialVersionUID = 1173984872572414699L;
  3. // 第一个等待节点
  4. private transient Node firstWaiter;
  5. // 最后一个等待节点
  6. private transient Node lastWaiter;
  7. public ConditionObject() {
  8. }
  9. // ㈠ 添加一个 Node 至等待队列
  10. private Node addConditionWaiter() {
  11. Node t = lastWaiter;
  12. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  13. public class ConditionObject implements Condition, java.io.Serializable {
  14. private static final long serialVersionUID = 1173984872572414699L;
  15. // 第一个等待节点
  16. private transient Node firstWaiter;
  17. // 最后一个等待节点
  18. private transient Node lastWaiter;
  19. public ConditionObject() {
  20. }
  21. // ㈠ 添加一个 Node 至等待队列
  22. private Node addConditionWaiter() {
  23. Node t = lastWaiter;
  24. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  25. private void doSignalAll (Node first){
  26. lastWaiter = firstWaiter = null;
  27. do {
  28. Node next = first.nextWaiter;
  29. first.nextWaiter = null;
  30. transferForSignal(first);
  31. first = next;
  32. } while (first != null);
  33. }
  34. // ㈡
  35. private void unlinkCancelledWaiters () {
  36. // ...
  37. }
  38. // 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
  39. public final void signal () {
  40. if (!isHeldExclusively())
  41. throw new IllegalMonitorStateException();
  42. Node first = firstWaiter;
  43. if (first != null)
  44. doSignal(first);
  45. }
  46. // 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
  47. public final void signalAll () {
  48. if (!isHeldExclusively())
  49. throw new IllegalMonitorStateException();
  50. Node first = firstWaiter;
  51. if (first != null)
  52. doSignalAll(first);
  53. }
  54. // 不可打断等待 - 直到被唤醒
  55. public final void awaitUninterruptibly () {
  56. // 添加一个 Node 至等待队列, 见 ㈠
  57. Node node = addConditionWaiter();
  58. // 释放节点持有的锁, 见 ㈣
  59. int savedState = fullyRelease(node);
  60. boolean interrupted = false;
  61. // 如果该节点还没有转移至 AQS 队列, 阻塞
  62. while (!isOnSyncQueue(node)) {
  63. // park 阻塞
  64. LockSupport.park(this);
  65. // 如果被打断, 仅设置打断状态
  66. if (Thread.interrupted())
  67. interrupted = true;
  68. }
  69. // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
  70. if (acquireQueued(node, savedState) || interrupted)
  71. selfInterrupt();
  72. }
  73. // 外部类方法, 方便阅读, 放在此处
  74. // ㈣ 因为某线程可能重入,需要将 state 全部释放
  75. final int fullyRelease (Node node){
  76. boolean failed = true;
  77. try {
  78. int savedState = getState();
  79. if (release(savedState)) {
  80. failed = false;
  81. return savedState;
  82. } else {
  83. throw new IllegalMonitorStateException();
  84. }
  85. } finally {
  86. if (failed)
  87. node.waitStatus = Node.CANCELLED;
  88. }
  89. }
  90. // 打断模式 - 在退出等待时重新设置打断状态
  91. private static final int REINTERRUPT = 1;
  92. // 打断模式 - 在退出等待时抛出异常
  93. private static final int THROW_IE = -1;
  94. // 判断打断模式
  95. private int checkInterruptWhileWaiting (Node node){
  96. return Thread.interrupted() ?
  97. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  98. 0;
  99. }
  100. // ㈤ 应用打断模式
  101. private void reportInterruptAfterWait ( int interruptMode)
  102. throws InterruptedException {
  103. if (interruptMode == THROW_IE)
  104. throw new InterruptedException();
  105. else if (interruptMode == REINTERRUPT)
  106. selfInterrupt();
  107. }
  108. // 等待 - 直到被唤醒或打断
  109. public final void await () throws InterruptedException {
  110. if (Thread.interrupted()) {
  111. throw new InterruptedException();
  112. }
  113. // 添加一个 Node 至等待队列, 见 ㈠
  114. Node node = addConditionWaiter();
  115. // 释放节点持有的锁
  116. int savedState = fullyRelease(node);
  117. int interruptMode = 0;
  118. // 如果该节点还没有转移至 AQS 队列, 阻塞
  119. while (!isOnSyncQueue(node)) {
  120. // park 阻塞
  121. LockSupport.park(this);
  122. // 如果被打断, 退出等待队列
  123. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  124. break;
  125. }
  126. // 退出等待队列后, 还需要获得 AQS 队列的锁
  127. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  128. interruptMode = REINTERRUPT;
  129. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  130. if (node.nextWaiter != null)
  131. unlinkCancelledWaiters();
  132. // 应用打断模式, 见 ㈤
  133. if (interruptMode != 0)
  134. reportInterruptAfterWait(interruptMode);
  135. }
  136. // 等待 - 直到被唤醒或打断或超时
  137. public final long awaitNanos ( long nanosTimeout) throws InterruptedException {
  138. if (Thread.interrupted()) {
  139. throw new InterruptedException();
  140. }
  141. // 添加一个 Node 至等待队列, 见 ㈠
  142. Node node = addConditionWaiter();
  143. // 释放节点持有的锁
  144. int savedState = fullyRelease(node);
  145. // 获得最后期限
  146. final long deadline = System.nanoTime() + nanosTimeout;
  147. int interruptMode = 0;
  148. // 如果该节点还没有转移至 AQS 队列, 阻塞
  149. while (!isOnSyncQueue(node)) {
  150. // 已超时, 退出等待队列
  151. if (nanosTimeout <= 0L) {
  152. transferAfterCancelledWait(node);
  153. break;
  154. }
  155. // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
  156. if (nanosTimeout >= spinForTimeoutThreshold)
  157. LockSupport.parkNanos(this, nanosTimeout);
  158. // 如果被打断, 退出等待队列
  159. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  160. break;
  161. nanosTimeout = deadline - System.nanoTime();
  162. }
  163. // 退出等待队列后, 还需要获得 AQS 队列的锁
  164. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  165. interruptMode = REINTERRUPT;
  166. // 所有已取消的 Node 从队列链表删除, 见 ㈡
  167. if (node.nextWaiter != null)
  168. unlinkCancelledWaiters();
  169. // 应用打断模式, 见 ㈤
  170. if (interruptMode != 0)
  171. reportInterruptAfterWait(interruptMode);
  172. return deadline - System.nanoTime();
  173. }
  174. // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
  175. public final boolean awaitUntil (Date deadline) throws InterruptedException {
  176. // ...
  177. }
  178. // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
  179. public final boolean await ( long time, TimeUnit unit) throws InterruptedException {
  180. // ...
  181. }
  182. // 工具方法 省略 ...
  183. }

读写锁

概述

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select …from … lock in share mode

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

读写锁的并发问题

  • 读锁和读锁可以并发
  • 读锁和写锁不能并发
  • 写锁和写锁不能并发

注意事项

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
  1. r.lock();
  2. try
  3. {
  4. // ...
  5. w.lock();
  6. try {
  7. // ...
  8. } finally {
  9. w.unlock();
  10. }
  11. } finally
  12. {
  13. r.unlock();
  14. }
  • 重入时降级支持:即持有写锁的情况下去获取读锁
  1. class CachedData {
  2. Object data;
  3. // 是否有效,如果失效,需要重新计算 data
  4. volatile boolean cacheValid;
  5. final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  6. void processCachedData() {
  7. rwl.readLock().lock();
  8. if (!cacheValid) {
  9. // 获取写锁前必须释放读锁
  10. rwl.readLock().unlock();
  11. rwl.writeLock().lock();
  12. try {
  13. // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
  14. if (!cacheValid) {
  15. data = ...
  16. cacheValid = true;
  17. }
  18. // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
  19. rwl.readLock().lock();
  20. } finally {
  21. rwl.writeLock().unlock();
  22. }
  23. }
  24. // 自己用完数据, 释放读锁
  25. try {
  26. use(data);
  27. } finally {
  28. rwl.readLock().unlock();
  29. }
  30. }
  31. }

也就是在写锁完毕后,降级为读锁,防止其他线程再写。

应用之缓存

读写锁可以应用在缓存,但只是单机。

读写锁原理

图解流程

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个t1 w.lock,t2 r.lock。

  1. t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
    JUC - 图20
  2. t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败。

    tryAcquireShared 返回值表示

    • -1 表示失败
    • 0 表示成功,但后继节点不会继续唤醒
    • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1


JUC - 图21

  1. 这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

JUC - 图22

  1. t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
  2. 如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
    JUC - 图23

t3 r.lock,t4 w.lock

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

JUC - 图24

t1 w.unlock

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

JUC - 图25

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行,这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

JUC - 图26

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

JUC - 图27

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

JUC - 图28

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

JUC - 图29

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

JUC - 图30

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

t2 r.unlock,t3 r.unlock

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

JUC - 图31

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

JUC - 图32

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束

JUC - 图33

StampedLock

概述

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

加解读锁

  1. long stamp = lock.readLock();
  2. lock.unlockRead(stamp);

加解写锁

  1. long stamp = lock.writeLock();
  2. lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

  1. long stamp = lock.tryOptimisticRead();
  2. // 验戳
  3. if(!lock.validate(stamp)){
  4. // 锁升级
  5. }

注意

StampedLock 不支持条件变量

StampedLock 不支持可重入

测试

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

  1. @Slf4j(topic = "c.TestStampedLock")
  2. public class TestStampedLock {
  3. public static void main(String[] args) {
  4. DataContainerStamped dataContainer = new DataContainerStamped(1);
  5. new Thread(() -> {
  6. dataContainer.read(1);
  7. }, "t1").start();
  8. sleep(0.5);
  9. new Thread(() -> {
  10. dataContainer.read(0);
  11. }, "t2").start();
  12. }
  13. }
  14. @Slf4j(topic = "c.DataContainerStamped")
  15. class DataContainerStamped {
  16. private int data;
  17. private final StampedLock lock = new StampedLock();
  18. public DataContainerStamped(int data) {
  19. this.data = data;
  20. }
  21. public int read(int readTime) {
  22. long stamp = lock.tryOptimisticRead();
  23. log.debug("optimistic read locking...{}", stamp);
  24. sleep(readTime);
  25. if (lock.validate(stamp)) {
  26. log.debug("read finish...{}, data:{}", stamp, data);
  27. return data;
  28. }
  29. // 锁升级 - 读锁
  30. log.debug("updating to read lock... {}", stamp);
  31. try {
  32. stamp = lock.readLock();
  33. log.debug("read lock {}", stamp);
  34. sleep(readTime);
  35. log.debug("read finish...{}, data:{}", stamp, data);
  36. return data;
  37. } finally {
  38. log.debug("read unlock {}", stamp);
  39. lock.unlockRead(stamp);
  40. }
  41. }
  42. public void write(int newData) {
  43. long stamp = lock.writeLock();
  44. log.debug("write lock {}", stamp);
  45. try {
  46. sleep(2);
  47. this.data = newData;
  48. } finally {
  49. log.debug("write unlock {}", stamp);
  50. lock.unlockWrite(stamp);
  51. }
  52. }
  53. }

读读锁可以优化为通过序列号来获取,因此不用加读锁

结果

  1. 14:48:03.903 c.DataContainerStamped [t1] - optimistic read locking...256
  2. 14:48:04.413 c.DataContainerStamped [t2] - optimistic read locking...256
  3. 14:48:04.414 c.DataContainerStamped [t2] - read finish...256, data:1
  4. 14:48:04.912 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

  1. @Slf4j(topic = "c.TestStampedLock")
  2. public class TestStampedLock {
  3. public static void main(String[] args) {
  4. DataContainerStamped dataContainer = new DataContainerStamped(1);
  5. new Thread(() -> {
  6. dataContainer.read(1);
  7. }, "t1").start();
  8. sleep(0.5);
  9. new Thread(() -> {
  10. dataContainer.write(0);
  11. }, "t2").start();
  12. }
  13. }

结果

  1. 14:50:17.444 c.DataContainerStamped [t1] - optimistic read locking...256
  2. 14:50:17.954 c.DataContainerStamped [t2] - write lock 384
  3. 14:50:18.457 c.DataContainerStamped [t1] - updating to read lock... 256
  4. 14:50:19.955 c.DataContainerStamped [t2] - write unlock 384
  5. 14:50:19.955 c.DataContainerStamped [t1] - read lock 513
  6. 14:50:20.964 c.DataContainerStamped [t1] - read finish...513, data:0
  7. 14:50:20.964 c.DataContainerStamped [t1] - read unlock 513

Semaphore

概述

信号量,用来限制能同时访问共享资源的线程上限。

可以适用停车场来比较,有车位才能进行停车,如果满了,其他车只能等待,需要等正在停的车走后再能进去停车。也就是可以用来做限流。

测试

  1. public static void main(String[] args) {
  2. // 1. 创建 semaphore 对象
  3. Semaphore semaphore = new Semaphore(3);
  4. // 2. 10个线程同时运行
  5. for (int i = 0; i < 10; i++) {
  6. new Thread(() -> {
  7. // 3. 获取许可
  8. try {
  9. semaphore.acquire();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. try {
  14. log.debug("running...");
  15. sleep(1);
  16. log.debug("end...");
  17. } finally {
  18. // 4. 释放许可
  19. semaphore.release();
  20. }
  21. }).start();
  22. }
  23. }

结果

  1. 07:35:15.485 c.TestSemaphore [Thread-2] - running...
  2. 07:35:15.485 c.TestSemaphore [Thread-1] - running...
  3. 07:35:15.485 c.TestSemaphore [Thread-0] - running...
  4. 07:35:16.490 c.TestSemaphore [Thread-2] - end...
  5. 07:35:16.490 c.TestSemaphore [Thread-0] - end...
  6. 07:35:16.490 c.TestSemaphore [Thread-1] - end...
  7. 07:35:16.490 c.TestSemaphore [Thread-3] - running...
  8. 07:35:16.490 c.TestSemaphore [Thread-5] - running...
  9. 07:35:16.490 c.TestSemaphore [Thread-4] - running...
  10. 07:35:17.490 c.TestSemaphore [Thread-5] - end...
  11. 07:35:17.490 c.TestSemaphore [Thread-4] - end...
  12. 07:35:17.490 c.TestSemaphore [Thread-3] - end...
  13. 07:35:17.490 c.TestSemaphore [Thread-6] - running...
  14. 07:35:17.490 c.TestSemaphore [Thread-7] - running...
  15. 07:35:17.490 c.TestSemaphore [Thread-9] - running...
  16. 07:35:18.491 c.TestSemaphore [Thread-6] - end...
  17. 07:35:18.491 c.TestSemaphore [Thread-7] - end...
  18. 07:35:18.491 c.TestSemaphore [Thread-9] - end...
  19. 07:35:18.491 c.TestSemaphore [Thread-8] - running...
  20. 07:35:19.492 c.TestSemaphore [Thread-8] - end...

Semaphore应用

Semaphore原理

加锁解锁流程

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一

刚开始,permits(state)为 3,这时 5 个线程来获取资源

JUC - 图34

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

JUC - 图35

这时 Thread-4 释放了 permits,状态如下

JUC - 图36

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

JUC - 图37

源码
  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = -2694183684443567898L;
  3. NonfairSync(int permits) {
  4. // permits 即 state
  5. super(permits);
  6. }
  7. // Semaphore 方法, 方便阅读, 放在此处
  8. public void acquire() throws InterruptedException {
  9. sync.acquireSharedInterruptibly(1);
  10. }
  11. // AQS 继承过来的方法, 方便阅读, 放在此处
  12. public final void acquireSharedInterruptibly(int arg)
  13. throws InterruptedException {
  14. if (Thread.interrupted())
  15. throw new InterruptedException();
  16. if (tryAcquireShared(arg) < 0)
  17. doAcquireSharedInterruptibly(arg);
  18. }
  19. // 尝试获得共享锁
  20. protected int tryAcquireShared(int acquires) {
  21. return nonfairTryAcquireShared(acquires);
  22. }
  23. // Sync 继承过来的方法, 方便阅读, 放在此处
  24. final int nonfairTryAcquireShared(int acquires) {
  25. for (;;) {
  26. int available = getState();
  27. int remaining = available - acquires;
  28. if (
  29. // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
  30. remaining < 0 ||
  31. // 如果 cas 重试成功, 返回正数, 表示获取成功
  32. compareAndSetState(available, remaining)
  33. ) {
  34. return remaining;
  35. }
  36. }
  37. }
  38. // AQS 继承过来的方法, 方便阅读, 放在此处
  39. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  40. final Node node = addWaiter(Node.SHARED);
  41. boolean failed = true;
  42. try {
  43. for (;;) {
  44. final Node p = node.predecessor();
  45. if (p == head) {
  46. // 再次尝试获取许可
  47. int r = tryAcquireShared(arg);
  48. if (r >= 0) {
  49. // 成功后本线程出队(AQS), 所在 Node设置为 head
  50. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  51. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  52. // r 表示可用资源数, 为 0 则不会继续传播
  53. setHeadAndPropagate(node, r);
  54. p.next = null; // help GC
  55. failed = false;
  56. return;
  57. }
  58. }
  59. // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
  60. if (shouldParkAfterFailedAcquire(p, node) &&
  61. parkAndCheckInterrupt())
  62. throw new InterruptedException();
  63. }
  64. } finally {
  65. if (failed)
  66. cancelAcquire(node);
  67. }
  68. }
  69. // Semaphore 方法, 方便阅读, 放在此处
  70. public void release() {
  71. sync.releaseShared(1);
  72. }
  73. // AQS 继承过来的方法, 方便阅读, 放在此处
  74. public final boolean releaseShared(int arg) {
  75. if (tryReleaseShared(arg)) {
  76. doReleaseShared();
  77. return true;
  78. }
  79. return false;
  80. }
  81. // Sync 继承过来的方法, 方便阅读, 放在此处
  82. protected final boolean tryReleaseShared(int releases) {
  83. for (;;) {
  84. int current = getState();
  85. int next = current + releases;
  86. if (next < current) // overflow
  87. throw new Error("Maximum permit count exceeded");
  88. if (compareAndSetState(current, next))
  89. return true;
  90. }
  91. }
  92. }

CountdownLatch

概述

用来进行线程同步协作,等待所有线程完成倒计时。

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

配合线程池使用

  1. public static void main(String[] args) throws InterruptedException {
  2. CountDownLatch latch = new CountDownLatch(3);
  3. ExecutorService service = Executors.newFixedThreadPool(4);
  4. service.submit(() -> {
  5. log.debug("begin...");
  6. sleep(1);
  7. latch.countDown();
  8. log.debug("end...{}", latch.getCount());
  9. });
  10. service.submit(() -> {
  11. log.debug("begin...");
  12. sleep(1.5);
  13. latch.countDown();
  14. log.debug("end...{}", latch.getCount());
  15. });
  16. service.submit(() -> {
  17. log.debug("begin...");
  18. sleep(2);
  19. latch.countDown();
  20. log.debug("end...{}", latch.getCount());
  21. });
  22. service.submit(()->{
  23. try {
  24. log.debug("waiting...");
  25. latch.await();
  26. log.debug("wait end...");
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. });
  31. }

结果

  1. 18:52:25.831 c.TestCountDownLatch [pool-1-thread-3] - begin...
  2. 18:52:25.831 c.TestCountDownLatch [pool-1-thread-1] - begin...
  3. 18:52:25.831 c.TestCountDownLatch [pool-1-thread-2] - begin...
  4. 18:52:25.831 c.TestCountDownLatch [pool-1-thread-4] - waiting...
  5. 18:52:26.835 c.TestCountDownLatch [pool-1-thread-1] - end...2
  6. 18:52:27.335 c.TestCountDownLatch [pool-1-thread-2] - end...1
  7. 18:52:27.835 c.TestCountDownLatch [pool-1-thread-3] - end...0
  8. 18:52:27.835 c.TestCountDownLatch [pool-1-thread-4] - wait end...

应用之同步等待多线程准备完毕
  1. public class Test12 {
  2. public static void main(String[] args) throws InterruptedException {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. CountDownLatch countDownLatch = new CountDownLatch(10);
  5. String[] all = new String[10];
  6. Random r = new Random();
  7. for (int i = 0; i < 10; i++) {
  8. int k = i;
  9. pool.submit(() -> {
  10. for (int j = 0; j <= 100; j++) {
  11. try {
  12. Thread.sleep(r.nextInt(100));
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. all[k] = j + "%";
  17. // 覆盖上一次结果
  18. System.out.print("\r" + Arrays.toString(all));
  19. }
  20. // 每加载完成一个玩家,减一
  21. countDownLatch.countDown();
  22. });
  23. }
  24. // count减为0,主进程开始运行
  25. countDownLatch.await();
  26. System.out.println();
  27. System.out.println("欢迎来到英雄联盟!");
  28. pool.shutdown();
  29. }
  30. }

结果

动态的加载进度

  1. [100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]欢迎来到英雄联盟!

应用之同步等待多个远程调用结束

可以使用在业务中,调用不同的服务,但没有返回值

  1. public static void main(String[] args) throws InterruptedException, ExecutionException {
  2. test3();
  3. }
  4. private static void test5() {
  5. CountDownLatch latch = new CountDownLatch(3);
  6. ExecutorService service = Executors.newFixedThreadPool(4);
  7. service.submit(() -> {
  8. log.debug("begin...");
  9. sleep(1);
  10. latch.countDown();
  11. log.debug("end...{}", latch.getCount());
  12. });
  13. service.submit(() -> {
  14. log.debug("begin...");
  15. sleep(1.5);
  16. latch.countDown();
  17. log.debug("end...{}", latch.getCount());
  18. });
  19. service.submit(() -> {
  20. log.debug("begin...");
  21. sleep(2);
  22. latch.countDown();
  23. log.debug("end...{}", latch.getCount());
  24. });
  25. service.submit(()->{
  26. try {
  27. log.debug("waiting...");
  28. latch.await();
  29. log.debug("wait end...");
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. });
  34. }
  35. private static void test4() throws InterruptedException {
  36. CountDownLatch latch = new CountDownLatch(3);
  37. new Thread(() -> {
  38. log.debug("begin...");
  39. sleep(1);
  40. latch.countDown();
  41. log.debug("end...{}", latch.getCount());
  42. }).start();
  43. new Thread(() -> {
  44. log.debug("begin...");
  45. sleep(2);
  46. latch.countDown();
  47. log.debug("end...{}", latch.getCount());
  48. }).start();
  49. new Thread(() -> {
  50. log.debug("begin...");
  51. sleep(1.5);
  52. latch.countDown();
  53. log.debug("end...{}", latch.getCount());
  54. }).start();
  55. log.debug("waiting...");
  56. latch.await();
  57. log.debug("wait end...");
  58. }
  59. System.out.println(f1.get());
  60. System.out.println(f2.get());
  61. System.out.println(f3.get());
  62. System.out.println(f4.get());
  63. log.debug("执行完毕");
  64. service.shutdown();
  65. }

如果需要返回值可以使用future或者使用FutureTask和get获取结果

  1. private static void test3() throws InterruptedException, ExecutionException {
  2. RestTemplate restTemplate = new RestTemplate();
  3. log.debug("begin");
  4. ExecutorService service = Executors.newCachedThreadPool();
  5. CountDownLatch latch = new CountDownLatch(4);
  6. Future<Map<String,Object>> f1 = service.submit(() -> {
  7. Map<String, Object> response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
  8. return response;
  9. });
  10. Future<Map<String, Object>> f2 = service.submit(() -> {
  11. Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
  12. return response1;
  13. });
  14. Future<Map<String, Object>> f3 = service.submit(() -> {
  15. Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
  16. return response1;
  17. });
  18. Future<Map<String, Object>> f4 = service.submit(() -> {
  19. Map<String, Object> response3 = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
  20. return response3;
  21. });

CyclicBarrier

概述

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。

测试
  1. public static void main(String[] args) {
  2. ExecutorService service = Executors.newFixedThreadPool(3);
  3. CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
  4. log.debug("task1, task2 finish...");
  5. });
  6. for (int i = 0; i < 3; i++) { // task1 task2 task1
  7. service.submit(() -> {
  8. log.debug("task1 begin...");
  9. sleep(1);
  10. try {
  11. barrier.await(); // 2-1=1
  12. } catch (InterruptedException | BrokenBarrierException e) {
  13. e.printStackTrace();
  14. }
  15. });
  16. service.submit(() -> {
  17. log.debug("task2 begin...");
  18. sleep(2);
  19. try {
  20. barrier.await(); // 1-1=0
  21. } catch (InterruptedException | BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. }
  26. service.shutdown();
  27. }

注意: CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』