Executors 是为了方便创建线程池而提供的一个工具类,目前提供了 5 种不同的线程池创建配置。不过不推荐使用,因为 Executors 提供的工厂类会忽略很多线程池的参数设置,容易产生性能问题或资源浪费。

newFixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  6. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  7. return new ThreadPoolExecutor(nThreads, nThreads,
  8. 0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue<Runnable>(),
  10. threadFactory);
  11. }

该方法返回一个固定线程数量的线程池,核心线程数和最大线程数都是 nThreads。采用 LinkedBlockingQueue来存放任务,LinkedBlockingQueue 是基于链表的无界阻塞队列,当达到最大线程数且没有空闲线程时,任务会进入阻塞队列中排队。因此在并发很高时,LinkedBlockingQueue 可能会抛出 OOM 异常。

newSingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }
  7. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  8. return new FinalizableDelegatedExecutorService
  9. (new ThreadPoolExecutor(1, 1,
  10. 0L, TimeUnit.MILLISECONDS,
  11. new LinkedBlockingQueue<Runnable>(),
  12. threadFactory));
  13. }

该方法返回一个只有 1 个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。该方法适用于任务需要被顺序执行的场景。

newCachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  6. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  7. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  8. 60L, TimeUnit.SECONDS,
  9. new SynchronousQueue<Runnable>(),
  10. threadFactory);
  11. }

它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池不会消耗什么资源。

其内部使用 SynchronousQueue 作为工作队列,它是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作。因此,当有任务要执行又没有空闲线程时,会直接增加线程执行任务,由于我们将最大线程数设定为 Integer.MAX_VALUE,所以可创建的线程数理论上没有上线。
Executors - 图1

newScheduledThreadPool

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public static ScheduledExecutorService newScheduledThreadPool(
  5. int corePoolSize, ThreadFactory threadFactory) {
  6. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  7. }
  8. public ScheduledThreadPoolExecutor(int corePoolSize) {
  9. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  10. new DelayedWorkQueue());
  11. }
  12. public ScheduledThreadPoolExecutor(int corePoolSize,
  13. ThreadFactory threadFactory) {
  14. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  15. new DelayedWorkQueue(), threadFactory);
  16. }

这个线程池一般用来执行定时任务,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

调度程序实际上并不保证任务会无限期地持续调用。如果任务本身抛出了异常,那么后续的所有任务执行都会被中断,因此必须保证异常被及时处理,为周期性任务的稳定调度提供条件。

  1. // 在delay时间后,对任务进行一次调度
  2. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) ;
  3. // 周期性调度,它以上一个任务开始执行时间为起点,在经过period时间调度下一次任务调度
  4. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
  5. long period, TimeUnit unit);
  6. // 周期性调度,它以上一个任务结束后的时间为起点,再经过delay时间进行下一次任务调度
  7. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
  8. long delay, TimeUnit unit);

Executors - 图2
DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 RunnableScheduledFuture 进行排序,时间早的任务将被先执行。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber小的排在前面。

newWorkStealingPool

这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建 ForkJoinPool,利用任务窃取(Work-Stealing)算法,并行地处理任务,但是不保证处理的顺序。

  1. public static ExecutorService newWorkStealingPool() {
  2. return new ForkJoinPool
  3. (Runtime.getRuntime().availableProcessors(),
  4. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  5. null, true);
  6. }
  7. public static ExecutorService newWorkStealingPool(int parallelism) {
  8. return new ForkJoinPool
  9. (parallelism,
  10. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  11. null, true);
  12. }