4种线程池

Executor:Java里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

  1. newCachedThreadPool:创建一个可缓存的线程池,如果线程可用,则调用 execute 将重用以前构造的线程。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。创建的都是救急线程
    1. 缺点:SynchronousQueue 是一个不存储元素的队列,可以理解为队里永远是满的,因此会最终创建非核心线程来执行任务,对于非核心线程空闲60s时将被回收。因为Integer.MAX_VALUE非常大,可以认为是无限创建线程的,在资源有限的情况下容易一起OOM;
  2. newFixedThreadPool:创建一个固定线程数的线程池,如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。核心线程数==最大线程数,没有救急线程被创建。
    1. 缺点:LinkedBlockingQueue阻塞队列没有设置大小,默认容易打爆内存Integer.MAX_VALUE,可以无限存放任务,容易发生OOM;
  3. newScheduledThreadPool:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
  4. newSingleThreadExecutor: Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
    1. 缺点:LinkedBlockingQueue阻塞队列没有设置大小,默认容易打爆内存Integer.MAX_VALUE,可以无限存放任务,容易发生OOM;
      1. Executor executor = Executors.newCachedThreadPool();
      2. ExecutorService executorService = Executors.newFixedThreadPool(int nThreads);
      3. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(int corePoolSize);
      4. ExecutorService executorService = Executors.newSingleThreadExecutor();

      线程池状态

      ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量;
状态名 高3位 接受新任务 处理阻塞队列任务 说明
RUNING 111 Y Y
SHUTDOWN 000 N Y 不会接受新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为0即将进入终结
TERMINATED 011 - - 终结状态

ThreadPoolExecutor构造方法

  1. // ThreadPoolExecutor 类的构造方法
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. ThreadFactory threadFactory,
  8. RejectedExecutionHandler handler) {}
  • corePoolSize:核心线程数目(最多保留的线程数)
  • maximumPoolSize:最大线程数目
  • keepAliveTime:生存时间,针对救急线程
  • unit:时间单位,针对救急线程
  • workQueue:阻塞队列
  • threadFactory:线程工厂,创建线程,为线程起名字
  • handler:拒绝策略

    步骤

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个线程来执行任务。
  2. 当前线程数达到 corePoolSize 并且没有线程空闲,这时再加入任务,新加的任务被加入 workQueue 队列排队,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  4. 如果线程达到 maximumPoolSize仍有新任务,这时会执行拒绝策略。JDK 提供了4种拒绝策略,4种拒绝策略实现接口 RejectedExecutionHandler
    1. AbortPolicy:让调用者抛出 RejectedExecutionException 异常,也是默认策略
    2. CallerRunsPolicy:让调用者运行任务
    3. DiscardPolicy:放弃本次任务
    4. DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
  5. 当高峰过去后,超过的corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 控制

    提交任务给线程池

    1. ExecutorService executorService = Executors.newFixedThreadPool(10);
    2. // 执行任务
    3. void execute(Runnable command);
    4. // 提交任务,用返回值 Future 获取任务执行结果
    5. <T> Future<T> submit(Callable<T> task);
    6. <T> Future<T> submit(Runnable task, T result);
    7. Future<?> submit(Runnable task);
    8. // 提交 task 中所有任务
    9. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
    10. // 提交 task 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
    11. executorService.invokeAny();

    shutdown 方法

    shutdown 将线程池状态变为 SHUTDOWN 状态
  • 不会接受新任务
  • 但已提交任务会执行完成
  • 此方法不会阻塞调用线程的执行

    创建多少线程池合适

    注意点
    a. 过小会导致程序不能充分的利用系统资源,容易导致饥饿
    b. 过大会导致更多的线程上下文切换,占用更多内存

    CPU密集型运算服务器

    通常采用 CPU核数+1 能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或者其他原因导致暂停时,额外的整个线程就能顶上去,保证CPU时钟周期不被浪费。

    I/O密集型运算服务器

    CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当执行IO操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。
    经营公式如下:
    线程数= 核数 * 期望CPU利用率 * 总时间(CPU计算时间+等待时间)/ CPU计算时间

    任务调度线程池

    在【任务调度线程池】功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或者异常都讲会影响之后的任务。

    Timer 简单示例

    ```java package top.simba1949;

import java.util.Timer; import java.util.TimerTask;

/**

  • @author Anthony */ public class Application {

    public static void main(String[] args) {

    1. Timer timer = new Timer();
    2. TimerTask timerTask = new TimerTask() {
    3. @Override
    4. public void run() {
    5. // 处理调度逻辑
    6. System.out.println(System.currentTimeMillis());
    7. }
    8. };
    9. // 延迟1000毫秒后执行
    10. System.out.println(System.currentTimeMillis());
    11. timer.schedule(timerTask, 1000);

    } } ```

    ScheduledExecutorService 简单示例

    延迟执行

    ```java package top.simba1949;

import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;

/**

  • @author Anthony */ public class Application {

    public static void main(String[] args) {

    1. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    2. System.out.println(System.currentTimeMillis());
    3. scheduledExecutorService.schedule(() -> {
    4. // 任务调度业务逻辑
    5. System.out.println(System.currentTimeMillis());
    6. }, 1, TimeUnit.SECONDS); // 时间延迟单位和延迟时间数

    } } ```

    定时执行

    ```java package top.simba1949;

import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;

/**

  • @author Anthony */ public class Application {

    public static void main(String[] args) {

    1. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    2. System.out.println(System.currentTimeMillis());
    3. // 定时执行任务
    4. scheduledExecutorService.scheduleAtFixedRate(() -> {
    5. // 任务调度处理逻辑
    6. System.out.println(System.currentTimeMillis());
    7. }, 1, 2, TimeUnit.SECONDS); // 1表示延迟时间数,2表示每间隔时间数,时间单位

    } } ```

    Fork/Join

    概念

    Fork/Join 是 JDK1.7 加入的新的线程池实现,它体现的是以种分治四项,适用于能够进行任务拆分的 CPU 密集型运算。
    所谓的任务拆分,是将一个大任务拆分为算法相同的小任务,直至不能拆分可以直接求解。
    Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
    Fork/Join 默认会创建与 CPU 核心数大小相同的线程池。

    使用

    提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或者 RecursiveAction(无返回值)

    demo

    ```java package top.simba1949;

import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask;

/**

  • @author Anthony
  • @date 2020/11/1 9:00 */ public class TaskApp1 { public static void main(String[] args) {
    1. // 创建Fork/Join线程池
    2. ForkJoinPool forkJoinPool = new ForkJoinPool();
    3. // 打印最终结果
    4. System.out.println(forkJoinPool.invoke(new MyTask(5)));
    } }

/**

  • 计算 1-n 之间的整数之和 */ class MyTask extends RecursiveTask{

    private int val;

    public MyTask(int val) {

    1. this.val = val;

    }

    /**

    • 做任务拆分
    • @return */ @Override protected Integer compute() { if (val == 1){

      1. return val;

      } MyTask t1 = new MyTask(val - 1); t1.fork(); // 让一个线程去执行任务

      return val + t1.join(); } } ```