JUC 介绍
- java.util.concurrent
- java.util.concurrent.atomic
- java.util.concurrent.locks
JUC 一般指 java.util.concurrent 包及其子包,其中包含 Java 对于并发编程的所有内容。下面内容介绍 java.util.concurrent 包下的内容。
线程安全的集合类
以 ArrayList 为例,ArrayList 并发操作集合元素是不安全的(多线程添加时会抛出 ConcurrentModificationException 并发修改异常)。可以尝试使用以下解决方案解决集合类的线程安全问题。
List<String> list = new Vector<>();—— 使用线程安全的集合;List<String> list = Collections.synchronizedList(new ArrayList<>());—— 转换线程安全集合;List<String> list = new CopyOnWriteArrayList<>();—— 使用 JUC 包下的安全集合类;其他集合类如 Set、Map 也有类似实现
在 java.util.concurrent 包下,定义了一些支持并发的集合类:
- ConcurrentHashMap
- ConcurrentLinkedDeque、ConcurrentLinkedQueue
- CopyOnWriteArrayList、CopyOnWriteArraySet
并发辅助类
CountDownLatch
CountDownLatch 初始化一个计数器,执行 countDown 方法计数器会减一,在计数器归零之前,线程会阻塞在 await() 方法门口等待。
public void countDown():计数器 -1;public void await() throws InterruptedException:阻塞直到计数器归零;
示例:
// 计数器public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {// 总数是6,必须要执行任务的时候,再使用!CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <=6 ; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+" Go out");countDownLatch.countDown(); // 数量-1},String.valueOf(i)).start();}countDownLatch.await(); // 等待计数器归零,然后再向下执行System.out.println("Close Door");}}
CyclicBarrier
CyclicBarrier 可以理解为一个加法计数器,调用 await() 方法的线程个数达到指定的 parties 时,才会调用指定的 barrierAction Runnable 接口任务。
构造器:public CyclicBarrier(int parties, Runnable barrierAction);
方法:public int await():告诉 CyclicBarrier 计数器加一;
示例:
public class CyclicBarrierDemo {public static void main(String[] args) {/*** 集齐7颗龙珠召唤神龙*/// 召唤龙珠的线程CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {System.out.println("召唤神龙成功!");});for (int i = 1; i <= 7; i++) {final int temp = i;// lambda能操作到 i 吗new Thread(() -> {System.out.println(Thread.currentThread().getName() + "收集第 " + temp + " 个龙珠");try {cyclicBarrier.await(); // 等待} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}}}
Semaphore
| 英[ˈseməfɔː(r)] | 美[ˈseməfɔːr] |
|---|---|
Semaphore 计数信号量,构造器会初始化一个资源数量,使用 acquire() 方法阻塞抢占一个位置,通过 release() 方法释放一个位置。控制多条线程执行情况下并发的数量。
方法:
public void acquire():达到最大数量时会阻塞;public void release():信号量加一,唤醒等待的线程;
示例:
public class SemaphoreDemo {public static void main(String[] args) {// 线程数量:停车位! 限流!Semaphore semaphore = new Semaphore(3);for (int i = 1; i <= 6; i++) {new Thread(()->{// acquire() 得到try {semaphore.acquire();System.out.println(Thread.currentThread().getName()+"抢到车位");TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName()+"离开车位");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // release() 释放}}, String.valueOf(i)).start();}}}
BlockingQueue —— 阻塞队列
BlockingQueue 接口的父接口为 Queue 和 Collection。
阻塞队列的含义:当队列满时,写入会阻塞;当队列空时,读取会阻塞。
四组API
| 方法\方式 | 会抛出异常 | 有返回值,不抛异常 | 阻塞等待 | 超时等待 |
|---|---|---|---|---|
| 添加 | add | offer() | put | offer(E, long, TimeUnit) |
| 移除 | remove | poll() | take | poll(long, TimeUnit) |
| 检测队首元素 | element | peek | - | - |
SynchronousQueue
SynchronousQueue 同步队列,每个添加操作必须等待与之对应的取出操作,同步队列没有内部容量,不保存元素。
ForkJoin —— 分支合并
定义
ForkJoin 从 JDK 1.7 开始,将大任务拆分成小任务执行,体现了递归的思想。
特点:工作窃取,里面维护的是一个双端队列,例如线程 A、B 分别执行两个队列的任务,当线程 B 优先执行完,可以窃取 线程 A 正在执行队列尾部的任务执行,从而提高效率。
使用
- RecursiveAction 递归事件,没有返回值
- RecursiveTask 递归任务,有返回值
示例:Gitee
Future —— 异步回调
实现类
-
方法介绍
runAsync() 无返回值的异步回调
- supplyAsync() 有返回值的异步回调
- whenComplete() 正常回调方法
- exceptionally() 异常回调方法
-
示例
ExecutorService —— 线程池
基本方法
void execute(Runnable command);:执行一个任务;<T> Future<T> submit(Callable<T> task);:执行任务,获取结果;void shutdown();:关闭线程池,有序等待任务结束;List<Runnable> shutdownNow();:立即关闭线程池;boolean isShutdown();:是否调用了 shutdown 方法;boolean isTerminated();:如果关闭后所有任务都已完成,则返回 true;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)<T> T invokeAny(Collection<? extends Callable<T>> tasks)
