1、概述
使用线程池,是Java中创建多线程的一种方式,在实际工作中,我们通常都是使用线程池来使用多线程。线程池控制运行的线程数量,处理过程中,将任务放入队列,然后在线程创建后启动这些任务,如果线程量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
线程池涉及到如下类或接口:
1.1 Executor
Execuotr是JDK1.5中,随着JUC引入的一个接口,该接口的主要目的是解耦任务本身和任务的执行,其定义如下:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
在将来的某个时间执行给定的命令,根据Executor实现的判断,改命令就可以在新线程、
线程池或调用线程中执行
*
* 执行给定的Runnable任务.
* 根据Executor的实现不同, 具体执行方式也不相同
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
1.2 ExecutorService
ExecutorService也是一个接口,它继承了Executor接口,在Executor的基础上,增加了一些功能,比如增强了对任务的控制,同时包括对自身(这个自身,就是线程池自身)生命周期的管理:
- 关闭执行器,禁止任务的提交;
- 监视执行器的状态;
- 提供对异步任务的支持;
- 提供对批处理任务的支持;
其定义如下:
public interface ExecutorService extends Executor {
/**
* 关闭执行器, 主要有以下特点:
* 1. 已经提交给该执行器的任务将会继续执行, 但是不再接受新任务的提交;
* 2. 如果执行器已经关闭了, 则再次调用没有副作用.
*/
void shutdown();
/**
* 立即关闭执行器, 主要有以下特点:
* 1. 尝试停止所有正在执行的任务, 无法保证能够停止成功, 但会尽力尝试(例如, 通过 Thread.interrupt中断任务, 但是不响应中断的任务可能无法终止);
* 2. 暂停处理已经提交但未执行的任务;
*
* @return 返回已经提交但未执行的任务列表
*/
List<Runnable> shutdownNow();
/**
* 如果该执行器已经关闭, 则返回true.
*/
boolean isShutdown();
/**
* 判断执行器是否已经【终止】.
* <p>
* 仅当执行器已关闭且所有任务都已经执行完成, 才返回true.
* 注意: 除非首先调用 shutdown 或 shutdownNow, 否则该方法永远返回false.
*/
boolean isTerminated();
/**
* 阻塞调用线程, 等待执行器到达【终止】状态.
*
* @return {@code true} 如果执行器最终到达终止状态, 则返回true; 否则返回false
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 提交一个具有返回值的任务用于执行.
* 注意: Future的get方法在成功完成时将会返回task的返回值.
*
* @param task 待提交的任务
* @param <T> 任务的返回值类型
* @return 返回该任务的Future对象
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个 Runnable 任务用于执行.
* 注意: Future的get方法在成功完成时将会返回给定的结果(入参时指定).
*
* @param task 待提交的任务
* @param result 返回的结果
* @param <T> 返回的结果类型
* @return 返回该任务的Future对象
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个 Runnable 任务用于执行.
* 注意: Future的get方法在成功完成时将会返回null.
*
* @param task 待提交的任务
* @return 返回该任务的Future对象
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* 执行给定集合中的所有任务, 当所有任务都执行完成后, 返回保持任务状态和结果的 Future 列表.
* <p>
* 注意: 该方法为同步方法. 返回列表中的所有元素的Future.isDone() 为 true.
*
* @param tasks 任务集合
* @param <T> 任务的返回结果类型
* @return 任务的Future对象列表,列表顺序与集合中的迭代器所生成的顺序相同,
* @throws InterruptedException 如果等待时发生中断, 会将所有未完成的任务取消.
* @throws NullPointerException 任一任务为 null
* @throws RejectedExecutionException 如果任一任务无法安排执行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
/**
* 执行给定集合中的所有任务, 当所有任务都执行完成后或超时期满时(无论哪个首先发生), 返回保持任务状态和结果的 Future 列表.
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
/**
* 执行给定集合中的任务, 只有其中某个任务率先成功完成(未抛出异常), 则返回其结果.
* 一旦正常或异常返回后, 则取消尚未完成的任务.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/**
* 执行给定集合中的任务, 如果在给定的超时期满前, 某个任务已成功完成(未抛出异常), 则返回其结果.
* 一旦正常或异常返回后, 则取消尚未完成的任务.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1.3 AbstractExecuotrService抽象类
AbstractExecuotrService是一个抽象类,它提供了对ExecutorService的默认实现。
1.4 ThreadPoolExecuotr类
ThreadPoolExecuotr继承了AbstractExecutorService,是最常用的线程池,我们正常使用线程池时,也是通过调用ThreadPoolExecuotr的构造方法,传入参数,来创建线程池。
这里传入的参数就是线程池的七大参数,后面具体描述。
比如:
package com.atguigu.gulimall.product.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 本类说明:
* 线程池配置类
* @author yuanhai
* @date 2022年04月14日
*/
// 如果没有把ThreadPoolConfigProperties通过@Component注解加到容器中,就要通过@EnableConfigurationProperties注解开启这个类的配置
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor (ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
或者这样:
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool =
new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
1.5 ScheduledExecutorService
此接口继承于ExecutorService,提供了一系列schedule方法,可以在给定的延迟后执行提交的任务,或者每个指定的周期执行一次提交的任务。
定义如下:
public interface ScheduledExecutorService extends ExecutorService {
/**
* 提交一个待执行的任务, 并在给定的延迟后执行该任务.
*
* @param command 待执行的任务
* @param delay 延迟时间
* @param unit 延迟时间的单位
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 提交一个待执行的任务(具有返回值), 并在给定的延迟后执行该任务.
*
* @param command 待执行的任务
* @param delay 延迟时间
* @param unit 延迟时间的单位
* @param <V> 返回值类型
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* 提交一个待执行的任务.
* 该任务在 initialDelay 后开始执行, 然后在 initialDelay+period 后执行, 接着在 initialDelay + 2 * period 后执行, 依此类推.
*
* @param command 待执行的任务
* @param initialDelay 首次执行的延迟时间
* @param period 连续执行之间的周期
* @param unit 延迟时间的单位
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* 提交一个待执行的任务.
* 该任务在 initialDelay 后开始执行, 随后在每一次执行终止和下一次执行开始之间都存在给定的延迟.
* 如果任务的任一执行遇到异常, 就会取消后续执行. 否则, 只能通过执行程序的取消或终止方法来终止该任务.
*
* @param command 待执行的任务
* @param initialDelay 首次执行的延迟时间
* @param delay 一次执行终止和下一次执行开始之间的延迟
* @param unit 延迟时间的单位
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
2、线程池的特点和优势
线程池的特点:
- 线程复用;
- 控制最大并发数;
- 管理线程;
线程池的优势:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性。线程时稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
3 如何使创建线程池
创建线程池,主要有两种方式:
- 使用jdk提供的Executors工具类;
- 使用构造方法,直接new ThreadPoolExecutor(),传入线程池参数;
实际工作中,建议自己使用构造方法,传参去自己创建线程池,因为使用Executors工具类创建出的线程池有这样那样的缺陷。
3.1 Executors工具类
Executors可以有选择的按照下面方式创建出不同类型的线程池:
// 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
ExecutorService threadPool1 = Executors.newFixedThreadPool(5); // 一池五个受理线程
// 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); // 一池只有一个受理线程
// 创建一个可缓存的线程池,如果线程池长度超过需要处理的任务,可灵活回收空闲;若无可回收,则新建线程
ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 一池n个线程,可扩容、伸缩
// 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行
ExecutorService threadPool4 = Executors.newScheduledThreadPool(5);
这里面,他们都是有各自的缺陷的,实际工作中,我们创建线程池,会使用第二种方式,下面讲到。Executors工具类创建的线程池有什么缺陷的呢(缺陷涉及到线程池中的参数,线程池的参数在下面篇幅中叙述)?
- 对于FixedThreadPool和SingleThreadPool:
它们允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
- 对于CacheThreadPool和ScheduleThreadPool:
它们允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;
-> OOM:即OutOfMemory,内存溢出错误。
3.2 new ThreadPoolExecutor()传入参数
ExecutorService threadPool =
new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
其中传入的参数后面描述。
4、线程池的七大参数
先来看下创建线程池的构造方法和一个例子:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
...
}
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool =
new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
这里面就涉及到线程池的七大参数:
- int corePoolSize:核心线程数;线程池中的常驻核心线程数。
- int maximunPoolSize:最大线程数;线程池中能够容纳同时执行的最大线程数,此值必须大于等于1。
- long keepAliveTime:多余的空闲线程的存活时间;当前线程池中线程数超过corePoolSize时,当空闲时间达到keepAliveTime时,多余的线程会被销毁,直到只剩下corePoolSize个线程位置。
- TimeUnit unit:keepAliveTime的时间单位。
- BlockingQUeue
workQueue:阻塞队列(任务队列);里面是被提交但是尚未被执行的任务。阻塞队列用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,就会放在这里等待空闲线程执行。 - ThreadFactory threadFactory:线程的创建工厂;用来创建线程,一般使用默认的即可。
- RejectedExecutHandler handler:拒绝策略;当队列满了,并且工作线程大于等于线程池的最大线程数(maximunPoolSize)时,线程池就会使用决绝策略,按照指定的拒绝策略拒绝执行任务。
其中,一些阻塞队列:
拒绝策略:
- AbortPolicy:默认的拒绝策略,直接抛出RejectedExecutionException异常组织系统正常运行。
- CallerRunsPolicy:调用者运行;一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardPolicy:该策略默默的丢弃无法处理的任务,不予任何处理,也不抛出异常。如果任务允许丢失,这是最好的一种策略。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入到队列中,尝试再次提交当前任务。
5、线程池底层工作原理(工作流程)
- 线程池创建后,准备好核心线程数量的核心线程,准备接受任务;
- 新的任务进来(调用executor()方法添加一个请求任务时),用核心线程来执行任务;
- 如果核心线程满了,就将再进来的任务放入阻塞队列中;
- 如果核心线程有空闲考虑,就会自己去阻塞队列获取任务执行;
- 阻塞队列如果满了,就直接开新线程执行,但是新县城最大只能开到最大线程数量maximumPoolSize指定的熟练;
- 如果最大线程数量满了,就用RejectedExecutionHandler拒绝任务;
- 如果最大线程数量没满,都执行完成,有很多空闲了,则在指定的存活时间以后,释放maximumPoolSiize-corePoolSize这些线程。
一些细节:
- 阻塞队列我们可以选用new LinkedBlockingDeque();不过它默认保存的任务数量是Integer的最大值,这可能会导致内存不够,所以我们传入我们业务定制的数量,比如压力测试的结果,比如:
new LikedBlockingDeque(1000);
- 线程工厂可以使用Executors.defaultThreadFactory();
- 拒绝策略可以使用new ThreadPoolExecutor.AbortPolicy();
6、线程池提交任务(执行线程)的方式
线程池提交任务有两个方法:execute()和submit();
execute(Runnable command);
submit(Runable task);
submit(Callable
例如:
public static ExecutorService service = Executors.newFixedThreadPool(10);
// execute()没有返回值;
service.execute(new Runnable01());
// submit()可以获取返回值;
Future<Integer> integerFuture = service.submit(new Callable01());
execute()和submit()的区别:
- execute()方法没有返回值,用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功。
- submit()有返回值,用于提交需要返回值的任务。线程池会返回一个Future类型的对象,可以通过Future对象判断任务是否执行成功,并且可以通过Future的get()来获取返回值。get()方法会阻塞当前线程直到任务完成。而get(long timeout,TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完成。
7、生产中如何使用线程池
线程池用哪个?生产中如何设置合理的参数(如何自定义线程池)?
Executors工具类创建的,我们工作中,都不使用,只使用自己自定义的;
因为:FixedThreadPool和SingleThreadPool:
它们允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
CachedThreadPool和ScheduledThreadPool:
它们允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;
-> OOM,即OutOfMemoryError,内存溢出错误
生产中如何设置合理的参数:
最大线程数的配置:
如果业务是CPU密集型:
Runtime.getRuntime().availableProcessors(); 此方法用来获取可用的计算资源,也可简单理解为获取电脑cpu的逻辑处理器个
数(即内核数);设置线程池最大线程数maximumPoolSize时,就用这个方法动态获取逻辑处理器个数,然后在获取到的值的基础上+1或+2;
如果业务是IO密集型:
则设置为 Runtime.getRuntime().availableProcessors() / 阻塞系数
* 其他的百度吧,视频里没讲;