1、概述

使用线程池,是Java中创建多线程的一种方式,在实际工作中,我们通常都是使用线程池来使用多线程。线程池控制运行的线程数量,处理过程中,将任务放入队列,然后在线程创建后启动这些任务,如果线程量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
线程池涉及到如下类或接口:
image.png

1.1 Executor

Execuotr是JDK1.5中,随着JUC引入的一个接口,该接口的主要目的是解耦任务本身和任务的执行,其定义如下:

  1. public interface Executor {
  2. /**
  3. * Executes the given command at some time in the future. The command
  4. * may execute in a new thread, in a pooled thread, or in the calling
  5. * thread, at the discretion of the {@code Executor} implementation.
  6. 在将来的某个时间执行给定的命令,根据Executor实现的判断,改命令就可以在新线程、
  7. 线程池或调用线程中执行
  8. *
  9. * 执行给定的Runnable任务.
  10. * 根据Executor的实现不同, 具体执行方式也不相同
  11. *
  12. * @param command the runnable task
  13. * @throws RejectedExecutionException if this task cannot be
  14. * accepted for execution
  15. * @throws NullPointerException if command is null
  16. */
  17. void execute(Runnable command);
  18. }

1.2 ExecutorService

ExecutorService也是一个接口,它继承了Executor接口,在Executor的基础上,增加了一些功能,比如增强了对任务的控制,同时包括对自身(这个自身,就是线程池自身)生命周期的管理:

  • 关闭执行器,禁止任务的提交;
  • 监视执行器的状态;
  • 提供对异步任务的支持;
  • 提供对批处理任务的支持;

其定义如下:

  1. public interface ExecutorService extends Executor {
  2. /**
  3. * 关闭执行器, 主要有以下特点:
  4. * 1. 已经提交给该执行器的任务将会继续执行, 但是不再接受新任务的提交;
  5. * 2. 如果执行器已经关闭了, 则再次调用没有副作用.
  6. */
  7. void shutdown();
  8. /**
  9. * 立即关闭执行器, 主要有以下特点:
  10. * 1. 尝试停止所有正在执行的任务, 无法保证能够停止成功, 但会尽力尝试(例如, 通过 Thread.interrupt中断任务, 但是不响应中断的任务可能无法终止);
  11. * 2. 暂停处理已经提交但未执行的任务;
  12. *
  13. * @return 返回已经提交但未执行的任务列表
  14. */
  15. List<Runnable> shutdownNow();
  16. /**
  17. * 如果该执行器已经关闭, 则返回true.
  18. */
  19. boolean isShutdown();
  20. /**
  21. * 判断执行器是否已经【终止】.
  22. * <p>
  23. * 仅当执行器已关闭且所有任务都已经执行完成, 才返回true.
  24. * 注意: 除非首先调用 shutdown 或 shutdownNow, 否则该方法永远返回false.
  25. */
  26. boolean isTerminated();
  27. /**
  28. * 阻塞调用线程, 等待执行器到达【终止】状态.
  29. *
  30. * @return {@code true} 如果执行器最终到达终止状态, 则返回true; 否则返回false
  31. * @throws InterruptedException if interrupted while waiting
  32. */
  33. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  34. /**
  35. * 提交一个具有返回值的任务用于执行.
  36. * 注意: Future的get方法在成功完成时将会返回task的返回值.
  37. *
  38. * @param task 待提交的任务
  39. * @param <T> 任务的返回值类型
  40. * @return 返回该任务的Future对象
  41. * @throws RejectedExecutionException 如果任务无法安排执行
  42. * @throws NullPointerException if the task is null
  43. */
  44. <T> Future<T> submit(Callable<T> task);
  45. /**
  46. * 提交一个 Runnable 任务用于执行.
  47. * 注意: Future的get方法在成功完成时将会返回给定的结果(入参时指定).
  48. *
  49. * @param task 待提交的任务
  50. * @param result 返回的结果
  51. * @param <T> 返回的结果类型
  52. * @return 返回该任务的Future对象
  53. * @throws RejectedExecutionException 如果任务无法安排执行
  54. * @throws NullPointerException if the task is null
  55. */
  56. <T> Future<T> submit(Runnable task, T result);
  57. /**
  58. * 提交一个 Runnable 任务用于执行.
  59. * 注意: Future的get方法在成功完成时将会返回null.
  60. *
  61. * @param task 待提交的任务
  62. * @return 返回该任务的Future对象
  63. * @throws RejectedExecutionException 如果任务无法安排执行
  64. * @throws NullPointerException if the task is null
  65. */
  66. Future<?> submit(Runnable task);
  67. /**
  68. * 执行给定集合中的所有任务, 当所有任务都执行完成后, 返回保持任务状态和结果的 Future 列表.
  69. * <p>
  70. * 注意: 该方法为同步方法. 返回列表中的所有元素的Future.isDone() 为 true.
  71. *
  72. * @param tasks 任务集合
  73. * @param <T> 任务的返回结果类型
  74. * @return 任务的Future对象列表,列表顺序与集合中的迭代器所生成的顺序相同,
  75. * @throws InterruptedException 如果等待时发生中断, 会将所有未完成的任务取消.
  76. * @throws NullPointerException 任一任务为 null
  77. * @throws RejectedExecutionException 如果任一任务无法安排执行
  78. */
  79. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  80. /**
  81. * 执行给定集合中的所有任务, 当所有任务都执行完成后或超时期满时(无论哪个首先发生), 返回保持任务状态和结果的 Future 列表.
  82. */
  83. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
  84. /**
  85. * 执行给定集合中的任务, 只有其中某个任务率先成功完成(未抛出异常), 则返回其结果.
  86. * 一旦正常或异常返回后, 则取消尚未完成的任务.
  87. */
  88. <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
  89. /**
  90. * 执行给定集合中的任务, 如果在给定的超时期满前, 某个任务已成功完成(未抛出异常), 则返回其结果.
  91. * 一旦正常或异常返回后, 则取消尚未完成的任务.
  92. */
  93. <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
  94. throws InterruptedException, ExecutionException, TimeoutException;
  95. }

1.3 AbstractExecuotrService抽象类

AbstractExecuotrService是一个抽象类,它提供了对ExecutorService的默认实现。

1.4 ThreadPoolExecuotr类

ThreadPoolExecuotr继承了AbstractExecutorService,是最常用的线程池,我们正常使用线程池时,也是通过调用ThreadPoolExecuotr的构造方法,传入参数,来创建线程池。
这里传入的参数就是线程池的七大参数,后面具体描述。

比如:

package com.atguigu.gulimall.product.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 本类说明:
 * 线程池配置类
 * @author yuanhai
 * @date 2022年04月14日
 */
// 如果没有把ThreadPoolConfigProperties通过@Component注解加到容器中,就要通过@EnableConfigurationProperties注解开启这个类的配置
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor (ThreadPoolConfigProperties pool) {
        return new ThreadPoolExecutor(pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }

}

或者这样:

public class MyThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService threadPool =
                new ThreadPoolExecutor(2,
                            5,
                            2L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<>(3),
                            Executors.defaultThreadFactory(),
                            new ThreadPoolExecutor.AbortPolicy());
    }

1.5 ScheduledExecutorService

此接口继承于ExecutorService,提供了一系列schedule方法,可以在给定的延迟后执行提交的任务,或者每个指定的周期执行一次提交的任务。
定义如下:

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 提交一个待执行的任务, 并在给定的延迟后执行该任务.
     *
     * @param command 待执行的任务
     * @param delay   延迟时间
     * @param unit    延迟时间的单位
     */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * 提交一个待执行的任务(具有返回值), 并在给定的延迟后执行该任务.
     *
     * @param command 待执行的任务
     * @param delay   延迟时间
     * @param unit    延迟时间的单位
     * @param <V>     返回值类型
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * 提交一个待执行的任务.
     * 该任务在 initialDelay 后开始执行, 然后在 initialDelay+period 后执行, 接着在 initialDelay + 2 * period 后执行, 依此类推.
     *
     * @param command      待执行的任务
     * @param initialDelay 首次执行的延迟时间
     * @param period       连续执行之间的周期
     * @param unit         延迟时间的单位
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    /**
     * 提交一个待执行的任务.
     * 该任务在 initialDelay 后开始执行, 随后在每一次执行终止和下一次执行开始之间都存在给定的延迟.
     * 如果任务的任一执行遇到异常, 就会取消后续执行. 否则, 只能通过执行程序的取消或终止方法来终止该任务.
     *
     * @param command      待执行的任务
     * @param initialDelay 首次执行的延迟时间
     * @param delay        一次执行终止和下一次执行开始之间的延迟
     * @param unit         延迟时间的单位
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

2、线程池的特点和优势

线程池的特点:

  • 线程复用;
  • 控制最大并发数;
  • 管理线程;

线程池的优势:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性。线程时稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

3 如何使创建线程池

创建线程池,主要有两种方式:

  • 使用jdk提供的Executors工具类;
  • 使用构造方法,直接new ThreadPoolExecutor(),传入线程池参数;

实际工作中,建议自己使用构造方法,传参去自己创建线程池,因为使用Executors工具类创建出的线程池有这样那样的缺陷。

3.1 Executors工具类

Executors可以有选择的按照下面方式创建出不同类型的线程池:

// 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);  // 一池五个受理线程

// 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();  // 一池只有一个受理线程

// 创建一个可缓存的线程池,如果线程池长度超过需要处理的任务,可灵活回收空闲;若无可回收,则新建线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();  // 一池n个线程,可扩容、伸缩

// 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行
ExecutorService threadPool4 = Executors.newScheduledThreadPool(5);

这里面,他们都是有各自的缺陷的,实际工作中,我们创建线程池,会使用第二种方式,下面讲到。Executors工具类创建的线程池有什么缺陷的呢(缺陷涉及到线程池中的参数,线程池的参数在下面篇幅中叙述)?

  • 对于FixedThreadPool和SingleThreadPool:

它们允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;

  • 对于CacheThreadPool和ScheduleThreadPool:

它们允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;
-> OOM:即OutOfMemory,内存溢出错误。

3.2 new ThreadPoolExecutor()传入参数

    ExecutorService threadPool =
                new ThreadPoolExecutor(2,
                            5,
                            2L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<>(3),
                            Executors.defaultThreadFactory(),
                            new ThreadPoolExecutor.AbortPolicy());

其中传入的参数后面描述。

4、线程池的七大参数

先来看下创建线程池的构造方法和一个例子:

      public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler){
                                   ...
                                }
public class MyThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService threadPool =
                new ThreadPoolExecutor(2,
                            5,
                            2L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<>(3),
                            Executors.defaultThreadFactory(),
                            new ThreadPoolExecutor.AbortPolicy());

    }
}

这里面就涉及到线程池的七大参数:

  • int corePoolSize:核心线程数;线程池中的常驻核心线程数。
  • int maximunPoolSize:最大线程数;线程池中能够容纳同时执行的最大线程数,此值必须大于等于1。
  • long keepAliveTime:多余的空闲线程的存活时间;当前线程池中线程数超过corePoolSize时,当空闲时间达到keepAliveTime时,多余的线程会被销毁,直到只剩下corePoolSize个线程位置。
  • TimeUnit unit:keepAliveTime的时间单位。
  • BlockingQUeue workQueue:阻塞队列(任务队列);里面是被提交但是尚未被执行的任务。阻塞队列用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,就会放在这里等待空闲线程执行。
  • ThreadFactory threadFactory:线程的创建工厂;用来创建线程,一般使用默认的即可。
  • RejectedExecutHandler handler:拒绝策略;当队列满了,并且工作线程大于等于线程池的最大线程数(maximunPoolSize)时,线程池就会使用决绝策略,按照指定的拒绝策略拒绝执行任务。

其中,一些阻塞队列:

拒绝策略:

  • AbortPolicy:默认的拒绝策略,直接抛出RejectedExecutionException异常组织系统正常运行。
  • CallerRunsPolicy:调用者运行;一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  • DiscardPolicy:该策略默默的丢弃无法处理的任务,不予任何处理,也不抛出异常。如果任务允许丢失,这是最好的一种策略。
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入到队列中,尝试再次提交当前任务。

5、线程池底层工作原理(工作流程)

  1. 线程池创建后,准备好核心线程数量的核心线程,准备接受任务;
  2. 新的任务进来(调用executor()方法添加一个请求任务时),用核心线程来执行任务;
    1. 如果核心线程满了,就将再进来的任务放入阻塞队列中;
    2. 如果核心线程有空闲考虑,就会自己去阻塞队列获取任务执行;
    3. 阻塞队列如果满了,就直接开新线程执行,但是新县城最大只能开到最大线程数量maximumPoolSize指定的熟练;
    4. 如果最大线程数量满了,就用RejectedExecutionHandler拒绝任务;
    5. 如果最大线程数量没满,都执行完成,有很多空闲了,则在指定的存活时间以后,释放maximumPoolSiize-corePoolSize这些线程。

一些细节:

  • 阻塞队列我们可以选用new LinkedBlockingDeque();不过它默认保存的任务数量是Integer的最大值,这可能会导致内存不够,所以我们传入我们业务定制的数量,比如压力测试的结果,比如:

new LikedBlockingDeque(1000);

  • 线程工厂可以使用Executors.defaultThreadFactory();
  • 拒绝策略可以使用new ThreadPoolExecutor.AbortPolicy();

6、线程池提交任务(执行线程)的方式

线程池提交任务有两个方法:execute()和submit();
execute(Runnable command);
submit(Runable task);
submit(Callable task);
例如:

public static ExecutorService service = Executors.newFixedThreadPool(10);
// execute()没有返回值;
service.execute(new Runnable01());  
// submit()可以获取返回值;
Future<Integer> integerFuture = service.submit(new Callable01());

execute()和submit()的区别:

  • execute()方法没有返回值,用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功。
  • submit()有返回值,用于提交需要返回值的任务。线程池会返回一个Future类型的对象,可以通过Future对象判断任务是否执行成功,并且可以通过Future的get()来获取返回值。get()方法会阻塞当前线程直到任务完成。而get(long timeout,TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完成。

7、生产中如何使用线程池

线程池用哪个?生产中如何设置合理的参数(如何自定义线程池)?
Executors工具类创建的,我们工作中,都不使用,只使用自己自定义的;
因为:FixedThreadPool和SingleThreadPool:
它们允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
CachedThreadPool和ScheduledThreadPool:
它们允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;
-> OOM,即OutOfMemoryError,内存溢出错误


生产中如何设置合理的参数:
最大线程数的配置:
如果业务是CPU密集型:
Runtime.getRuntime().availableProcessors(); 此方法用来获取可用的计算资源,也可简单理解为获取电脑cpu的逻辑处理器个
数(即内核数);设置线程池最大线程数maximumPoolSize时,就用这个方法动态获取逻辑处理器个数,然后在获取到的值的基础上+1或+2;
如果业务是IO密集型:
则设置为 Runtime.getRuntime().availableProcessors() / 阻塞系数
* 其他的百度吧,视频里没讲;