ThreadPoolExecutor提供了四个构造方法:
26d3258d5dc827a4c8fc0307b3aea2e5_webp.webp

ThreadPoolExecutor构造方法.png

我们以最后一个构造方法(参数最多的那个),对其参数进行解释:

  1. public ThreadPoolExecutor(int corePoolSize, // 1
  2. int maximumPoolSize, // 2
  3. long keepAliveTime, // 3
  4. TimeUnit unit, // 4
  5. BlockingQueue<Runnable> workQueue, // 5
  6. ThreadFactory threadFactory, // 6
  7. RejectedExecutionHandler handler ) { //7
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }
序号 名称 类型 含义
1 corePoolSize int 核心线程池大小
2 maximumPoolSize int 最大线程池大小
3 keepAliveTime long 线程最大空闲时间
4 unit TimeUnit 时间单位
5 workQueue BlockingQueue 线程等待队列
6 threadFactory ThreadFactory 线程创建工厂
7 handler RejectedExecutionHandler 拒绝策略

如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述
知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:

一、预定义线程池

1. FixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  • corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
  • keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
  • workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
  • FixedThreadPool的任务执行是无序的;

适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

2. CachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  • corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
  • keepAliveTime = 60s,线程空闲60s后自动结束。
  • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;

适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。

3. SingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:

  1. public static void main(String[] args) {
  2. ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
  3. ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
  4. System.out.println(threadPoolExecutor.getMaximumPoolSize());
  5. threadPoolExecutor.setCorePoolSize(8);
  6. ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
  7. // 运行时异常 java.lang.ClassCastException
  8. // ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
  9. }

对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
使用场景
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放

4. ScheduledThreadPool

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }

newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务

二、自定义线程池

以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:

  1. public class ThreadTest {
  2. public static void main(String[] args) throws InterruptedException, IOException {
  3. int corePoolSize = 2;
  4. int maximumPoolSize = 4;
  5. long keepAliveTime = 10;
  6. TimeUnit unit = TimeUnit.SECONDS;
  7. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
  8. ThreadFactory threadFactory = new NameTreadFactory();
  9. RejectedExecutionHandler handler = new MyIgnorePolicy();
  10. ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
  11. workQueue, threadFactory, handler);
  12. executor.prestartAllCoreThreads(); // 预启动所有核心线程
  13. for (int i = 1; i <= 10; i++) {
  14. MyTask task = new MyTask(String.valueOf(i));
  15. executor.execute(task);
  16. }
  17. System.in.read(); //阻塞主线程
  18. }
  19. static class NameTreadFactory implements ThreadFactory {
  20. private final AtomicInteger mThreadNum = new AtomicInteger(1);
  21. @Override
  22. public Thread newThread(Runnable r) {
  23. Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
  24. System.out.println(t.getName() + " has been created");
  25. return t;
  26. }
  27. }
  28. public static class MyIgnorePolicy implements RejectedExecutionHandler {
  29. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  30. doLog(r, e);
  31. }
  32. private void doLog(Runnable r, ThreadPoolExecutor e) {
  33. // 可做日志记录等
  34. System.err.println( r.toString() + " rejected");
  35. // System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
  36. }
  37. }
  38. static class MyTask implements Runnable {
  39. private String name;
  40. public MyTask(String name) {
  41. this.name = name;
  42. }
  43. @Override
  44. public void run() {
  45. try {
  46. System.out.println(this.toString() + " is running!");
  47. Thread.sleep(3000); //让任务执行慢点
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. public String getName() {
  53. return name;
  54. }
  55. @Override
  56. public String toString() {
  57. return "MyTask [name=" + name + "]";
  58. }
  59. }
  60. }

输出结果如下:
eafa327e8c58cafccd02693cd89e5410_webp.webp
image.png
其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。
总结,通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景。

作者:徐志毅
链接:https://www.jianshu.com/p/f030aa5d7a28
来源:简书