image.png

线程池状态

ThreadPoolExecutor 使用 int的高 3 位来表示线程池状态,低 29 位表示线程数量。

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3;
  3. //CAPACITY的前三位为0,后29位为1
  4. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  5. // 高三位存储线程池运行状态该
  6. private static final int RUNNING = -1 << COUNT_BITS;
  7. private static final int SHUTDOWN = 0 << COUNT_BITS;
  8. private static final int STOP = 1 << COUNT_BITS;
  9. private static final int TIDYING = 2 << COUNT_BITS;
  10. private static final int TERMINATED = 3 << COUNT_BITS;
  11. //计算线程状态,CAPACITY取反后再进行与计算
  12. private static int runStateOf(int c) { return c & ~CAPACITY; }
  13. //计算线程数
  14. private static int workerCountOf(int c) { return c & CAPACITY; }
  15. // 将线程池状态与线程个数合二为一
  16. private static int ctlOf(int rs, int wc) { return rs | wc; }
状态名 高3**位** 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余
任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列
任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入
终结
TERMINATED 011 - - 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,这些信息存储在一个原子变量 ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)
  • corePoolSize核心线程数目 (最多保留的线程数)
  • maximumPoolSize最大线程数目
  • keepAliveTime生存时间 - 针对救急线程
  • unit时间单位 - 针对救急线程
  • workQueue阻塞队列
  • threadFactory线程工厂 - 可以为线程创建时起个好名字
  • handler拒绝策略

根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池

工作模式

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  2. 当线程数达到 corePoolSize并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,最多会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  4. 如果线程到达 maximumPoolSize仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现

image.png

  • AbortPolicy让调用者抛出 RejectedExecutionException异常,这是默认策略
  • CallerRunsPolicy让调用者运行任务
  • DiscardPolicy放弃本次任务
  • DiscardOldestPolicy放弃队列中最早的任务,本任务取而代之
  • Dubbo的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump线程栈信息,方
    便定位问题
  • Netty的实现,是创建一个新线程来执行任务
  • ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
    1. 当高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTimeunit来控制

newFixedThreadPool

创建一个固定大小的线程池。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  • 特点
  1. 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  2. 阻塞队列是无界的,可以放任意数量的任务
  3. 如果有一个线程挂了,那么线程池会重新创建一个线程来代替挂了的线程
  • 评价

适用于任务量已知,相对耗时的任务

newCachedThreadPool

创建一个带缓冲区的线程池

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  • 特点
  1. 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
  2. 救急线程可以无限创建
  3. 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
  • 评价

整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况
**

newSingleThreadExecutor

创建只有一个线程的线程池

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }
  • 使用场景:
    希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
  • 区别:
    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
    • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改,FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService接口,因此不能调用 ThreadPoolExecutor中特有的方法
    • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,它对外暴露的是 ThreadPoolExecutor对象,可以强转后调用 setCorePoolSize等方法进行修改

      提交任务

      ```java // 执行任务 void execute(Runnable command);

// 有返回值的提交任务 task,用返回值 Future 获得任务执行结果

Future submit(Callable task);

// 提交 tasks 中所有任务

List> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间

List> invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit) throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,只要有一个成功了就返回,其余的丢弃

T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间

T invokeAny(Collection<? extends Callable> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

  1. <a name="jaHjU"></a>
  2. # 任务调度线程池
  3. 【任务调度线程池】表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务<br />在【任务调度线程池】功能加入之前,可以使用`java.util.Timer `来实现定时功能,`Timer`的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
  4. ```java
  5. public static void testTime() {
  6. Timer timer = new Timer();
  7. TimerTask task1 = new TimerTask() {
  8. @Override
  9. public void run() {
  10. log.info("task 1");
  11. sleep(2);
  12. }
  13. };
  14. TimerTask task2 = new TimerTask() {
  15. @Override
  16. public void run() {
  17. log.info("task 2");
  18. }
  19. };
  20. // 使用 timer 添加两个任务,希望它们都在 1s 后执行
  21. // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
  22. timer.schedule(task1, 1000);
  23. timer.schedule(task2, 1000);
  24. }
  25. // 程序希望1s后两个任务同时执行,但是最终结果是任务1先执行完毕,才能执行任务2
  26. 2021-10-04 20:50:07.090 [Timer-0] INFO - task 1
  27. 2021-10-04 20:50:09.092 [Timer-0] INFO - task 2
  • 使用 ScheduledExecutorService改写

    1. public static void testScheduled() {
    2. ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    3. // 添加两个任务,希望它们都在 1s 后执行
    4. executor.schedule(() -> {
    5. System.out.println("任务1,执行时间:" + new Date());
    6. try {
    7. Thread.sleep(2000);
    8. } catch (InterruptedException e) {
    9. }
    10. }, 1000, TimeUnit.MILLISECONDS);
    11. executor.schedule(() -> {
    12. System.out.println("任务2,执行时间:" + new Date());
    13. }, 1000, TimeUnit.MILLISECONDS);
    14. }
    15. //任务1 和 任务2几乎同时执行
    16. 任务1,执行时间:Mon Oct 04 20:52:33 CST 2021
    17. 任务2,执行时间:Mon Oct 04 20:52:33 CST 2021
  • scheduleAtFixedRate用法

    1. public static void testScheduleAtFixedRate() {
    2. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    3. log.info("start...");
    4. pool.scheduleAtFixedRate(() -> {
    5. log.info("running...");
    6. }, 1, 1, TimeUnit.SECONDS);
    7. }
    8. // 程序每隔1s执行一次,首次执行是在主程序启动后1s
    9. 2021-10-04 20:54:38.153 [main] INFO - start...
    10. 2021-10-04 20:54:39.185 [pool-1-thread-1] INFO - running...
    11. 2021-10-04 20:54:40.185 [pool-1-thread-1] INFO - running...
    12. 2021-10-04 20:54:41.184 [pool-1-thread-1] INFO - running...
    13. 2021-10-04 20:54:42.185 [pool-1-thread-1] INFO - running...
    14. 2021-10-04 20:54:43.184 [pool-1-thread-1] INFO - running...

    如果任务执行耗时超过了间隔时间

    1. public static void testScheduleAtFixedRateTimeOut() {
    2. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    3. log.info("start...");
    4. pool.scheduleAtFixedRate(() -> {
    5. log.info("running...");
    6. sleep(2);
    7. }, 1, 1, TimeUnit.SECONDS);
    8. }
    9. //程序本意是希望每隔1s执行一次,但是任务的执行耗时是2s,大于时间间隔,所以采用的是任务执行完毕后再执行
    10. //输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
    11. 2021-10-04 20:56:08.081 [main] INFO - start...
    12. 2021-10-04 20:56:09.113 [pool-1-thread-1] INFO - running...
    13. 2021-10-04 20:56:11.116 [pool-1-thread-1] INFO - running...
    14. 2021-10-04 20:56:13.117 [pool-1-thread-1] INFO - running...
    15. 2021-10-04 20:56:15.117 [pool-1-thread-1] INFO - running...
  • scheduleWithFixedDelay用法

    1. public static void testScheduleWithFixedDelay() {
    2. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    3. log.info("start...");
    4. pool.scheduleWithFixedDelay(() -> {
    5. log.info("running...");
    6. sleep(2);
    7. }, 1, 1, TimeUnit.SECONDS);
    8. }
    9. //输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <--> 延时 <--> 下一个任务开始 所以间隔都是 3s
    10. 2021-10-04 20:58:04.276 [main] INFO - start...
    11. 2021-10-04 20:58:05.307 [pool-1-thread-1] INFO - running...
    12. 2021-10-04 20:58:08.311 [pool-1-thread-1] INFO - running...
    13. 2021-10-04 20:58:11.312 [pool-1-thread-1] INFO - running...
    14. 2021-10-04 20:58:14.313 [pool-1-thread-1] INFO - running...

    处理任务执行异常

  • 主动捕捉异常

    1. ExecutorService pool = Executors.newFixedThreadPool(1);
    2. pool.submit(() -> {
    3. try {
    4. log.debug("task1");
    5. int i = 1 / 0;
    6. } catch (Exception e) {
    7. log.error("error:", e);
    8. }
    9. });
  • 使用 Future

f.get 会抛出InterruptedExceptionExecutionException异常

  1. public void test() throws ExecutionException, InterruptedException {
  2. ExecutorService pool = Executors.newFixedThreadPool(1);
  3. Future<Boolean> f = pool.submit(() -> {
  4. log.debug("task1");
  5. int i = 1 / 0;
  6. return true;
  7. });
  8. log.debug("result:{}", f.get());
  9. }