创建线程池七个参数

参数 解释 示例
int corePoolSize 核心线程的数量 2
int maximumPoolSize 最大线程的数量 5
long keepAliveTime 当前线程数量超过corePoolSize时,空闲线程的存活时间 3
TimeUnit unit 存活时间单位 TimeUnit.SECONDS
BlockingQueue workQueue 阻塞队列 new LinkedBlockingDeque<>(3)
ThreadFactory threadFactory 线程工厂 Executors.defaultThreadFactory()
RejectedExecutionHandler handle 拒绝策略 new ThreadPoolExecutor.AbortPolicy()

最大接收线程 maximumPoolSize + workQueue.size()

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

使用示例:

  1. public class TestThreadPool {
  2. private static final int CORE_POOL_SIZE = 5;
  3. private static final int MAX_POOL_SIZE = 10;
  4. private static final int QUEUE_CAPACITY = 100;
  5. private static final Long KEEP_ALIVE_TIME = 1L;
  6. public void testPool() {
  7. ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
  8. CORE_POOL_SIZE,
  9. MAX_POOL_SIZE,
  10. KEEP_ALIVE_TIME,
  11. TimeUnit.SECONDS,
  12. new ArrayBlockingQueue<>(QUEUE_CAPACITY),
  13. new ThreadPoolExecutor.CallerRunsPolicy());
  14. for (int i = 0; i < 10; i++) {
  15. poolExecutor.execute(() -> {
  16. try {
  17. Thread.sleep(2000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
  22. });
  23. }
  24. //终止线程池
  25. poolExecutor.shutdown();
  26. try {
  27. poolExecutor.awaitTermination(5, TimeUnit.SECONDS);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. System.out.println("Finished all threads");
  32. }
  33. }

Java 中的线程池相关在 java.util.concurrent 包中 ,核心类有:
Executor、ExecutorService、ThreadPoolExecutor
image.pngimage.png

如何定义最大线程

  • CPU 密集型任务:服务器有有X核,最大线程就是X。
    • Runtime.getRuntime().availableProcessors()

  • IO 密集型任务:也就是大部分状况是 CPU 在等 IO,此时最大线程可以设置为 IO 任务数*2

线程池工作流程

  • 如果正在运行的线程数量少于 corePoolSize ,线程池会立刻创建线程并执行该任务。
  • 如果正在运行的线程数量大于等于 corePoolSize,该任务就会被放入阻塞队列中。
  • 在阻塞队列已满且正在运行的线程数量少于 maximumPoolSize 时,线程池会创建非核心线程,立刻执行该线程任务。
  • 在阻塞队列已满且正在运行的线程数量大于等于 maximumPoolSize 时,线程池会拒绝执行该线程任务,并抛出 RejectExecutionException 异常。
  • 在线程任务执行完毕后,线程池队列会将该任务移除,然后线程池从队列中取下一个线程任务执行。
  • 线程处于空闲状态的时间超过 keepAliveTime 时,如果正在运行的线程数超过 corePoolSize,该线程会被认定为空闲线程并停止。因此,在线程任务都执行完毕后,线程池会收缩到 corePoolSize 大小。

image.png

线程池四种拒绝策略

Java 线程池默认的拒绝策略是 AbortPolicy,直接抛出异常,阻止线程正常运行

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

Java 有4中内置拒绝策略

AbortPolicy

直接抛出异常,阻止线程正常运行

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

CallerRunsPolicy

不使用线程池,任务交给调用方线程,例如 main 线程执行

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

DiscardPolicy

直接丢弃任务,不做其他处理

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }

DiscardOldestPolicy

丢弃队列中等待时间最久的任务,然后重新提交当前任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

线程池异常处理

try-catch 捕获异常

submit 执行,Future.get 接收异常

重写 ThreadPoolExecutor.afterExecute,处理传递的异常引用

自定义 ThreadFactory,设置 Thread.UncaughtExceptionHandler 处理未检测的异常

5种线程池(要使用ThreadPoolExecutor创建线程池)

Java 定义了 Executor 接口并在该接口中定义了 execute() 用于执行一个线程任务,然后通过 ExecutorService 实现 Executor 接口并执行具体的线程操作。
ExecutorService 有多个实现类,可以创建不同的线程池:

名称 说明
newCachedThreadPool 可缓存的线程池,可伸缩
newFixedThreadPool 固定大小的线程池
newScheduledThreadPool 可做任务调度的线程池
newSingleThreadExecutor 单个线程的线程池
newWorkStealingPool 足够大小的线程池(1.8新增)

newCachedThreadPool

缓存线程池
创建线程时如果有可重用的线程,则重用它们,否则重新创建一个新的线程并将其添加到线程池中。
适合执行时间很短的大量任务需求。
在线程池的 keepAliveTime 时间超过默认的 60s 后,该线程会被终止,并从缓存中移除,因此,在没有线程任务运行时,newCachedThreadPool 将不会占用系统的线程资源。

newFixedThreadPool

固定大小的线程池,并将线程资源存放在队列中循环使用。
若处于活动状态的线程数量大于等于核心线程池的数量,则新提交的任务将在阻塞队列中排队,直到有可用的线程资源。

newScheduledThreadPool

可定时调度的线程池。
可设置在给定的延迟时间后或者定期执行某个线程任务

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
// 创建一个延迟 3s 执行的线程
scheduledExecutorService.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("delay 3s execu.");
    }
}, 3, TimeUnit.SECONDS);

// 创建一个延迟 1s 执行且每 3s 执行一次的线程
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println("delay 1s, repeat execute every 3s");
    }
}, 1, 3, TimeUnit.SECONDS);

输出:
delay 1s, repeat execute every 3s
delay 3s execu.
delay 1s, repeat execute every 3s
delay 1s, repeat execute every 3s
delay 1s, repeat execute every 3s
...

newSingleThreadExecutor

有且只有一个可用线程,该线程停止或一场时,会启动一个新的线程代替该线程继续执行任务。

newWorkStealingPool

创建含有足够多线程的线程池,它会通过工作窃取的方式,使多核 CPU 不会闲置。

足够多就是最大程度的使用系统资源,提高并发效率。
工作窃取指(Work Stealing):闲置的线程会去处理原本不属于它的任务。

每个处理器核,都有一个任务队列,当一个核对应的任务处理完毕时,可以帮助处理其他核的队列任务。