线程池的好处:
- 降低资源消耗——重复利用已创建的线程避免创建\销毁开销。
- 提高响应速度——任务到达时无需等待线程创建即可运行。
- 提高线程的可管理性——统一调配、调优和监控,提高系统资源利用率和稳定性。
1. 线程池实现原理
处理流程(下图中,右【是】下【否】):
- 如果当前运行线程少于
corePoolSize则创建新线程来执行任务(需要获取全局锁)。 - 否则,将任务加入
BlockingQueue。 - 如果阻塞队列已满,则创建新的工作线程(Worker)来处理任务(需要获取全局锁)。
- 如果超过
maximumPoolSize则调用RejectedExecutionHandler.rejectedExecution()方法进行处理。
2. 线程池的使用
构造器:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
下面解释下一下构造器中各个参数的含义:
- corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了
prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; - maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于
corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0; unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYSTimeUnit.HOURSTimeUnit.MINUTESTimeUnit.SECONDSTimeUnit.MILLISECONDSTimeUnit.MICROSECONDSTimeUnit.NANOSECONDS
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,这里的阻塞队列有以下几种选择:(
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous)。一般来说,线程池的排队策略与BlockingQueue有:ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue
threadFactory:线程工厂,主要用来创建线程;
- handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常【默认】。ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
3. 提交任务
execute(Runnable):无返回值任务submit(Runnable):有返回值任务,返回一个Future<Object>对象,基于get()阻塞直到获取返回值。
需要注意的是, submit 具有三种构建方式:
<T> Future<T> submit(Callable<T> task)<T> Future<T> submit(Runnable task, T result)Future<?> submit(Runnable task)
其中,前两种具有返回值,而最后一种不具备返回值(返回Null)。
4. 关闭线程池
shutdown():将线程池状态设为SHUTDOWN状态,中断非正在执行的线程。shutdownNow():将线程池状态设为STOP状态,尝试停止所有正在执行或暂停的线程,并返回等待执行任务的列表。
一些差异和细节:
isShutdown()方法在所有任务都已关闭后才会返回true。无论是
shutdown()还是shutdownNow(),都是遍历工作线程并调用线程的interrupt()方法,所以不响应中断的任务可能永远无法停止。5. 线程池监控
taskCount:线程池所需要执行的任务数量(历史所有)。completedTaskCount:运行过程中已完成的任务数量。largestPoolSize:线程池曾经创建过的最大线程数量。getPoolSize:当前线程池的线程数量。getActiveCount:当前活动的线程数量。
使用举例
- 调用
execute(Runnable): ```java public static void main(String[] args){ ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务 for(int i=0;i<15;i++){
} }pool.execute(new R(2000, "Task"+i));
static class R implements Runnable{ private int sleepTime; private String task; R(int sleepTime, String task){ this.sleepTime = sleepTime; this.task = task; }
@Overridepublic void run() {try{Thread.sleep(this.sleepTime);System.out.println(this.task + " Done");} catch (InterruptedException ignore){}}
}
2. 调用 `submit(Runnable, Data)` :```javapublic static void main(String[] args){ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务for(int i=0;i<15;i++){Data res = new Data(); // 构建返回值final Future<Data> f = pool.submit(new R(2000, "Task"+i, res), res);new Thread(() -> {try{ System.out.println(f.get().getResult()); } catch (Exception ignore) {}}).start();}}static class R implements Runnable{private int sleepTime;private String task;private Data res;R(int sleepTime, String task, Data res){ this.sleepTime = sleepTime; this.task = task; this.res = res; }@Overridepublic void run() {try{Thread.sleep(this.sleepTime);System.out.println(this.task + " Done");this.res.setResult(this.task + " Done Result");} catch (InterruptedException ignore){}}}static class Data{private String result;public void setResult(String result){ this.result = result; }public String getResult() { return result; }}
但是更推荐调用 submit(Callable) :
public static void main(String[] args){ThreadPoolExecutor pool =new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES,new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务for(int i=0;i<15;i++){final Future<String> f = pool.submit(new R(2000, "Task"+i));new Thread(() -> {try{System.out.println(f.get());} catch (Exception ignore){}}).start();}}static class R implements Callable<String> {private int sleepTime;private String task;R(int sleepTime, String task){ this.sleepTime = sleepTime; this.task = task; }@Overridepublic String call() {try{Thread.sleep(this.sleepTime);System.out.println(this.task + " Done");} catch (InterruptedException ignore){}return this.task + " Done Result";}}
