1)ThreadPoolExecutor的状态
状态与线程数量用一个int来表示,状态占高三位,其他二十九位为线程数量,其中状态有几种:
① Running(111):刚创建时的状态,表示线程池正在运行;
② Shutdown(000):当前任务队列中的任务会继续执行,但不会接收新任务;
③ Stop(001):中断执行的任务,任务队列的任务也会被丢弃;
④ Tidying(010):表示当前任务全部执行完毕,活动线程为0,即将进入终结状态;
⑤ Terminated(011):终结状态;
为什么用一个int表示?
他们存储在ctl变量中,保证原子性,一次cas操作就可以完成赋值。
2)ThreadPoolExecutor的参数
int corePoolSize, // 核心线程数
int maximumPoolSize, // 总线程数 其中包括核心线程和救急线程
long keepAliveTime, // 救急线程存活时间
TimeUnit unit, // 作为救急线程存活时间的单位
BlockingQueue workQueue, // 工作队列 为阻塞队列
ThreadFactory factory, // 线程工厂 可以自定义线程名
RejectedExecutionHandler handler // 线程数达最大时 对新任务的拒绝策略
3)执行流程
线程池创建完并不会同时创建线程,而是采用懒惰创建的方式,等到第一次有任务请求进来才去创建线程;如果当前池中无空闲线程又有新任务进来,会被传送到工作队列中排队,直到有空闲线程;现在假设工作队列都被填满了,如果开启了有界队列,则会启用救急线程去处理任务;如果没有开启有界队列,则会根据拒绝策略去处理;
处理策略主要有:
①AbortPolicy:让调用者抛异常RejectedExecutionException【默认】;
②CallerRunsPolicy:让调用者运行任务;
③DiscardPolicy:放弃本次任务;
④DiscardOldestPolicy:放弃队列中最早的任务,新任务取代;
4)Executors封装的三种类型线程池
① FixedThreadPool
// 传入的参数代表核心线程的数量
ExecutorService pool1 = Executors.newFixedThreadPool(2);
pool1.execute(() -> log.debug("doing!!"));
这种模式下,总线程等于核心线程,也就是说没有救急线程的存在,使用的队列为阻塞无界队列,适用于任务量可预知但任务执行时间较长的情况。
② CachedThreadPool
ExecutorService pool2 = Executors.newCachedThreadPool();
pool2.execute(()->log.debug("doing on cached pool !!!"));
这种模式下,全部线程都是救急线程,并且基本无创建上限【Integer的上界】,救急线程空闲六十秒会被回收,使用的队列是 SynchronousQueue 特点是需要有线程访问任务才会入队,否则调用者会阻塞等待,适用于任务量较大且任务执行时间较短的情况。
③ SingleThreadExecutor
ExecutorService pool3 = Executors.newSingleThreadExecutor();
pool3.execute(() ->log.debug("doing on single thread!!!"));
这种模式下,创建的是只有一个线程的线程池,使用阻塞的无界队列,适用于需要串行执行任务的情况【任务一失败不影响任务二的情况】。
在构造方法中使用装饰器模式对线程池对象创建进行封装,有别于直接调用Executors的方法 newFixedThreadPool(1) ,前者受保护线程数量不会被修改掉,后者运行后还可以修改线程的数量。
5)改变线程任务状态的方式
① execute:直接执行线程任务,无返回值;
② submit:可以传入Callable对象,接受一个Future类型的返回值;
③ invokeAll:批量执行线程任务,传入Callable对象集合;
④ invokeAny:传入Callable对象集合,只要其中一个任务执行,流程就算结束;
⑤ shutdown:相当于将状态调整至Shutdown,会等待当前任务、队列中所有任务执行完毕才终结线程池;
⑥ shutdownNow:相当于将状态调整至Stop,会使用interrupt中断正在运行的任务,将任务队列中的任务返回;
6)工作线程模式
简单来说就是让有限的线程资源去异步轮询执行操作,每个线程池负责自己的那部分工作,这里面也体现了享元模式的设计。
// 饥饿情况,如果空闲线程不足时,会停下等待线程池分配线程再往后执行
// 实际情况就是两个都无法运行下去
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(()->{
log.debug("preparing cooking");
Future<Boolean> res = pool.submit(() -> {
log.debug("start cooking");
return Boolean.TRUE;
});
try {
if (res.get()) {
log.debug("now you can eating");
}
} catch (Exception e) {
e.printStackTrace();
}
});
pool.submit(()->{
log.debug("preparing cooking");
Future<Boolean> res = pool.submit(() -> {
log.debug("start cooking");
return Boolean.TRUE;
});
try {
if (res.get()) {
log.debug("now you can eating");
}
} catch (Exception e) {
e.printStackTrace();
}
});
解决手段,让每个线程池负责某一部分工作,多个线程池分工合作:
// let them to do a part of process rather than all of process.
// create one pool to prepare food and the other to cook
ExecutorService poolToPrepare = Executors.newFixedThreadPool(2);
ExecutorService poolToCook = Executors.newFixedThreadPool(2);
poolToPrepare.submit(()->{
log.debug("preparing food");
Future<Boolean> res = poolToCook.submit(() -> {
log.debug("start cooking");
return Boolean.TRUE;
});
try {
if (res.get()) {
log.debug("now you can eating");
}
} catch (Exception e) {
e.printStackTrace();
}
});
poolToPrepare.submit(()->{
log.debug("preparing food");
Future<Boolean> res = poolToCook.submit(() -> {
log.debug("start cooking");
return Boolean.TRUE;
});
try {
if (res.get()) {
log.debug("now you can eating");
}
} catch (Exception e) {
e.printStackTrace();
}
});
7)线程数量的考量
IO密集型:IO操作占整体操作的大部分时间,这种情况下需要保证在进行IO操作时CPU不闲着,也就是一些线程正在进行IO操作时,另一些线程应该马上利用CPU计算,因此在这种环境下,推荐线程数= CPU核数*(1 + (IO操作耗时/CPU操作耗时) );
CPU密集型:纯CPU操作较多的情况,需要发挥CPU的全部性能,一核一线程的形式就可以保证任务的高效运行,不发生线程上下文切换。因此在这种环境下,推荐线程数=CPU核数+1,加一是为了避免操作系统内存页失效导致的阻塞;
8)任务调度型线程池
有点类似定时任务执行,线程池中的任务可以带有间隔时间,延时时间。
① newScheduledThreadPool—schedule
// 传参代表核心线程数量
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,在 1s 后执行
// 这里的schedule方法实际上就是代表需要在将来那个时间运行
executor.schedule(() -> {
System.out.println("job1 start --" + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("job2 start --" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
// 输出
job2 start --Wed Mar 02 01:11:33 CST 2022
job1 start --Wed Mar 02 01:11:33 CST 2022
这里可以看到任务一的睡眠时间是不会影响到任务二的。
② newScheduledThreadPool— scheduleAtFixedRate
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
System.out.println("ready --" + new Date());
pool.scheduleAtFixedRate(() -> {
System.out.println("running --" + new Date());
}, 1, 1, TimeUnit.SECONDS);
// 第一个1代表1s后运行 第二个1代表周期为1s
// 输出
ready --Wed Mar 02 01:18:31 CST 2022
running --Wed Mar 02 01:18:32 CST 2022
running --Wed Mar 02 01:18:33 CST 2022
.....
可以看到使用该方法启动的线程在给定延迟时间后会周期性运行的。
9)线程池线程异常处理
假设在主线程创建一个线程池,其中线程发生的异常在主线程是无法感知到的,而关于异常的处理方式有两种,第一种是在线程内处理掉或者抛出,第二种是主线程使用submit+Callable创建线程,设定一个返回值,如果线程成功执行则返回true,之后主线程去获取返回值,如果线程内发生了异常,这时候主线程也会感知到。