在面向对象的编程过程中,创建对象和销毁对象是非常消耗时间和资源的。因此想要最小化这种消耗的一种思想就是『池化资源』,线程池就是这样的一种思想,我们通过重用线程池中的资源来减少创建和销毁线程所需要耗费的时间和资源。

new Thread 的弊端

  • 每次new Thread新建对象性能差
  • 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom
  • 缺乏更多功能,如定时执行、定期执行、线程中断

    线程池的好处

  • 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗

  • 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行
  • 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

    什么时候使用线程池

  • 单个任务处理事件较短

  • 需要处理的任务数量很大

    线程池的组成

    一般的线程池主要分为以下4个组成部分:

  • 线程池管理器:用于创建并管理线程池

  • 工作线程:线程池中的线程
  • 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
  • 任务队列:用于存放待处理的任务,提供一种缓冲机制

Java 线程池框架

Executor接口

Executor 接口只有一个 execute 方法,用来替代通常创建或启动线程的方法,通常具体功能由不同的子类实现决定。

ExecutorService接口

ExecutorService 接口继承自 Executor 接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny() 和 submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。

ScheduledExecutorService接口

ScheduledExecutorService 扩展 ExecutorService 接口并增加了schedule 方法。调用 schedule 方法可以在指定的延时后执行一个 Runnable 或者 Callable 任务。ScheduledExecutorService 接口还定义了按照指定时间间隔定期执行任务的 scheduleAtFixedRate() 方法和scheduleWithFixedDelay() 方法。

ThreadPoolExecutor

ThreadPoolExecutor 继承自 AbstractExecutorService,也是实现了ExecutorService接口,一般所有线程池都是基于这个类实现。

构造方法
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. // 具体实现
  9. }
  • corePoolSize:线程池的核心线程数,线程池中运行的线程数也永远不会超过 corePoolSize 个,默认情况下会永远存活
    • 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的
    • 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务
    • 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理
    • 如果运行的线程数量大于等于 maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务
  • maximumPoolSize:线程池中允许的最大线程数
  • keepAliveTime:空闲线程结束的超时时间,当线程池中的线程数量大于 corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
  • unit:是一个枚举,它表示的是 keepAliveTime 的单位
  • workQueue:工作队列,用于任务的存放,当任务提交时,如果线程池中的线程数量大于等于corePoolSize 的时候,把该任务封装成一个 Worker 对象放入等待队列
  • threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
  • handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

execute方法

execute()方法用来提交任务,代码如下

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {
  6. if (addWorker(command, true))
  7. return;
  8. c = ctl.get();
  9. }
  10. if (isRunning(c) && workQueue.offer(command)) {
  11. int recheck = ctl.get();
  12. if (! isRunning(recheck) && remove(command))
  13. reject(command);
  14. else if (workerCountOf(recheck) == 0)
  15. addWorker(null, false);
  16. }
  17. else if (!addWorker(command, false))
  18. reject(command);
  19. }
  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry;
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. boolean workerStarted = false;
  26. boolean workerAdded = false;
  27. Worker w = null;
  28. try {
  29. w = new Worker(firstTask);
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;
  33. mainLock.lock();
  34. try {
  35. // Recheck while holding lock.
  36. // Back out on ThreadFactory failure or if
  37. // shut down before lock acquired.
  38. int rs = runStateOf(ctl.get());
  39. if (rs < SHUTDOWN ||
  40. (rs == SHUTDOWN && firstTask == null)) {
  41. if (t.isAlive()) // precheck that t is startable
  42. throw new IllegalThreadStateException();
  43. workers.add(w);
  44. int s = workers.size();
  45. if (s > largestPoolSize)
  46. largestPoolSize = s;
  47. workerAdded = true;
  48. }
  49. } finally {
  50. mainLock.unlock();
  51. }
  52. if (workerAdded) {
  53. t.start();
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w);
  60. }
  61. return workerStarted;
  62. }

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable
  2. {
  3. /** Thread this worker is running in. Null if factory fails. */
  4. final Thread thread;
  5. /** Initial task to run. Possibly null. */
  6. Runnable firstTask;
  7. ...
  8. Worker(Runnable firstTask) {
  9. setState(-1); // inhibit interrupts until runWorker
  10. this.firstTask = firstTask;
  11. this.thread = getThreadFactory().newThread(this);
  12. }
  13. }

其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。

在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法

常见的 Java 线程池

生成线程池使用的是Executors的工厂方法

SingleThreadExecutor

SingleThreadExecutor 是单个线程的线程池,即线程池中每次只有一个线程在运行,单线程串行执行任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

FixedThreadPool

FixedThreadPool是固定数量的线程池,只有核心线程,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列,直到前面的任务完成才继续执行。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

CachedThreadPool

CachedThreadPool是可缓存线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。其中,SynchronousQueue是一个是缓冲区为1的阻塞队列。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

ScheduledThreadPool

ScheduledThreadPool是核心线程池固定,大小无限制的线程池,支持定时和周期性的执行线程。创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. // ScheduledThreadPoolExecutor
  5. public ScheduledThreadPoolExecutor(int corePoolSize) {
  6. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  7. new DelayedWorkQueue());
  8. }

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

致谢

内容部分取自网络,有修改,感谢原作者,如有侵权请联系删除,谢谢。