JUC 介绍

  • java.util.concurrent
  • java.util.concurrent.atomic
  • java.util.concurrent.locks

JUC 一般指 java.util.concurrent 包及其子包,其中包含 Java 对于并发编程的所有内容。下面内容介绍 java.util.concurrent 包下的内容。

线程安全的集合类

以 ArrayList 为例,ArrayList 并发操作集合元素是不安全的(多线程添加时会抛出 ConcurrentModificationException 并发修改异常)。可以尝试使用以下解决方案解决集合类的线程安全问题。

  1. List<String> list = new Vector<>(); —— 使用线程安全的集合;
  2. List<String> list = Collections.synchronizedList(new ArrayList<>()); —— 转换线程安全集合;
  3. List<String> list = new CopyOnWriteArrayList<>(); —— 使用 JUC 包下的安全集合类;

    其他集合类如 Set、Map 也有类似实现

在 java.util.concurrent 包下,定义了一些支持并发的集合类:

  1. ConcurrentHashMap
  2. ConcurrentLinkedDeque、ConcurrentLinkedQueue
  3. CopyOnWriteArrayList、CopyOnWriteArraySet

这些类在多线程环境下能保证线程安全,支持并发。

并发辅助类

CountDownLatch

CountDownLatch 初始化一个计数器,执行 countDown 方法计数器会减一,在计数器归零之前,线程会阻塞在 await() 方法门口等待。

  1. public void countDown():计数器 -1;
  2. public void await() throws InterruptedException:阻塞直到计数器归零;

示例:

  1. // 计数器
  2. public class CountDownLatchDemo {
  3. public static void main(String[] args) throws InterruptedException {
  4. // 总数是6,必须要执行任务的时候,再使用!
  5. CountDownLatch countDownLatch = new CountDownLatch(6);
  6. for (int i = 1; i <=6 ; i++) {
  7. new Thread(()->{
  8. System.out.println(Thread.currentThread().getName()+" Go out");
  9. countDownLatch.countDown(); // 数量-1
  10. },String.valueOf(i)).start();
  11. }
  12. countDownLatch.await(); // 等待计数器归零,然后再向下执行
  13. System.out.println("Close Door");
  14. }
  15. }

CyclicBarrier

CyclicBarrier 可以理解为一个加法计数器,调用 await() 方法的线程个数达到指定的 parties 时,才会调用指定的 barrierAction Runnable 接口任务。
构造器:public CyclicBarrier(int parties, Runnable barrierAction)
方法:public int await():告诉 CyclicBarrier 计数器加一;
示例:

  1. public class CyclicBarrierDemo {
  2. public static void main(String[] args) {
  3. /**
  4. * 集齐7颗龙珠召唤神龙
  5. */
  6. // 召唤龙珠的线程
  7. CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
  8. System.out.println("召唤神龙成功!");
  9. });
  10. for (int i = 1; i <= 7; i++) {
  11. final int temp = i;
  12. // lambda能操作到 i 吗
  13. new Thread(() -> {
  14. System.out.println(
  15. Thread.currentThread().getName() + "收集第 " + temp + " 个龙珠");
  16. try {
  17. cyclicBarrier.await(); // 等待
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. } catch (BrokenBarrierException e) {
  21. e.printStackTrace();
  22. }
  23. }).start();
  24. }
  25. }
  26. }

Semaphore

英[ˈseməfɔː(r)] 美[ˈseməfɔːr]

Semaphore 计数信号量,构造器会初始化一个资源数量,使用 acquire() 方法阻塞抢占一个位置,通过 release() 方法释放一个位置。控制多条线程执行情况下并发的数量。
方法:

  1. public void acquire():达到最大数量时会阻塞;
  2. public void release():信号量加一,唤醒等待的线程;

示例:

  1. public class SemaphoreDemo {
  2. public static void main(String[] args) {
  3. // 线程数量:停车位! 限流!
  4. Semaphore semaphore = new Semaphore(3);
  5. for (int i = 1; i <= 6; i++) {
  6. new Thread(()->{
  7. // acquire() 得到
  8. try {
  9. semaphore.acquire();
  10. System.out.println(Thread.currentThread().getName()+"抢到车位");
  11. TimeUnit.SECONDS.sleep(2);
  12. System.out.println(Thread.currentThread().getName()+"离开车位");
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. } finally {
  16. semaphore.release(); // release() 释放
  17. }
  18. }, String.valueOf(i)).start();
  19. }
  20. }
  21. }

BlockingQueue —— 阻塞队列

BlockingQueue 接口的父接口为 Queue 和 Collection。
image.png
阻塞队列的含义:当队列满时,写入会阻塞;当队列空时,读取会阻塞。

四组API

方法\方式 会抛出异常 有返回值,不抛异常 阻塞等待 超时等待
添加 add offer() put offer(E, long, TimeUnit)
移除 remove poll() take poll(long, TimeUnit)
检测队首元素 element peek - -

SynchronousQueue

SynchronousQueue 同步队列,每个添加操作必须等待与之对应的取出操作,同步队列没有内部容量,不保存元素。

ForkJoin —— 分支合并

定义

ForkJoin 从 JDK 1.7 开始,将大任务拆分成小任务执行,体现了递归的思想。

特点:工作窃取,里面维护的是一个双端队列,例如线程 A、B 分别执行两个队列的任务,当线程 B 优先执行完,可以窃取 线程 A 正在执行队列尾部的任务执行,从而提高效率。

使用

  1. RecursiveAction 递归事件,没有返回值
  2. RecursiveTask 递归任务,有返回值

示例:Gitee

Future —— 异步回调

异步调用,成功回调、失败回调

实现类

  1. CompletableFuture

    方法介绍

  2. runAsync() 无返回值的异步回调

  3. supplyAsync() 有返回值的异步回调
  4. whenComplete() 正常回调方法
  5. exceptionally() 异常回调方法
  6. get() 获取异步回调执行完成后的结果

    示例

    Gitee

    ExecutorService —— 线程池

    image.png

    基本方法

  7. void execute(Runnable command);:执行一个任务;

  8. <T> Future<T> submit(Callable<T> task);:执行任务,获取结果;
  9. void shutdown();:关闭线程池,有序等待任务结束;
  10. List<Runnable> shutdownNow();:立即关闭线程池;
  11. boolean isShutdown();:是否调用了 shutdown 方法;
  12. boolean isTerminated();:如果关闭后所有任务都已完成,则返回 true;
  13. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  14. <T> T invokeAny(Collection<? extends Callable<T>> tasks)