1)ThreadPoolExecutor的状态

状态与线程数量用一个int来表示,状态占高三位,其他二十九位为线程数量,其中状态有几种:
① Running(111):刚创建时的状态,表示线程池正在运行;
② Shutdown(000):当前任务队列中的任务会继续执行,但不会接收新任务;
③ Stop(001):中断执行的任务,任务队列的任务也会被丢弃;
④ Tidying(010):表示当前任务全部执行完毕,活动线程为0,即将进入终结状态;
⑤ Terminated(011):终结状态;

为什么用一个int表示?
他们存储在ctl变量中,保证原子性,一次cas操作就可以完成赋值。

2)ThreadPoolExecutor的参数

  1. int corePoolSize, // 核心线程数
  2. int maximumPoolSize, // 总线程数 其中包括核心线程和救急线程
  3. long keepAliveTime, // 救急线程存活时间
  4. TimeUnit unit, // 作为救急线程存活时间的单位
  5. BlockingQueue workQueue, // 工作队列 为阻塞队列
  6. ThreadFactory factory, // 线程工厂 可以自定义线程名
  7. RejectedExecutionHandler handler // 线程数达最大时 对新任务的拒绝策略

3)执行流程

线程池创建完并不会同时创建线程,而是采用懒惰创建的方式,等到第一次有任务请求进来才去创建线程;如果当前池中无空闲线程又有新任务进来,会被传送到工作队列中排队,直到有空闲线程;现在假设工作队列都被填满了,如果开启了有界队列,则会启用救急线程去处理任务;如果没有开启有界队列,则会根据拒绝策略去处理;
处理策略主要有:
①AbortPolicy:让调用者抛异常RejectedExecutionException【默认】;
②CallerRunsPolicy:让调用者运行任务;
③DiscardPolicy:放弃本次任务;
④DiscardOldestPolicy:放弃队列中最早的任务,新任务取代;

4)Executors封装的三种类型线程池

① FixedThreadPool

  1. // 传入的参数代表核心线程的数量
  2. ExecutorService pool1 = Executors.newFixedThreadPool(2);
  3. pool1.execute(() -> log.debug("doing!!"));

这种模式下,总线程等于核心线程,也就是说没有救急线程的存在,使用的队列为阻塞无界队列,适用于任务量可预知但任务执行时间较长的情况。

② CachedThreadPool

  1. ExecutorService pool2 = Executors.newCachedThreadPool();
  2. pool2.execute(()->log.debug("doing on cached pool !!!"));

这种模式下,全部线程都是救急线程,并且基本无创建上限【Integer的上界】,救急线程空闲六十秒会被回收,使用的队列是 SynchronousQueue 特点是需要有线程访问任务才会入队,否则调用者会阻塞等待,适用于任务量较大且任务执行时间较短的情况。

③ SingleThreadExecutor

  1. ExecutorService pool3 = Executors.newSingleThreadExecutor();
  2. pool3.execute(() ->log.debug("doing on single thread!!!"));

这种模式下,创建的是只有一个线程的线程池,使用阻塞的无界队列,适用于需要串行执行任务的情况【任务一失败不影响任务二的情况】。
在构造方法中使用装饰器模式对线程池对象创建进行封装,有别于直接调用Executors的方法 newFixedThreadPool(1) ,前者受保护线程数量不会被修改掉,后者运行后还可以修改线程的数量。

5)改变线程任务状态的方式

① execute:直接执行线程任务,无返回值;
② submit:可以传入Callable对象,接受一个Future类型的返回值;
③ invokeAll:批量执行线程任务,传入Callable对象集合;
④ invokeAny:传入Callable对象集合,只要其中一个任务执行,流程就算结束;
⑤ shutdown:相当于将状态调整至Shutdown,会等待当前任务、队列中所有任务执行完毕才终结线程池;
⑥ shutdownNow:相当于将状态调整至Stop,会使用interrupt中断正在运行的任务,将任务队列中的任务返回;

6)工作线程模式

简单来说就是让有限的线程资源去异步轮询执行操作,每个线程池负责自己的那部分工作,这里面也体现了享元模式的设计。

  1. // 饥饿情况,如果空闲线程不足时,会停下等待线程池分配线程再往后执行
  2. // 实际情况就是两个都无法运行下去
  3. ExecutorService pool = Executors.newFixedThreadPool(2);
  4. pool.submit(()->{
  5. log.debug("preparing cooking");
  6. Future<Boolean> res = pool.submit(() -> {
  7. log.debug("start cooking");
  8. return Boolean.TRUE;
  9. });
  10. try {
  11. if (res.get()) {
  12. log.debug("now you can eating");
  13. }
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. });
  18. pool.submit(()->{
  19. log.debug("preparing cooking");
  20. Future<Boolean> res = pool.submit(() -> {
  21. log.debug("start cooking");
  22. return Boolean.TRUE;
  23. });
  24. try {
  25. if (res.get()) {
  26. log.debug("now you can eating");
  27. }
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. });

解决手段,让每个线程池负责某一部分工作,多个线程池分工合作:

  1. // let them to do a part of process rather than all of process.
  2. // create one pool to prepare food and the other to cook
  3. ExecutorService poolToPrepare = Executors.newFixedThreadPool(2);
  4. ExecutorService poolToCook = Executors.newFixedThreadPool(2);
  5. poolToPrepare.submit(()->{
  6. log.debug("preparing food");
  7. Future<Boolean> res = poolToCook.submit(() -> {
  8. log.debug("start cooking");
  9. return Boolean.TRUE;
  10. });
  11. try {
  12. if (res.get()) {
  13. log.debug("now you can eating");
  14. }
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. });
  19. poolToPrepare.submit(()->{
  20. log.debug("preparing food");
  21. Future<Boolean> res = poolToCook.submit(() -> {
  22. log.debug("start cooking");
  23. return Boolean.TRUE;
  24. });
  25. try {
  26. if (res.get()) {
  27. log.debug("now you can eating");
  28. }
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. });

7)线程数量的考量

IO密集型:IO操作占整体操作的大部分时间,这种情况下需要保证在进行IO操作时CPU不闲着,也就是一些线程正在进行IO操作时,另一些线程应该马上利用CPU计算,因此在这种环境下,推荐线程数= CPU核数*(1 + (IO操作耗时/CPU操作耗时) );

CPU密集型:纯CPU操作较多的情况,需要发挥CPU的全部性能,一核一线程的形式就可以保证任务的高效运行,不发生线程上下文切换。因此在这种环境下,推荐线程数=CPU核数+1,加一是为了避免操作系统内存页失效导致的阻塞;

8)任务调度型线程池

有点类似定时任务执行,线程池中的任务可以带有间隔时间,延时时间。

① newScheduledThreadPool—schedule

  1. // 传参代表核心线程数量
  2. ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  3. // 添加两个任务,在 1s 后执行
  4. // 这里的schedule方法实际上就是代表需要在将来那个时间运行
  5. executor.schedule(() -> {
  6. System.out.println("job1 start --" + new Date());
  7. try {
  8. Thread.sleep(2000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }, 1000, TimeUnit.MILLISECONDS);
  13. executor.schedule(() -> {
  14. System.out.println("job2 start --" + new Date());
  15. }, 1000, TimeUnit.MILLISECONDS);
  1. // 输出
  2. job2 start --Wed Mar 02 01:11:33 CST 2022
  3. job1 start --Wed Mar 02 01:11:33 CST 2022

这里可以看到任务一的睡眠时间是不会影响到任务二的。

② newScheduledThreadPool— scheduleAtFixedRate

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. System.out.println("ready --" + new Date());
  3. pool.scheduleAtFixedRate(() -> {
  4. System.out.println("running --" + new Date());
  5. }, 1, 1, TimeUnit.SECONDS);
  6. // 第一个1代表1s后运行 第二个1代表周期为1s
  1. // 输出
  2. ready --Wed Mar 02 01:18:31 CST 2022
  3. running --Wed Mar 02 01:18:32 CST 2022
  4. running --Wed Mar 02 01:18:33 CST 2022
  5. .....

可以看到使用该方法启动的线程在给定延迟时间后会周期性运行的。

9)线程池线程异常处理

假设在主线程创建一个线程池,其中线程发生的异常在主线程是无法感知到的,而关于异常的处理方式有两种,第一种是在线程内处理掉或者抛出,第二种是主线程使用submit+Callable创建线程,设定一个返回值,如果线程成功执行则返回true,之后主线程去获取返回值,如果线程内发生了异常,这时候主线程也会感知到。