1. 使用场景
-
2. 使用线程池的好处
降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
- 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行;
提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控;
3. jdk 自带线程池类图
Executor 接口
- 对于不同的 Executor 实现,execute() 方法可能是创建一个新线程并立即启动,也有可能是使用已有的工作线程来运行传入的任务,也可能是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入阻塞队列中或者拒绝接收传入的线程;
- ExecutorService 接口
- ExecutorService 接口继承自 Executor 接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了 shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit() 等方法。如果需要支持即时关闭,也就是 shutDownNow() 方法,则任务需要正确处理中断;
ScheduledExecutorService 接口
构造函数
public ThreadPoolExecutor(
int corePoolSize, // 队列没满时,线程最大并发数
int maximumPoolSize, // 队列满后线程能够达到的最大并发数
long keepAliveTime, // 空闲线程过多久被回收的时间限制
TimeUnit unit, // keepAliveTime 的时间单位
BlockingQueue<Runnable> workQueue, // 阻塞的队列类型
ThreadFactory threadFactory,
RejectedExecutionHandler handler // 超出 maximumPoolSizes + workQueue 时,任务会交给RejectedExecutionHandler来处理
)
向线程池提交任务
补充: RunnableTaskQueue 用于保存等待执行的任务的阻塞队列
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO原则对元素进行排序;
- LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue,应用: nexFixedThreadPool;
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue;
- PriorityBlockingQueue:一个具有优先级得无限阻塞队列;
4.2. newFixedThreadPool
实现:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待;
构造函数
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(
nThreads, // corePoolSize
nThreads, // maximumPoolSize == corePoolSize
0L, // 空闲时间限制是 0
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // 无界阻塞队列
);
}
ExecutorService fixPool = Executors.newFixedThreadPool(corePoolSize);
-
4.3. newCachedThreadPool
实现: 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收则新建线程;
构造函数
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(
0, // corePoolSoze == 0
Integer.MAX_VALUE, // maximumPoolSize 非常大
60L, // 空闲判定是60 秒
TimeUnit.SECONDS,
// 神奇的无存储空间阻塞队列,每个 put 必须要等待一个 take
new SynchronousQueue<Runnable>()
);
}
ExecutorService cachePool = Executors.newCachedThreadPool();
向线程池提交任务
4.4. newSingleThreadExecutor
- 实现:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行;
构造函数
public static ExecutorService newSingleThreadExecutor() {
return
new FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor
(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory
)
);
}
ExecutorService singPool = Executors.newSingleThreadExecutor();
除了多了个 FinalizableDelegatedExecutorService 代理,其初始化和 newFiexdThreadPool 的 nThreads = 1 的时候是一样的。 区别在于:
定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据;
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
return new ScheduledThreadPoolExecutor(var0, var1);
}
。使用
- scheduleAtFixedRate:以固定频率执行任务,周期是指每次执行任务成功执行之间的间隔;
- schedultWithFixedDelay:以固定延时执行任务,延时是指上一次执行成功后和下一次开始执行前的时间;
public class ScheduledExecutorServiceDemo {
public static void main(String args[]) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(4000);
System.out.println(Thread.currentThread().getId() + "执行了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}
5. 其他
5.1 关闭线程池
shutdown : 不会立即终止线程池,而是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程,继续将缓存队列中的人物执行完后才终止;;
- shutdownNow :
- 将线程池的状态设置成 STOP ;
- 遍历线程池中的工作线程,逐个调用线程的 interrupt 方法来中断线程;
- 无法响应中断的任务可能永远无法终止;
- 总结:立即终止线程池,并尝试打断正在执行的任务,清空任务缓存队列,返回尚未执行的任务;
所有任务均已关闭,说明线程池关闭成功,调用 isTerminaed 方法会返回 true;
5.2 监控线程池
taskCount:线程池需要执行的任务数量;
- completedTaskCount:线程池在运行过程中已完成的任务数量,不大于 taskCount;
- largestPoolSize:线程池曾创建的最大线程数量,通过此参数可判断线程池是否满过;
- getPoolSize:线程池的线程数量;若线程池不销毁,池里线程不会自动销毁;
-
5.3 提交任务 execute 与 submit 的区别
execute 方法无返回值,无法判断任务是否被线程池执行成功;
submit 方法会返回一个 future 对象,future.get() 方法获取返回值;
6. 如何选择
6.1 如何选择线程池数量
线程池的大小决定着系统的性能,过大或者过小的线程池数量都无法发挥最优的系统性能;
- 考虑因素: CPU 的数量,内存大小,任务类型(eg. 计算密集型、IO密集型等)
- 一般公式 ```java NCPU = CPU的数量 UCPU = 期望对CPU的使用率 0 ≤ UCPU ≤ 1 W/C = 等待时间与计算时间的比率
如果希望处理器达到理想的使用率,那么线程池的最优大小为: 线程池大小= NCPU *UCPU(1+W/C)
int ncpus = Runtime.getRuntime().availableProcessors();//获取CPU的数量
<a name="aS4el"></a>
### 6.2 线程池创建方式
- 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式【阿里编码规范】;
- Executors 各个方法的弊端:
- newFixedThreadPool 和 newSingleThreadExecutor
- 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM;
- newCachedThreadPool和newScheduledThreadPool
- 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM;
<a name="KNfgJ"></a>
## 7. 手写一个简单线程池
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/** 参考链接: https://yq.aliyun.com/articles/680584
* 简单的手写线程池,貌似是针对 coreSize = maxSize 做的处理,而对 maxSize > coreSize 的情况没有处理,
* 目前运行无终止,后续继续优化;
*/
public class testThreadPool {
public static void main(String[] args ){
CustomThreadPool();
}
public static void CustomThreadPool(){
TestPool testPool = new TestPool(2,2,new ArrayBlockingQueue<>(3));
for (int i = 0; i < 100; i++) {
final int j = i;
System.out.println("i = " + i + " " + Thread.currentThread().getName());
testPool.execute(()->{
try {
Thread.sleep(100);
System.out.println("睡 0.1 秒,完成 :" + j);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
class TestPool{
private int coreSize;
private int maxSize;
private AtomicInteger running = new AtomicInteger(0);
private BlockingQueue<Runnable> queue;
public TestPool(int coreSize, int maxSize, ArrayBlockingQueue<Runnable> queue){
this.coreSize = coreSize;
this.maxSize = maxSize;
this.queue = queue;
}
public void execute(Runnable runnable){
if (running.get() < coreSize) {
if (!addWorker(runnable)) {
reject();
}
} else {
System.out.println("当前队列大小: " + queue.size());
if (!queue.offer(runnable)) {
System.out.println("offer失败,当前线程数: " + running.get());//offer是有返回boolean类型的,put()不可以;
if (!addWorker(runnable)) {
reject();
}
}
}
}
private void reject(){
throw new RuntimeException("超出大小,线程数: " + running.get() + " , 队列大小 : " + queue.size());
}
private boolean addWorker(Runnable runnable){
if (running.get() >= maxSize) {
return false;
}
Worker worker = new Worker(runnable);
worker.start();
return true;
}
private class Worker extends Thread{
private Runnable runnable;
public Worker(Runnable runnable){
this.runnable = runnable;
System.out.println("创建线程,当前线程数 : " + running.incrementAndGet());
}
@Override
public void run() {
try{
while (true) {
runnable.run();
System.out.println("运行结束,当前线程数 : " + running.get());
if (running.get() > coreSize) {
break;
} else {
try {
System.out.println("000000: 队列大小" + queue.size());
runnable = queue.take();
System.out.println("11111111: 队列大小" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} finally {
running.decrementAndGet();
System.out.println("结束线程,当前线程数: " + running.get());
}
}
}
}
8. ThreadPool 与 ForkJoinPool
参考