一、线程池工具类Executors

Executors是juc包中用来快速创建线程池的工具类,可以调用这个类的静态方法来创建线程池,不过用这种方法创建的线程池的一些线程已经被配置好了,建议在逻辑比较简单的情况下使用。
1.png
创建的线程池对于不同的业务场景和逻辑复杂度,处理的速度是不一样的。

  1. ExecutorService executorService1 = Executors.newCachedThreadPool();
  2. ExecutorService executorService2 = Executors.newFixedThreadPool(10);
  3. ExecutorService executorService3 = Executors.newSingleThreadExecutor();

1、newCachedThreadPool()

使用线程工具类创建的newCachedThreadPool线程池,指定的核心线程数是0,最大线程数是Integer的最大值(即全部是非核心线程),非核心线程的生命周期是60秒,阻塞队列使用SynchronousQueue,这个队列的容量为0,每个生产者都需要一个消费者线程来消费。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

2、newFixedThreadPool()

需要指定线程数量,核心线程数是指定的数量,也是最大的线程数量,所有没有非核心线程。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

3、newSingleThreadExecutor()

只有一个线程来处理任务。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

自带的工具类对于线程池的配置非常有限,例如LinkedBlockingQueue阻塞队列的容量等,所以大多数情况都不去使用工具类来创建线程池,除非是非常简单的任务逻辑。
通常使用new来创建线程池,可以根据业务场景来自行配置。
线程池的构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. RejectedExecutionHandler handler) {
  7. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8. Executors.defaultThreadFactory(), handler);
  9. }

corePoolSize:核心线程数
maximumPoolSize:最大线程数(非核心线程数 = 最大线程数 - 核心线程数)
keepAliveTime:非核心线程空闲保活时间
unit:时间的单位
workQueue:阻塞队列,用来保存任务
handler:拒绝策略,当没有线程可执行任务且阻塞队列放满的情况下执行的逻辑

  1. ThreadPoolExecutor threadPoolExecutor =
  2. new ThreadPoolExecutor(10, 20, 0L,
  3. TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>(10),
  5. new MonkeyRejectedExecutionHandler());
  6. threadPoolExecutor.execute(() -> {
  7. System.out.println("111");
  8. });

二、线程池源码

在线程池提交任务时,首先分配给核心线程来执行任务,如果核心线程没有空闲,那么会先进入阻塞队列,当阻塞队列容量已满,还是没有核心线程来处理时,新提交的任务由非核心线程来处理,如果核心线程和非核心线程的总数到达最大线程数时,执行拒绝策略。
线程池的顶层接口是Executor,这个接口只定义了一个execute()方法,这个方法是没有返回值的。

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }
  1. 通常在线程池提交任务时,还有会使用submit()方法,是在其子接口ExecutorService中定义的,具体的实现实在其实现类中。<br />![2.png](https://cdn.nlark.com/yuque/0/2022/png/21820542/1644999381476-63d6ac2b-c840-42d2-b39e-818c5d39b897.png#clientId=ua323a8d2-b4ea-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u65e9ddb9&margin=%5Bobject%20Object%5D&name=2.png&originHeight=245&originWidth=709&originalType=binary&ratio=1&rotation=0&showTitle=false&size=38397&status=done&style=none&taskId=uedb7e8fa-fb7b-4fc5-9b81-41880e7a1b4&title=)<br />在ThreadPoolExecutor的父类AbstractExecutorService类中实现的submit方法,其中也是调用了execute()方法。
  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<T> ftask = newTaskFor(task);
  4. execute(ftask);
  5. return ftask;
  6. }

线程池定义了一些状态。
RUNNING 状态是正常处理和接受任务的
SHUTDOWN 状态,不接受新的任务,已经接受的任务会继续处理
STOP 状态,不接受新的任务,也不会处理已经接受的任务

  1. private static final int RUNNING = -1 << COUNT_BITS;
  2. private static final int SHUTDOWN = 0 << COUNT_BITS;
  3. private static final int STOP = 1 << COUNT_BITS;
  4. private static final int TIDYING = 2 << COUNT_BITS;
  5. private static final int TERMINATED = 3 << COUNT_BITS;

1、ThreadPoolExecutor.execute()

在addWorker()方法中,创建的worker对象是继承自AQS,实现了Runnable的类。

ThreadPoolExecutor.execute.png2、ScheduledThreadPoolExecutor

常用的定时线程池,周期性执行的线程器,底层使用的是优先级队列来保存,提交的任务先会保存到队列中。

  1. ScheduledThreadPoolExecutor executor =
  2. (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5)
  3. // 延迟2秒,只会执行一次
  4. executor.schedule(task, 2, TimeUnit.SECONDS);
  5. // 周期性的执行,在每个任务执行完成后,再延迟2秒执行下一个任务
  6. executor.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS);
  7. // 周期性的执行,在每个任务执行完成后,直接执行下一个任务
  8. executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);

scheduleAtFixedRate源码
ScheduledThreadPoolExecutor-scheduleAtFixedRate.png 使用延迟线程池需要注意,在run方法中要使用try-catch捕获异常,因为在线程池中,如果抛出异常,在调用任务的run方法的上一层就把任务吃掉了,并没有再抛出,所以在run方法中需要把异常处理掉。

  1. // 在myTask封装的类ScheduledFutureTask中的run方法中,调用了其父类的方法
  2. // ScheduledFutureTask.super.runAndReset(),在父类中封装了callable对象
  3. // 其中封装了myTask,在调用callable的call()方法时,即myTask的run方法,如果
  4. // 抛出异常,则直接被catch后保存到了set中,所以需要直接在myTask的run方法中处理掉异常
  5. try {
  6. c.call(); // don't set result
  7. ran = true;
  8. } catch (Throwable ex) {
  9. setException(ex);
  10. }

三、拒绝策略

在线程池的拒绝策略一共有4中实现
4.png
CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务。
AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务。
DiscardPolicy,直接抛弃任务,不做任何处理。
DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交。
在平时的使用中,可能以上四种实现都不能满足需求,所以可以重写拒绝策略,并当做参数传入,重写拒绝策略有两种方法。
直接写一个RejectedExecutionHandler拒绝策略的实现类

  1. RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
  2. @Override
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  4. // 自己实现的拒绝策略
  5. }
  6. };
  7. // 把自定义的拒绝策略传入线程池
  8. ThreadPoolExecutor threadPoolExecutor =
  9. new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),rejectedExecutionHandler);

或者直接写一个类,继承RejectedExecutionHandler来实现逻辑,并传入线程池

  1. public class MonkeyRejectedExecutionHandler implements RejectedExecutionHandler {
  2. @Override
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  4. }
  5. }
  6. ThreadPoolExecutor threadPoolExecutor =
  7. new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());