JDK提供的线程池结构:
1) 线程池状态
ThreadPoolExecutor 使用 int的高3位来表示线程池状态,低29位表示线程数量
从数字上比较, TERMINATED> TIDYING> STOP> SHUTDOWN >RUNNING
线程池状态信息(线程池状态+线程数量)存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,且保证对ctl的操作原子化,可以用一次cas原子操作进行赋值以改变线程池信息:
// C为旧值,ctlOf返回结果为新值
ctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c)));
// rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc){ return rs| wc;}
2) 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long KeepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数目(最多保留的线程数)
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间-针对救急线程,指定救急线程无任务可做后,多久销毁
- unit 时间单位-针对救急线程,销毁时间的单位
- workQueue 阻塞队列
- threadFactory 线程工厂-可以为线程创建时起一个好名字
- handler 拒绝策略
与自定义线程池做区分:
1、线程池分为两种类型的线程:核心线程、救急线程
2、corePoolSize指线程池可以容纳的最大核心线程数目,maximumPoolSize指线程池可以容纳两种线程的最大数量,maximumPoolSize - corePoolSize == 救急线程数
3、当有新任务,核心线程优先处理任务,如果核心线程已经达到coreSize,任务被存进阻塞队列中。若阻塞队列已满,则新任务会被交给救急线程处理。当线程池中线程数量达到maximumPoolSize无法再处理新任务,这时才会执行拒绝策略。
4、核心线程没有生存时间限制,处理完任务后仍会被保存在线程池中(注意一般线程在执行完run方法后就被销毁,但线程池实现策略使得线程池中的核心线程一直保留在线程池中);而救急线程有生存时间限制,处理完任务后会被立即销毁掉(外包人员emoj)、
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23190196/1647747286179-53abe3d2-215e-4b78-b2da-31ce97df9fcc.png#clientId=u25df96f1-3111-4&from=paste&height=471&id=u5fbf05ca&margin=%5Bobject%20Object%5D&name=image.png&originHeight=831&originWidth=806&originalType=binary&ratio=1&size=107830&status=done&style=none&taskId=u4ed5dd40-7e0c-4600-96fe-0fb05b61a4b&width=456.9747314453125)
线程池运行流程:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程
- 如果队列选择有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize数目的线程来救急
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略,jdk提供了4种拒绝策略的实现,其它著名框架也提供了实现
- AbortPolicy,让调用者抛出RejectedExecutionException异常,这是默认策略
- CallerRunsPolicy,让调用者运行任务
- DiscardPolicy,放弃本次任务
- DiscardOldestPolicy,放弃队列中最早的任务,本任务取而代之
- Dubbo的实现,抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
- Netty的实现,创建一个新线程来执行任务
- ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似之前自定义的拒绝策略
- PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略![image.png](https://cdn.nlark.com/yuque/0/2022/png/23190196/1647757825814-e7551c62-c45f-4fe0-9fd4-c450d629f20d.png#clientId=u3cc09a9a-9541-4&from=paste&height=143&id=ub956118e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=206&originWidth=1070&originalType=binary&ratio=1&size=117349&status=done&style=none&taskId=ud59526fe-40a9-4bb3-abd9-1fa900b2219&width=744.9957885742188)
当高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
根据ThreadPoolExecutor构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池,实际是对ThreadPoolExecutor构造方法的封装,通过指定构造方法ThreadPoolExecutor中不同的参数,来创建不同功能的线程池。
3) newFixedThreadPool
线程数量固定的线程池:
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThread,nThread,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
特点
- 核心线程数==最大线程数,无救急线程,因此也无需设置超时时间
- 阻塞队列是无界的,可以存放任意数量的任务
- 适用于任务量已知、相对耗时的任务
newFixedThreadPool线程池的使用:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPoolExecutors {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(()->{
System.out.println(1);
});
pool.execute(()->{
System.out.println(1);
});
pool.execute(()->{
System.out.println(1);
});
}
}
线程池的execute方法,传入Runnable对象,因为Runnable是函数式接口,使用lambda表达式直接创建其实现类对象即可。
调用一次execute方法则相当于向线程池传入一个任务。
newFixedThreadPool线程池的使用,且自定义线程工厂:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class TestThreadPoolExecutors {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger t = new AtomicInteger(1);
@Override
//实现newThread方法,创建线程,因为线程工厂的作用就是来创建线程
public Thread newThread(Runnable r) {
return new Thread(r,"mypool_t" + t.getAndIncrement());
}
});
pool.execute(()->{
System.out.println(1);
});
pool.execute(()->{
System.out.println(1);
});
pool.execute(()->{
System.out.println(1);
});
}
}
4) newCachedThreadPool
带缓冲的线程池:
public static ExecutorService newCachedThreadPool{
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
特点
核心线程数是0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
- 线程池中线程均是救急线程 (60s后被回收)<br /> - 救急线程可以无限创建
阻塞队列采用了 SynchronousQueue 实现特点是,该队列没有容量,但如果某一线程在向队列中存放任务时,没有其它线程来取,这一线程是无法将任务存放进任务队列中的(一手交钱、一手交货),该队列更像是用来在两个线程间交换任务
- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程
- 适合任务数比较密集,但每个任务执行时间较短的情况
5) newSingleThreadExecutor
单线程的线程池:
public static ExecutorService newSingleThreadExecutor(){
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用场景:
希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程不会被释放
区别:
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作。
Executors.newSingleThreadExecutor()线程个数始终为1,不能修改
- FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中独有的实现方法,而只能调用从ExecutorService继承/实现的公共方法,如shutdown、stop等方法
Executors.newFixedThreadPool(1)初始为1,虽然实现效果与newSingleThreadExecutor()效果一致,但之后还可以手动修改线程池内线程容量
- 对外暴露的是 ThreadPoolExecutor对象,可以从拿到的ExecutorService引用强转为(ThreadPoolExecutor)类对象,从而调用如 setCorePoolSize方法以恶意修改线程池信息。
注意,一个线程正常执行过程中如果出现异常,那么线程则会强制终止,与进程遇到错误而停止是一个道理,除非线程内部代码使用try-catch块处理异常。
下例中创建单线程池,执行到 int i = 1/0这行代码时,会抛出异常,但线程池中的唯一线程不会终止,线程池会重新创建一个线程以保证接下来任务的顺利执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestSingleThreadExecutor {
public static void main(String[] args) {
//创建单线程池
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(()->{
System.out.println(1);
int i = 1/0;
});
pool.execute(()->{
System.out.println(2);
});
pool.execute(()->{
System.out.println(3);
int i = 1/0;
});
}
}
6) 提交任务
创建线程池会需要向线程池中提交任务以执行,除了基本的execute方法外有其它提交任务的方式:
1、execute
execute方法直接传入Runnable对象,这里的“任务”指的是Runnable对象,方法无返回结果。
2、submit
submit方法传入Callable对象,这里的任务指的是Callable对象(注意线程实现方式除了Runnable,还有Callable)。此方法返回任务执行结果,其中T表示返回结果的类型。
任务执行结果以Future对象的形式返回,与Runnable对象的run()方法不同,Callable是call()方法;
import java.util.concurrent.*;
public class TestSingleThreadExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建单线程池
ExecutorService pool = Executors.newSingleThreadExecutor();
//返回任务执行结果,Future对象
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "ok";
}
});
System.out.println(future.get());
}
}
3、invokeAll
invokeAll指以任务集合的形式向线程池提交任务,其中集合中存储对象的类型为Callable类,集合类的选取不固定,方法返回值是List
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestInovkeAll {
public static void main(String[] args) throws InterruptedException {
//创建容量为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(
() -> {
System.out.println("begin");
Thread.sleep(1000);
return "1";
},
()->{
System.out.println("begin");
Thread.sleep(500);
return "2";
}
,()->{
System.out.println("begin");
Thread.sleep(2000);
return "3";
}
));
futures.forEach(f->{
try {
System.out.println(f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
}
本例中的实现选择Arrays.asList方法,方法中传入Callable列表,因为Callable是函数式接口,这里直接使用lambda表达式更方便。
注意,前两个任务提交给线程池后,两个线程并行执行task1、task2,task2只等待0.5秒,所以先执行,但注意主线程遍历是要等到所有任务执行完才会执行(同步等待),所以遍历的顺序仍然是1、2、3
3、invokeAny
invokeAny提交tasks中所有任务,但不会等待所有任务执行完毕(即拿到所有任务的返回结果),而是哪个任务先执行完毕,则返回此任务执行结果,其它任务取消,因为只返回一个任务结果,所以返回值类型是Object类型(T类型)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
如下例,将三个task提交给线程池(这里应该是顺序提交),前两个任务被分配到两个线程上,第三个任务由于无线程可分配,存入任务队列中。
明显任务2睡眠时间更短,所以返回task2的结果。
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestInvokeAny {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
String result = executorService.invokeAny(Arrays.asList(
() -> {
System.out.println("begin");
Thread.sleep(1000);
System.out.println("end");
return "1";
},
()->{
System.out.println("begin");
Thread.sleep(500);
return "2";
}
,()->{
System.out.println("begin");
Thread.sleep(2000);
System.out.println("end");
return "3";
}
));
System.out.println(result);
}
}
7) 关闭线程池
shutdown
/*
线程状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完毕,阻塞队列中任务也算已提交任务
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
shutdown源码:
public void shutdown(){
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态
advanceRunState(SHUTDOWN);
//仅会打断空闲线程,没有执行任务的线程结束掉
interruptIdleWorkers();
onShutdown();
}finally {
mainLock.unlock();
}
//尝试终结(没有运行的线程可以立刻终结)
tryTerminate();
}
shutdownNow
/*
线程状态变为 STOP
- 不会接收新任务
- 会将线程池中所有线程打断(包括空闲线程以及正在执行任务的线程),并将队列中的任务返回.
-
*/
List<Runnable> shutdownNow();
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestShutdownNow {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
for(int i=0;i<10000000;i++){
int j=i;
j++;
}
return "string";
}
});
executorService.shutdown();
}
}
经测试,shutdownNow的打断正在执行的线程并不是调用interrupt方法,所以上例中即使线程没有进入【WAITING】状态,也会被强制终止。
总结:shutdownNow会强制打断所有线程池中的线程,并将任务队列中的任务返回。
其它方法
// 只要不在RUNNING状态的线程池,此方法就返回true
boolean isShutdown();
// 线程池状态是否是 TERMINATED,终止代表线程池已经终止
boolean isTerminated();
// 调用 shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED后做些事情,可以利用此方法等待。
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
shutdown方法只是将线程池状态标记为SHUTDOWN状态,不再接收新任务,但旧任务会继续执行。通常shutdown方法的线程不会等待线程池进入终止状态,如果等待线程池一段时间,可以调用awaitTermination方法判断,注意方法传入等待时长、返回布尔值。
awaitTermination方法一般与shutdown方法配合使用,用来判断shutdown调用后的固定时间内,线程池是否关闭成功:
// 普通任务处理类
class Task implements Runnable {
@Override
public void run() {
System.out.println("普通任务");
}
}
// 长时间任务处理类
class LongTask implements Runnable {
@Override
public void run() {
System.out.println("长时间任务");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
}
public class TestAwaitTermination {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(4);
service.execute(new Task());
service.execute(new Task());
service.execute(new LongTask());
service.execute(new Task());
service.shutdown();
while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("线程池没有关闭");
}
System.out.println("线程池已经关闭");
}
}