1. CountDownLatch

简介

CountDownLatch其实可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有一个线程去减这个计数器里面的值。计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

CountDownLatch和CyclicBarrier区别: 1.countDownLatch是一个计数器,线程完成一个记录一个,计数器递减,只能只用一次 2.CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递增,提供reset功能,可以多次使用

应用场景:

有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。

源码分析

构造函数

  1. //参数count为计数值
  2. public CountDownLatch(int count) { };

方法

  1. //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  2. public void await() throws InterruptedException { };
  3. //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
  4. public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
  5. //将count值减1
  6. public void countDown() { };

测试用例

  1. import java.util.Random;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Semaphore;
  5. import java.util.concurrent.CountDownLatch;
  6. public class multiprocess {
  7. public static void main(String[] args) {
  8. ExecutorService pool = Executors.newCachedThreadPool();
  9. CountDownLatch cdl = new CountDownLatch(10);
  10. for (int i = 0; i < 10; i++) {
  11. CountRunnable runnable = new CountRunnable(cdl);
  12. pool.execute(runnable);
  13. }
  14. }
  15. }
  16. class CountRunnable implements Runnable {
  17. private final CountDownLatch countDownLatch;
  18. public CountRunnable(CountDownLatch countDownLatch) {
  19. this.countDownLatch = countDownLatch;
  20. }
  21. @Override
  22. public void run() {
  23. try {
  24. synchronized (countDownLatch) {
  25. /* 每次减少一个容量*/
  26. countDownLatch.countDown();
  27. System.out.println("thread counts = " + (countDownLatch.getCount()));
  28. }
  29. countDownLatch.await();
  30. System.out.println("concurrency counts = " + (10 - countDownLatch.getCount()));
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. // 测试结果为:
  37. thread counts = 9
  38. thread counts = 8
  39. thread counts = 7
  40. thread counts = 6
  41. thread counts = 5
  42. thread counts = 4
  43. thread counts = 3
  44. thread counts = 2
  45. thread counts = 1
  46. thread counts = 0
  47. concurrency counts = 10
  48. concurrency counts = 10
  49. concurrency counts = 10
  50. concurrency counts = 10
  51. concurrency counts = 10
  52. concurrency counts = 10
  53. concurrency counts = 10
  54. concurrency counts = 10
  55. concurrency counts = 10
  56. concurrency counts = 10
  57. 上面最开始的地方是因为在countDownLatch.getCount()中需要-1,因此打印的时候实际上是10-i,因为初始化的时候所允许的最大线程数就是10,下面concurrency counts是因为线程运行结束,此时计数器为010-0自然为10

CountDownLatch这个东西的计数器减一居然还要手动调用countDownLatch.countDown();

2. CyclicBarrier

简介

一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务

CountDownLatch和CyclicBarrier区别:

  • CountDownLatch是一个计数器,线程完成一个记录一个,计数器递减,只能只用一次
  • CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递增,提供reset功能,可以多次使用
  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置。

源码分析

成员

  1. //同步操作锁
  2. private final ReentrantLock lock = new ReentrantLock();
  3. //线程拦截器
  4. private final Condition trip = lock.newCondition();
  5. //每次拦截的线程数
  6. private final int parties;
  7. //换代前执行的任务
  8. private final Runnable barrierCommand;
  9. //表示栅栏的当前代
  10. private Generation generation = new Generation();
  11. //计数器
  12. private int count;
  13. //静态内部类Generation
  14. private static class Generation {
  15. boolean broken = false;
  16. }

构造函数

  1. // parties 是参与线程的个数
  2. public CyclicBarrier(int parties){
  3. this(parties, null);
  4. }
  5. // Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
  6. public CyclicBarrier(int parties, Runnable barrierAction){
  7. if (parties <= 0) throw new IllegalArgumentException();
  8. this.parties = parties;
  9. this.count = parties;
  10. this.barrierCommand = barrierAction;
  11. }

方法

  1. // await 表示线程挂起
  2. public int await() throws InterruptedException, BrokenBarrierException{
  3. try {
  4. return dowait(false, 0L);
  5. } catch (TimeoutException toe) {
  6. throw new Error(toe);
  7. }
  8. }
  9. // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
  10. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException{
  11. return dowait(true, unit.toNanos(timeout));
  12. }
  13. //核心等待方法
  14. private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  15. final ReentrantLock lock = this.lock;
  16. lock.lock();
  17. try {
  18. final Generation g = generation;
  19. //检查当前栅栏是否被打翻
  20. if (g.broken) {
  21. throw new BrokenBarrierException();
  22. }
  23. //检查当前线程是否被中断
  24. if (Thread.interrupted()) {
  25. //如果当前线程被中断会做以下三件事
  26. //1.打翻当前栅栏
  27. //2.唤醒拦截的所有线程
  28. //3.抛出中断异常
  29. breakBarrier();
  30. throw new InterruptedException();
  31. }
  32. //每次都将计数器的值减1
  33. int index = --count;
  34. //计数器的值减为0则需唤醒所有线程并转换到下一代
  35. if (index == 0) {
  36. boolean ranAction = false;
  37. try {
  38. //唤醒所有线程前先执行指定的任务
  39. final Runnable command = barrierCommand;
  40. if (command != null) {
  41. command.run();
  42. }
  43. ranAction = true;
  44. //唤醒所有线程并转到下一代
  45. nextGeneration();
  46. return 0;
  47. } finally {
  48. //确保在任务未成功执行时能将所有线程唤醒
  49. if (!ranAction) {
  50. breakBarrier();
  51. }
  52. }
  53. }
  54. //如果计数器不为0则执行此循环
  55. for (;;) {
  56. try {
  57. //根据传入的参数来决定是定时等待还是非定时等待
  58. if (!timed) {
  59. trip.await();
  60. }else if (nanos > 0L) {
  61. nanos = trip.awaitNanos(nanos);
  62. }
  63. } catch (InterruptedException ie) {
  64. //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
  65. if (g == generation && ! g.broken) {
  66. breakBarrier();
  67. throw ie;
  68. } else {
  69. //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
  70. Thread.currentThread().interrupt();
  71. }
  72. }
  73. //如果线程因为打翻栅栏操作而被唤醒则抛出异常
  74. if (g.broken) {
  75. throw new BrokenBarrierException();
  76. }
  77. //如果线程因为换代操作而被唤醒则返回计数器的值
  78. if (g != generation) {
  79. return index;
  80. }
  81. //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
  82. if (timed && nanos <= 0L) {
  83. breakBarrier();
  84. throw new TimeoutException();
  85. }
  86. }
  87. } finally {
  88. lock.unlock();
  89. }
  90. }
  91. // 返回目前正在等待障碍的各方的数量。
  92. int getNumberWaiting()
  93. // 查询这个障碍是否处于破碎状态。
  94. boolean isBroken()
  95. // 将屏障重置为初始状态。
  96. public void reset() {
  97. final ReentrantLock lock = this.lock;
  98. lock.lock();
  99. try {
  100. breakBarrier(); // break the current generation
  101. nextGeneration(); // start a new generation
  102. } finally {
  103. lock.unlock();
  104. }
  105. }
  106. //切换栅栏到下一代
  107. private void nextGeneration() {
  108. //唤醒条件队列所有线程
  109. trip.signalAll();
  110. //设置计数器的值为需要拦截的线程数
  111. count = parties;
  112. //重新设置栅栏代次
  113. generation = new Generation();
  114. }
  115. //打翻当前栅栏
  116. private void breakBarrier() {
  117. //将当前栅栏状态设置为打翻
  118. generation.broken = true;
  119. //设置计数器的值为需要拦截的线程数
  120. count = parties;
  121. //唤醒所有线程
  122. trip.signalAll();
  123. }

3. Semaphore

简介

Semaphore 是一个计数信号量,必须由获取它的线程释放。常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流。

这就是个信号量。

Semaphore 只有3个操作:

  1. 初始化
  2. 增加
  3. 减少

    源码分析

    构造函数

    ```java // 参数permits表示许可数目,即同时可以允许多少线程进行访问,默认是非公平的 public Semaphore(int permits) { sync = new NonfairSync(permits); }

// 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 public Semaphore(int permits, boolean fair) { sync = (fair) ? new FairSync(permits) : new NonfairSync(permits); }

  1. <a name="fbCGw"></a>
  2. ### 方法
  3. ```java
  4. /* 会阻塞但不等待,立即返回的acquire方法 */
  5. // 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  6. public boolean tryAcquire() { }
  7. // 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
  8. public boolean tryAcquire(long timeout, TimeUnit unit)
  9. throws InterruptedException { }
  10. // 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  11. public boolean tryAcquire(int permits) { }
  12. // 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
  13. public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
  14. throws InterruptedException { }

4. Exchanger

简介

用于线程间交换数据。一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。

源码分析

构造函数

  1. // 无参构造方法
  2. Exchanger()

方法

  1. // 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
  2. public V exchange(V x) throws InterruptedException;
  3. // 等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。
  4. public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;

5. ConcurrentHashMap

特点

  • ConcorrentHashMap 实现了 ConcorrentMap 接口,能在并发环境实现更高的吞吐量,而在单线程环境中只损失很小的性能;
  • 采用分段锁,使得任意数量的读取线程可以并发地访问 Map,一定数量的写入线程可以并发地修改 Map;
  • 不会抛出 ConcorrentModificationException,它返回迭代器具有“弱一致性”,即可以容忍并发修改,但不保证将修改操作反映给容器;
  • size() 的返回结果可能已经过期,只是一个估计值,不过 size() 和 isEmpty() 方法在并发环境中用的也不多;
  • 提供了许多原子的复合操作:
    • V putIfAbsent(K key, V value);:K 没有相应映射才插入
    • boolean remove(K key, V value);:K 被映射到 V 才移除
    • boolean replace(K key, V oldValue, V newValue);:K 被映射到 oldValue 时才替换为 newValue
  • 在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16;
  • Segment 是基于 ReentrantLock 的扩展实现的,在 put 的时候,会对修改的区域加锁。

    分段锁实现原理

    分段锁: 不同线程在同一数据的不同部分上不会互相干扰,例如,ConcurrentHashMap 支持 16 个并发的写入器,是用 16 个锁来实现的。它的实现原理如下:

  • 使用了一个包含 16 个锁的数组,每个锁保护所有散列桶的 1/16,其中第 N 个散列桶由第(N % 16)个锁来保护;

  • 这大约能把对于锁的请求减少到原来的 1/16,也是 ConcurrentHashMap 最多能支持 16 个线程同时写入的原因;
  • 对于 ConcurrentHashMap 的 size() 操作,为了避免枚举每个元素,ConcurrentHashMap 为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值,而不是维护一个全局计数;

    相关操作注意事项

    关于 put 操作:

    • 是否需要扩容
      • 在插入元素前判断是否需要扩容,
      • 比 HashMap 的插入元素后判断是否需要扩容要好,因为可以插入元素后,Map 扩容,之后不再有新的元素插入,Map就进行了一次无效的扩容
    • 如何扩容

      • 先创建一个容量是原来的2倍的数组,然后将原数组中的元素进行再散列后插入新数组中
      • 为了高效,ConcurrentHashMap 只对某个 segment 进行扩容

        关于 size 操作:

    • 存在问题:如果不进行同步,只是计算所有 Segment 维护区域的 size 总和,那么在计算的过程中,可能有新的元素 put 进来,导致结果不准确,但如果对所有的 Segment 加锁,代价又过高。

    • 解决方法:重试机制,通过获取两次来试图获取 size 的可靠值,如果没有监控到发生变化,即 Segment.modCount 没有变化,就直接返回,否则获取锁进行操作。

      6. CopyOnWriteArrayList

      只要正确发布了这个 list,它就是不可变的了,所以随便并发访问,当需要修改时,就创建一个新的容器副本替代原来的,以实现可变性;

      7. BlockingQueue

      BlockingQueue详解与实现