线程池的好处:
- 降低资源消耗——重复利用已创建的线程避免创建\销毁开销。
- 提高响应速度——任务到达时无需等待线程创建即可运行。
- 提高线程的可管理性——统一调配、调优和监控,提高系统资源利用率和稳定性。
1. 线程池实现原理
处理流程(下图中,右【是】下【否】):
- 如果当前运行线程少于
corePoolSize
则创建新线程来执行任务(需要获取全局锁)。 - 否则,将任务加入
BlockingQueue
。 - 如果阻塞队列已满,则创建新的工作线程(Worker)来处理任务(需要获取全局锁)。
- 如果超过
maximumPoolSize
则调用RejectedExecutionHandler.rejectedExecution()
方法进行处理。
2. 线程池的使用
构造器:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
下面解释下一下构造器中各个参数的含义:
- corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了
prestartAllCoreThreads()
或者prestartCoreThread()
方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize
个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize
后,就会把到达的任务放到缓存队列当中; - maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于
corePoolSize
时,keepAliveTime
才会起作用,直到线程池中的线程数不大于corePoolSize
,即当线程池中的线程数大于corePoolSize
时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)
方法,在线程池中的线程数不大于corePoolSize
时,keepAliveTime
参数也会起作用,直到线程池中的线程数为0; unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS
TimeUnit.HOURS
TimeUnit.MINUTES
TimeUnit.SECONDS
TimeUnit.MILLISECONDS
TimeUnit.MICROSECONDS
TimeUnit.NANOSECONDS
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,这里的阻塞队列有以下几种选择:(
ArrayBlockingQueue和PriorityBlockingQueue
使用较少,一般使用LinkedBlockingQueue
和Synchronous
)。一般来说,线程池的排队策略与BlockingQueue
有:ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
threadFactory:线程工厂,主要用来创建线程;
- handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException
异常【默认】。ThreadPoolExecutor.DiscardPolicy
:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务。
3. 提交任务
execute(Runnable)
:无返回值任务submit(Runnable)
:有返回值任务,返回一个Future<Object>
对象,基于get()
阻塞直到获取返回值。
需要注意的是, submit
具有三种构建方式:
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<?> submit(Runnable task)
其中,前两种具有返回值,而最后一种不具备返回值(返回Null)。
4. 关闭线程池
shutdown()
:将线程池状态设为SHUTDOWN
状态,中断非正在执行的线程。shutdownNow()
:将线程池状态设为STOP
状态,尝试停止所有正在执行或暂停的线程,并返回等待执行任务的列表。
一些差异和细节:
isShutdown()
方法在所有任务都已关闭后才会返回true
。无论是
shutdown()
还是shutdownNow()
,都是遍历工作线程并调用线程的interrupt()
方法,所以不响应中断的任务可能永远无法停止。5. 线程池监控
taskCount
:线程池所需要执行的任务数量(历史所有)。completedTaskCount
:运行过程中已完成的任务数量。largestPoolSize
:线程池曾经创建过的最大线程数量。getPoolSize
:当前线程池的线程数量。getActiveCount
:当前活动的线程数量。
使用举例
- 调用
execute(Runnable)
: ```java public static void main(String[] args){ ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务 for(int i=0;i<15;i++){
} }pool.execute(new R(2000, "Task"+i));
static class R implements Runnable{ private int sleepTime; private String task; R(int sleepTime, String task){ this.sleepTime = sleepTime; this.task = task; }
@Override
public void run() {
try{
Thread.sleep(this.sleepTime);
System.out.println(this.task + " Done");
} catch (InterruptedException ignore){}
}
}
2. 调用 `submit(Runnable, Data)` :
```java
public static void main(String[] args){
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务
for(int i=0;i<15;i++){
Data res = new Data(); // 构建返回值
final Future<Data> f = pool.submit(new R(2000, "Task"+i, res), res);
new Thread(() -> {
try{ System.out.println(f.get().getResult()); } catch (Exception ignore) {}
}).start();
}
}
static class R implements Runnable{
private int sleepTime;
private String task;
private Data res;
R(int sleepTime, String task, Data res){ this.sleepTime = sleepTime; this.task = task; this.res = res; }
@Override
public void run() {
try{
Thread.sleep(this.sleepTime);
System.out.println(this.task + " Done");
this.res.setResult(this.task + " Done Result");
} catch (InterruptedException ignore){}
}
}
static class Data{
private String result;
public void setResult(String result){ this.result = result; }
public String getResult() { return result; }
}
但是更推荐调用 submit(Callable)
:
public static void main(String[] args){
ThreadPoolExecutor pool =
new ThreadPoolExecutor(3, 5, 10, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(10)); // 最多支持同时提交 10+5 个任务
for(int i=0;i<15;i++){
final Future<String> f = pool.submit(new R(2000, "Task"+i));
new Thread(() -> {
try{
System.out.println(f.get());
} catch (Exception ignore){}
}).start();
}
}
static class R implements Callable<String> {
private int sleepTime;
private String task;
R(int sleepTime, String task){ this.sleepTime = sleepTime; this.task = task; }
@Override
public String call() {
try{
Thread.sleep(this.sleepTime);
System.out.println(this.task + " Done");
} catch (InterruptedException ignore){}
return this.task + " Done Result";
}
}