ThreadPoolExecutor是JDK默认的线程池实现类。

构造函数

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

其余的构造函数也是调用该它,只是使用了默认的参数值。

构造函数入参

  • corePoolSize:最大核心线程数量
  • maximumPoolSize:最大线程数量
  • keepAliveTime:最大空闲时间
  • unit:空闲时间单位
  • workQueue:等待队列
  • threadFactory:线程工厂
  • handler:拒绝策略

每个新任务都要使用一个线程执行。当没有空闲线程时,如果当前线程池中线程数量没有超过最大线程数量,则会使用线程工厂新创建一个线程加入线程池来执行任务。任务执行完成后,线程空闲。当存在空闲线程时,如果线程池中线程数量超过最大核心线程数量,则会等待。当等待时间超过最大空闲时间(空闲时间单位)后,超出最大核心线程数量的空闲线程数量会被释放。新来一个任务,但线程池中线程数量为最大线程数量,则将新任务放入等待队列。当有空闲线程时,根据等待队列中的优先规则,执行队列中保存的任务。当等待队列也存满之后,则会调用拒绝策略。
PS:这只是线程池大概的执行过程,是为了说明这几个参数的作用,而不是线程池实际的执行逻辑。

四种线程池

JDK还提供了所谓的四种线程池。实际上它们底层都使用的是ThreadPoolExecutor,只是各种参数不对,主要是等待队列不同。它们分别针对不同的应用场景。详情介绍在这里

实践使用

任务提交

ThreadPoolExecutor中提交任务有两种方法,execute和submit

execute方法

如果只需要执行任务,不需要知道任务返回值,则使用execute方法(当然也可以用submit方法,但execute性能上会好很多)

  1. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));
  2. Runnable task = () -> {
  3. int x = 1;
  4. int y = 2;
  5. int result = x + y;
  6. System.out.println("执行计算任务,结果是 " + result);
  7. };
  8. threadPoolExecutor.execute(task);

submit方法

submit有三个重载方法。

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. public <T> Future<T> submit(Runnable task, T result) {
  8. if (task == null) throw new NullPointerException();
  9. RunnableFuture<T> ftask = newTaskFor(task, result);
  10. execute(ftask);
  11. return ftask;
  12. }
  13. public <T> Future<T> submit(Callable<T> task) {
  14. if (task == null) throw new NullPointerException();
  15. RunnableFuture<T> ftask = newTaskFor(task);
  16. execute(ftask);
  17. return ftask;
  18. }

可以看到三种submit都是new了一个ftask然后执行。其中的execute方法,就是前面提到的execute。而newTaskFor也有两种重载方法。

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

所以submit实际上是execute了FutureTask(FutureTask是啥?点这里)。

第一种submit

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));
Runnable task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
Future x = threadPoolExecutor.submit(task);
try {
    Object obj = x.get();
    System.out.println(obj == null);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
threadPoolExecutor.shutdown();
// 执行结果
执行计算任务,结果是 3
// 等待十秒
true

Runnable接口是没有返回值的,因此即便封装到FutrueTask中,future中get到的也是null。
其中,future的get方法会一直阻塞直到任务执行完成或者抛出异常
注意最后一定要关掉线程池,否则java程序不会退出

第二种submit

public class Result {
    private int result;

    public int getResult() {
        return result;
    }

    public void setResult(int result) {
        this.result = result;
    }
}

public class Task implements Runnable{
    private Result result;

    public Task(Result result) {
        this.result = result;
    }

    @Override
    public void run() {
        int x = 1;
        int y = 2;
        result.setResult(x + y);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));
Runnable task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
Result result = new Result();
Future<Result> x = threadPoolExecutor.submit(new Task(result), result);
try {
    Result calculateResult = x.get();
    System.out.println(calculateResult.getResult());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
threadPoolExecutor.shutdown();

// 执行结果
// 等待10秒后
3

第三种submit

Callable<Integer> task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result;
};
Future<Integer> x = threadPoolExecutor.submit(task);
try {
    Integer calculateResult = x.get();
    System.out.println(calculateResult);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
threadPoolExecutor.shutdown();

// 执行结果
// 等待10秒后
3