JUC 介绍
- java.util.concurrent
- java.util.concurrent.atomic
- java.util.concurrent.locks
JUC 一般指 java.util.concurrent 包及其子包,其中包含 Java 对于并发编程的所有内容。下面内容介绍 java.util.concurrent 包下的内容。
线程安全的集合类
以 ArrayList 为例,ArrayList 并发操作集合元素是不安全的(多线程添加时会抛出 ConcurrentModificationException 并发修改异常)。可以尝试使用以下解决方案解决集合类的线程安全问题。
List<String> list = new Vector<>();
—— 使用线程安全的集合;List<String> list = Collections.synchronizedList(new ArrayList<>());
—— 转换线程安全集合;List<String> list = new CopyOnWriteArrayList<>();
—— 使用 JUC 包下的安全集合类;其他集合类如 Set、Map 也有类似实现
在 java.util.concurrent 包下,定义了一些支持并发的集合类:
- ConcurrentHashMap
- ConcurrentLinkedDeque、ConcurrentLinkedQueue
- CopyOnWriteArrayList、CopyOnWriteArraySet
并发辅助类
CountDownLatch
CountDownLatch 初始化一个计数器,执行 countDown 方法计数器会减一,在计数器归零之前,线程会阻塞在 await() 方法门口等待。
public void countDown()
:计数器 -1;public void await() throws InterruptedException
:阻塞直到计数器归零;
示例:
// 计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" Go out");
countDownLatch.countDown(); // 数量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); // 等待计数器归零,然后再向下执行
System.out.println("Close Door");
}
}
CyclicBarrier
CyclicBarrier 可以理解为一个加法计数器,调用 await() 方法的线程个数达到指定的 parties 时,才会调用指定的 barrierAction Runnable 接口任务。
构造器:public CyclicBarrier(int parties, Runnable barrierAction)
;
方法:public int await()
:告诉 CyclicBarrier 计数器加一;
示例:
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
// 召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
// lambda能操作到 i 吗
new Thread(() -> {
System.out.println(
Thread.currentThread().getName() + "收集第 " + temp + " 个龙珠");
try {
cyclicBarrier.await(); // 等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore
英[ˈseməfɔː(r)] | 美[ˈseməfɔːr] |
---|---|
Semaphore 计数信号量,构造器会初始化一个资源数量,使用 acquire() 方法阻塞抢占一个位置,通过 release() 方法释放一个位置。控制多条线程执行情况下并发的数量。
方法:
public void acquire()
:达到最大数量时会阻塞;public void release()
:信号量加一,唤醒等待的线程;
示例:
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
}, String.valueOf(i)).start();
}
}
}
BlockingQueue —— 阻塞队列
BlockingQueue 接口的父接口为 Queue 和 Collection。
阻塞队列的含义:当队列满时,写入会阻塞;当队列空时,读取会阻塞。
四组API
方法\方式 | 会抛出异常 | 有返回值,不抛异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer() | put | offer(E, long, TimeUnit) |
移除 | remove | poll() | take | poll(long, TimeUnit) |
检测队首元素 | element | peek | - | - |
SynchronousQueue
SynchronousQueue 同步队列,每个添加操作必须等待与之对应的取出操作,同步队列没有内部容量,不保存元素。
ForkJoin —— 分支合并
定义
ForkJoin 从 JDK 1.7 开始,将大任务拆分成小任务执行,体现了递归的思想。
特点:工作窃取,里面维护的是一个双端队列,例如线程 A、B 分别执行两个队列的任务,当线程 B 优先执行完,可以窃取 线程 A 正在执行队列尾部的任务执行,从而提高效率。
使用
- RecursiveAction 递归事件,没有返回值
- RecursiveTask 递归任务,有返回值
示例:Gitee
Future —— 异步回调
实现类
-
方法介绍
runAsync() 无返回值的异步回调
- supplyAsync() 有返回值的异步回调
- whenComplete() 正常回调方法
- exceptionally() 异常回调方法
-
示例
ExecutorService —— 线程池
基本方法
void execute(Runnable command);
:执行一个任务;<T> Future<T> submit(Callable<T> task);
:执行任务,获取结果;void shutdown();
:关闭线程池,有序等待任务结束;List<Runnable> shutdownNow();
:立即关闭线程池;boolean isShutdown();
:是否调用了 shutdown 方法;boolean isTerminated();
:如果关闭后所有任务都已完成,则返回 true;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)