线程池的好处:

  1. 降低资源消耗——重复利用已创建的线程避免创建\销毁开销。
  2. 提高响应速度——任务到达时无需等待线程创建即可运行。
  3. 提高线程的可管理性——统一调配、调优和监控,提高系统资源利用率和稳定性。

1. 线程池实现原理

处理流程(下图中,右【是】下【否】):

  1. 如果当前运行线程少于 corePoolSize 则创建新线程来执行任务(需要获取全局锁)。
  2. 否则,将任务加入 BlockingQueue
  3. 如果阻塞队列已满,则创建新的工作线程(Worker)来处理任务(需要获取全局锁)。
  4. 如果超过 maximumPoolSize 则调用 RejectedExecutionHandler.rejectedExecution() 方法进行处理。

2. 线程池的使用

构造器:

  1. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  2. BlockingQueue<Runnable> workQueue);
  3. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  4. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
  5. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
  7. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  8. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时, keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize ,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时, keepAliveTime 参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    1. TimeUnit.DAYS
    2. TimeUnit.HOURS
    3. TimeUnit.MINUTES
    4. TimeUnit.SECONDS
    5. TimeUnit.MILLISECONDS
    6. TimeUnit.MICROSECONDS
    7. TimeUnit.NANOSECONDS

    workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,这里的阻塞队列有以下几种选择:( ArrayBlockingQueue和PriorityBlockingQueue 使用较少,一般使用 LinkedBlockingQueueSynchronous )。一般来说,线程池的排队策略与 BlockingQueue 有:

    1. ArrayBlockingQueue
    2. LinkedBlockingQueue
    3. SynchronousQueue
  • threadFactory:线程工厂,主要用来创建线程;

  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
  1. ThreadPoolExecutor.AbortPolicy :丢弃任务并抛出 RejectedExecutionException 异常【默认】。
  2. ThreadPoolExecutor.DiscardPolicy :也是丢弃任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy :丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
  4. ThreadPoolExecutor.CallerRunsPolicy :由调用线程处理该任务。

3. 提交任务

  • execute(Runnable) :无返回值任务
  • submit(Runnable) :有返回值任务,返回一个 Future<Object> 对象,基于 get() 阻塞直到获取返回值。

需要注意的是, submit 具有三种构建方式:

  1. <T> Future<T> submit(Callable<T> task)
  2. <T> Future<T> submit(Runnable task, T result)
  3. Future<?> submit(Runnable task)

其中,前两种具有返回值,而最后一种不具备返回值(返回Null)。

4. 关闭线程池

  • shutdown() :将线程池状态设为 SHUTDOWN 状态,中断非正在执行的线程。
  • shutdownNow() :将线程池状态设为 STOP 状态,尝试停止所有正在执行或暂停的线程,并返回等待执行任务的列表。

一些差异和细节:

  1. isShutdown() 方法在所有任务都已关闭后才会返回 true
  2. 无论是shutdown()还是shutdownNow(),都是遍历工作线程并调用线程的 interrupt() 方法,所以不响应中断的任务可能永远无法停止。

    5. 线程池监控

  3. taskCount :线程池所需要执行的任务数量(历史所有)。

  4. completedTaskCount :运行过程中已完成的任务数量。
  5. largestPoolSize :线程池曾经创建过的最大线程数量。
  6. getPoolSize :当前线程池的线程数量。
  7. getActiveCount :当前活动的线程数量。

使用举例

  1. 调用execute(Runnable) : ```java public static void main(String[] args){ ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务 for(int i=0;i<15;i++){
    1. pool.execute(new R(2000, "Task"+i));
    } }

static class R implements Runnable{ private int sleepTime; private String task; R(int sleepTime, String task){ this.sleepTime = sleepTime; this.task = task; }

  1. @Override
  2. public void run() {
  3. try{
  4. Thread.sleep(this.sleepTime);
  5. System.out.println(this.task + " Done");
  6. } catch (InterruptedException ignore){}
  7. }

}

  1. 2. 调用 `submit(Runnable, Data)`
  2. ```java
  3. public static void main(String[] args){
  4. ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务
  5. for(int i=0;i<15;i++){
  6. Data res = new Data(); // 构建返回值
  7. final Future<Data> f = pool.submit(new R(2000, "Task"+i, res), res);
  8. new Thread(() -> {
  9. try{ System.out.println(f.get().getResult()); } catch (Exception ignore) {}
  10. }).start();
  11. }
  12. }
  13. static class R implements Runnable{
  14. private int sleepTime;
  15. private String task;
  16. private Data res;
  17. R(int sleepTime, String task, Data res){ this.sleepTime = sleepTime; this.task = task; this.res = res; }
  18. @Override
  19. public void run() {
  20. try{
  21. Thread.sleep(this.sleepTime);
  22. System.out.println(this.task + " Done");
  23. this.res.setResult(this.task + " Done Result");
  24. } catch (InterruptedException ignore){}
  25. }
  26. }
  27. static class Data{
  28. private String result;
  29. public void setResult(String result){ this.result = result; }
  30. public String getResult() { return result; }
  31. }

但是更推荐调用 submit(Callable)

  1. public static void main(String[] args){
  2. ThreadPoolExecutor pool =
  3. new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES,
  4. new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务
  5. for(int i=0;i<15;i++){
  6. final Future<String> f = pool.submit(new R(2000, "Task"+i));
  7. new Thread(() -> {
  8. try{
  9. System.out.println(f.get());
  10. } catch (Exception ignore){}
  11. }).start();
  12. }
  13. }
  14. static class R implements Callable<String> {
  15. private int sleepTime;
  16. private String task;
  17. R(int sleepTime, String task){ this.sleepTime = sleepTime; this.task = task; }
  18. @Override
  19. public String call() {
  20. try{
  21. Thread.sleep(this.sleepTime);
  22. System.out.println(this.task + " Done");
  23. } catch (InterruptedException ignore){}
  24. return this.task + " Done Result";
  25. }
  26. }