ThreadPoolExecutor是JDK默认的线程池实现类。
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数入参
- corePoolSize:最大核心线程数量
- maximumPoolSize:最大线程数量
- keepAliveTime:最大空闲时间
- unit:空闲时间单位
- workQueue:等待队列
- threadFactory:线程工厂
- handler:拒绝策略
每个新任务都要使用一个线程执行。当没有空闲线程时,如果当前线程池中线程数量没有超过最大线程数量,则会使用线程工厂新创建一个线程加入线程池来执行任务。任务执行完成后,线程空闲。当存在空闲线程时,如果线程池中线程数量超过最大核心线程数量,则会等待。当等待时间超过最大空闲时间(空闲时间单位)后,超出最大核心线程数量的空闲线程数量会被释放。新来一个任务,但线程池中线程数量为最大线程数量,则将新任务放入等待队列。当有空闲线程时,根据等待队列中的优先规则,执行队列中保存的任务。当等待队列也存满之后,则会调用拒绝策略。
PS:这只是线程池大概的执行过程,是为了说明这几个参数的作用,而不是线程池实际的执行逻辑。
四种线程池
JDK还提供了所谓的四种线程池。实际上它们底层都使用的是ThreadPoolExecutor,只是各种参数不对,主要是等待队列不同。它们分别针对不同的应用场景。详情介绍在这里。
实践使用
任务提交
ThreadPoolExecutor中提交任务有两种方法,execute和submit
execute方法
如果只需要执行任务,不需要知道任务返回值,则使用execute方法(当然也可以用submit方法,但execute性能上会好很多)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));
Runnable task = () -> {
int x = 1;
int y = 2;
int result = x + y;
System.out.println("执行计算任务,结果是 " + result);
};
threadPoolExecutor.execute(task);
submit方法
submit有三个重载方法。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到三种submit都是new了一个ftask然后执行。其中的execute方法,就是前面提到的execute。而newTaskFor也有两种重载方法。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
所以submit实际上是execute了FutureTask(FutureTask是啥?点这里)。
第一种submit
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));
Runnable task = () -> {
int x = 1;
int y = 2;
int result = x + y;
System.out.println("执行计算任务,结果是 " + result);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Future x = threadPoolExecutor.submit(task);
try {
Object obj = x.get();
System.out.println(obj == null);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
threadPoolExecutor.shutdown();
// 执行结果
执行计算任务,结果是 3
// 等待十秒
true
Runnable接口是没有返回值的,因此即便封装到FutrueTask中,future中get到的也是null。
其中,future的get方法会一直阻塞直到任务执行完成或者抛出异常
注意最后一定要关掉线程池,否则java程序不会退出
第二种submit
public class Result {
private int result;
public int getResult() {
return result;
}
public void setResult(int result) {
this.result = result;
}
}
public class Task implements Runnable{
private Result result;
public Task(Result result) {
this.result = result;
}
@Override
public void run() {
int x = 1;
int y = 2;
result.setResult(x + y);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));
Runnable task = () -> {
int x = 1;
int y = 2;
int result = x + y;
System.out.println("执行计算任务,结果是 " + result);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Result result = new Result();
Future<Result> x = threadPoolExecutor.submit(new Task(result), result);
try {
Result calculateResult = x.get();
System.out.println(calculateResult.getResult());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
threadPoolExecutor.shutdown();
// 执行结果
// 等待10秒后
3
第三种submit
Callable<Integer> task = () -> {
int x = 1;
int y = 2;
int result = x + y;
System.out.println("执行计算任务,结果是 " + result);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
};
Future<Integer> x = threadPoolExecutor.submit(task);
try {
Integer calculateResult = x.get();
System.out.println(calculateResult);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
threadPoolExecutor.shutdown();
// 执行结果
// 等待10秒后
3