1. 线程池基础介绍
1.1 什么是”池”?
1.2 线程池适合应用的场合
- 服务器接收到大量的请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率.
- 实际上,开发中,如果需要创建5个线程以上,那么就可以使用线程池来管理了.
2. 参数说明
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
| 参数 | 说明 |
|---|---|
| corePoolSize | 核心线程数量,线程池维护线程的最少数量 |
| maximumPoolSize | 线程池维护线程的最大数量 |
| keepAliveTime | 线程池除核心线程外的其他线程的最长空闲时间,超过该时间的空闲线程会被销毁 |
| unit | keepAliveTime的单位,TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS |
| workQueue | 线程池所使用的任务缓冲队列 |
| threadFactory | 线程工厂,用于创建线程,一般用默认的即可 |
| handler | 线程池对拒绝任务的处理策略 |
2.1 corePoolSize
核心线程数;线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务。
2.2 maximumPoolSize
线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这个最大值就是 maxinumPoolSize
2.3 keepAliveTime
如果线程池当前的线程数 大于 corePoolSize, 那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止。
2.3.1 allowCoreThreadTimeOut
这个设置就是说 如果设置为true ,核心线程数 corePoolSize 超过keepAliveTime 空闲也会被回收。
/*** If false (default), core threads stay alive even when idle.* If true, core threads use keepAliveTime to time out waiting* for work.*/private volatile boolean allowCoreThreadTimeOut;
2.4 unit
2.5 workQueue
2.5.1 三种常见的队列类型
2.5.1.1 SynchronousQueue 直接交接
2.5.1.2 LinkedBlockingQueue 无界队列
2.5.1.3 ArrayBlockingQueue 有界队列
2.6 threadFactory
- 新的线程是由 ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程:
【1】 在同一个线程组
【2】 拥有同样的NORM_PRIORITY优先级并且都不是守护线程。
【3】如果是自己指定的ThreadFactory ,那么就可以改变线程名,线程组,优先级,是否是守护线程等等。
2.7 handler 拒绝策略
2.7.1 AbortPolicy 中止策略
/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
特性
A handler for rejected tasks that throws a RejectedExecutionException.只抛出异常,其余啥也不做. 抛弃异常
-
2.7.1.1 例子说明
public class RunnableStyle {static int corePoolSize = 1;static int maximumPoolSize = 2;static int keepAliveTime = 2;static AtomicInteger count=new AtomicInteger();public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < 100; i++) {executor.execute(new MyRunnable());}executor.shutdown();}static class MyRunnable implements Runnable {@Overridepublic void run() {try {System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());Thread.sleep(3000L);System.out.println("线程:" + Thread.currentThread() + "执行完了任务");} catch (InterruptedException e) {e.printStackTrace();}}}}
```java Exception in thread “main” java.util.concurrent.RejectedExecutionException 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:1 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:2 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:3 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:4 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]执行完了任务
这个线程池 最大线程数量为2 ,队列长度为2 ,也就是最大能接受四个任务,其余任务直接抛弃,并抛出异常。
<a name="vxGcp"></a>### 2.7.2 DiscardPolicy 丢弃策略```java/*** A handler for rejected tasks that silently discards the* rejected task.*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
特性
拒绝任务的处理程序,以静默方式丢弃被拒绝的任务.
2.7.2.1 举例说明
public class RunnableStyle {static int corePoolSize = 1;static int maximumPoolSize = 2;static int keepAliveTime = 2;static AtomicInteger count=new AtomicInteger();public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.DiscardPolicy());for (int i = 0; i < 100; i++) {executor.execute(new MyRunnable());}executor.shutdown();}static class MyRunnable implements Runnable {@Overridepublic void run() {try {System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());Thread.sleep(3000L);System.out.println("线程:" + Thread.currentThread() + "执行完了任务");} catch (InterruptedException e) {e.printStackTrace();}}}}
```java 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:2 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:1 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:3 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:4 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]执行完了任务
这个线程池 最大线程数量为2 ,队列长度为2 ,也就是最大能接受四个任务,其余任务直接抛弃,不抛出异常。
<a name="7pMoG"></a>### 2.7.3 CallerRunsPolicy<a name="a7tqq"></a>#### 2.7.3.1 拒绝策略代码```cpppublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}将使用当前线程 执行 该任务,一般为主线程,主线程阻塞,新增的任务 阻塞在 计算机内存中。
/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 在调用者的线程中(可以理解为 主线程)执行任务,除非** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
2.7.3.1 举例说明
public class RunnableStyle {static int corePoolSize = 1;static int maximumPoolSize = 2;static int keepAliveTime = 2;static AtomicInteger count=new AtomicInteger();public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 100; i++) {executor.execute(new MyRunnable());}executor.shutdown();}static class MyRunnable implements Runnable {@Overridepublic void run() {try {System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());Thread.sleep(3000L);System.out.println("线程:" + Thread.currentThread() + "执行完了任务");} catch (InterruptedException e) {e.printStackTrace();}}}}
线程不够用时,主线程也会参与执行任务,所以导致的问题就是 阻塞主线程;
2.7.4 DiscardOldestPolicy
2.7.4.1 拒绝策略代码
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}获取任务队列,将头部任务移除掉,将新任务加入队列中。
package com.zhangyong.multiThread.multiThread.createThreads;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/*** <p>Description: </p>* <p>Company: http://www.dmall.com</p>** @author yong.zhang@dmall.com* @version 1.0.0* @date 2020/12/17 20:18*/public class RunnableStyle {static int corePoolSize = 1;static int maximumPoolSize = 3;static int keepAliveTime = 2;static AtomicInteger count=new AtomicInteger();public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 0; i < 100; i++) {executor.execute(new MyRunnable());}executor.shutdown();}static class MyRunnable implements Runnable {@Overridepublic void run() {try {System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());Thread.sleep(3000L);System.out.println("线程:" + Thread.currentThread() + "执行完了任务");} catch (InterruptedException e) {e.printStackTrace();}}}}
线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:2线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:1线程:Thread[pool-1-thread-3,5,main]开始执行任务ID:3线程:Thread[pool-1-thread-2,5,main]执行完了任务线程:Thread[pool-1-thread-1,5,main]执行完了任务线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:4线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:5线程:Thread[pool-1-thread-3,5,main]执行完了任务线程:Thread[pool-1-thread-2,5,main]执行完了任务线程:Thread[pool-1-thread-1,5,main]执行完了任务;新的任务加入到线程中后,如果没有被立刻执行的话,
3.线程池任务执行
3.1 添加线程规则

- 1. 如果线程数小于corePoolSize , 除非设置了allowCoreThreadTimeOut , 即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。
- 2. 如果线程数>= corePoolSize 但少于 maximumPoolSize ,则将任务放入队列。
- 3. 如果队列已满,并且线程数小于maxPoolSize, 则创建一个新线程来运行任务。
[x] 4. 如果队列已满,并且线程数大于maxPoolSize,则使用设置的拒绝策略拒绝该任务。
3.2 增减线程的特点
[x] 1. 通过设置corePoolSize 和 maximumPoolSize 相同,就可以创建固定大小的线程池。
- 2. 线程池希望的是 永远保持较少的线程数,并且只有在负载变得很大的时候才去增大它。
- 3. 通过设置maximumPoolSize 为很高的值,例如Integer.MAX_VALUE , 可以允许线程池容纳任意数量的并发任务。但是实际上硬件上的限制不会出现这种情况
- 4. 只有在队列填满时才创建多余corePoolSize 的线程,所以如果你使用的是无界队列(LinkedBlockingQueue) , 那么线程数就不会超过corePoolSize;
4. 线程池创建的方式
4.1 Executors 自动创建线程池
4.1.1 Executors.newFixedThreadPool()
4.1.1.1 构造参数
```java public static ExecutorService newFixedThreadPool(int nThreads) {
}return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
corePoolSize 和 maxPoolSize 相同, 那么keepAliveTime 这个参数就无意义了,因为他不考虑回收多余corePoolSize的线程
<a name="1WTkU"></a>#### 4.1.1.2 原理```javapublic class FixedThreadPoolOOM {public static void main(String[] args) {ExecutorService ex = Executors.newFixedThreadPool(1);for (int i=0;i<Integer.MAX_VALUE;++i){ex.execute(new Task());}}}class Task implements Runnable {@Overridepublic void run() {try {Thread.sleep(100000000000L);} catch (InterruptedException e) {e.printStackTrace();}}}
- 由于LinkedBlockingQueue 是没有容量上限的,所以当请求越来越多的时候,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,最终导致OOM.
4.1.2 Executors.newSingleThreadExecutor()
4.1.2.1 构造参数
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}corePoolSize 和 maxPoolSize 相同, 那么keepAliveTime 这个参数就无意义了,因为他不考虑回收多余 corePoolSize的线程;
4.1.2.2 原理
缺点和 newFixedThreadPool 一样.
4.1.3 Executors.newCachedThreadPool();
4.1.3.1 构造参数
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}corePoolSize 为0maxPoolSize 为Int的最大值;可回收线程;队列采用的是Sync Queue ,它一种阻塞队列,其中每个 put 必须等待一个 take.
4.1.3.2 原理
maxPoolSize设置为Integer.MAX_VALUE, 这可能会创建数量很多的线程,甚至导致OOM.
4.1.4 Executors.newScheduledThreadPool()
定时执行
4.1.4.1 构造参数
4.1.4.2 原理
4.2 手动创建线程池
4.2.1 为啥要手动创建
4.2.2 手动创建
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}自定义输入各参数
/*** 用于异步的提交连接请求的线程池*/private ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1 << 16));
4.2.3 设置线程池中线程数量的策略

POP 订单批量导入导出中,消耗内存。
配置 coolPoolSize 公式:CPU核数 / (1- 阻塞系数) ---> 阻塞系数在0.8 ~ 0.9之间
例如:8核CPU: 8/1-0.9 = 80个线程数
