1. 线程池基础介绍
1.1 什么是”池”?
1.2 线程池适合应用的场合
- 服务器接收到大量的请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率.
- 实际上,开发中,如果需要创建5个线程以上,那么就可以使用线程池来管理了.
2. 参数说明
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数 | 说明 |
---|---|
corePoolSize | 核心线程数量,线程池维护线程的最少数量 |
maximumPoolSize | 线程池维护线程的最大数量 |
keepAliveTime | 线程池除核心线程外的其他线程的最长空闲时间,超过该时间的空闲线程会被销毁 |
unit | keepAliveTime的单位,TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS |
workQueue | 线程池所使用的任务缓冲队列 |
threadFactory | 线程工厂,用于创建线程,一般用默认的即可 |
handler | 线程池对拒绝任务的处理策略 |
2.1 corePoolSize
核心线程数;
线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,
再创建新线程去执行任务。
2.2 maximumPoolSize
线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这个最大值就是 maxinumPoolSize
2.3 keepAliveTime
如果线程池当前的线程数 大于 corePoolSize, 那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止。
2.3.1 allowCoreThreadTimeOut
这个设置就是说 如果设置为true ,核心线程数 corePoolSize 超过keepAliveTime 空闲也会被回收。
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
2.4 unit
2.5 workQueue
2.5.1 三种常见的队列类型
2.5.1.1 SynchronousQueue 直接交接
2.5.1.2 LinkedBlockingQueue 无界队列
2.5.1.3 ArrayBlockingQueue 有界队列
2.6 threadFactory
- 新的线程是由 ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程:
【1】 在同一个线程组
【2】 拥有同样的NORM_PRIORITY优先级并且都不是守护线程。
【3】如果是自己指定的ThreadFactory ,那么就可以改变线程名,线程组,优先级,是否是守护线程等等。
2.7 handler 拒绝策略
2.7.1 AbortPolicy 中止策略
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
特性
A handler for rejected tasks that throws a RejectedExecutionException.
只抛出异常,其余啥也不做. 抛弃异常
-
2.7.1.1 例子说明
public class RunnableStyle {
static int corePoolSize = 1;
static int maximumPoolSize = 2;
static int keepAliveTime = 2;
static AtomicInteger count=new AtomicInteger();
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 100; i++) {
executor.execute(new MyRunnable());
}
executor.shutdown();
}
static class MyRunnable implements Runnable {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
Thread.sleep(3000L);
System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
```java Exception in thread “main” java.util.concurrent.RejectedExecutionException 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:1 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:2 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:3 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:4 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]执行完了任务
这个线程池 最大线程数量为2 ,队列长度为2 ,也就是最大能接受四个任务,其余任务直接抛弃,并抛出异常。
<a name="vxGcp"></a>
### 2.7.2 DiscardPolicy 丢弃策略
```java
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
特性
拒绝任务的处理程序,以静默方式丢弃被拒绝的任务.
2.7.2.1 举例说明
public class RunnableStyle {
static int corePoolSize = 1;
static int maximumPoolSize = 2;
static int keepAliveTime = 2;
static AtomicInteger count=new AtomicInteger();
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 100; i++) {
executor.execute(new MyRunnable());
}
executor.shutdown();
}
static class MyRunnable implements Runnable {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
Thread.sleep(3000L);
System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
```java 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:2 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:1 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:3 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:4 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]执行完了任务
这个线程池 最大线程数量为2 ,队列长度为2 ,也就是最大能接受四个任务,其余任务直接抛弃,不抛出异常。
<a name="7pMoG"></a>
### 2.7.3 CallerRunsPolicy
<a name="a7tqq"></a>
#### 2.7.3.1 拒绝策略代码
```cpp
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
将使用当前线程 执行 该任务,一般为主线程,主线程阻塞,新增的任务 阻塞在 计算机内存中。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
* 在调用者的线程中(可以理解为 主线程)执行任务,除非
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
2.7.3.1 举例说明
public class RunnableStyle {
static int corePoolSize = 1;
static int maximumPoolSize = 2;
static int keepAliveTime = 2;
static AtomicInteger count=new AtomicInteger();
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 100; i++) {
executor.execute(new MyRunnable());
}
executor.shutdown();
}
static class MyRunnable implements Runnable {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
Thread.sleep(3000L);
System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
线程不够用时,主线程也会参与执行任务,所以导致的问题就是 阻塞主线程;
2.7.4 DiscardOldestPolicy
2.7.4.1 拒绝策略代码
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
获取任务队列,将头部任务移除掉,将新任务加入队列中。
package com.zhangyong.multiThread.multiThread.createThreads;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>Description: </p>
* <p>Company: http://www.dmall.com</p>
*
* @author yong.zhang@dmall.com
* @version 1.0.0
* @date 2020/12/17 20:18
*/
public class RunnableStyle {
static int corePoolSize = 1;
static int maximumPoolSize = 3;
static int keepAliveTime = 2;
static AtomicInteger count=new AtomicInteger();
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 100; i++) {
executor.execute(new MyRunnable());
}
executor.shutdown();
}
static class MyRunnable implements Runnable {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
Thread.sleep(3000L);
System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:2
线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:1
线程:Thread[pool-1-thread-3,5,main]开始执行任务ID:3
线程:Thread[pool-1-thread-2,5,main]执行完了任务
线程:Thread[pool-1-thread-1,5,main]执行完了任务
线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:4
线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:5
线程:Thread[pool-1-thread-3,5,main]执行完了任务
线程:Thread[pool-1-thread-2,5,main]执行完了任务
线程:Thread[pool-1-thread-1,5,main]执行完了任务;
新的任务加入到线程中后,如果没有被立刻执行的话,
3.线程池任务执行
3.1 添加线程规则
- 1. 如果线程数小于corePoolSize , 除非设置了allowCoreThreadTimeOut , 即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。
- 2. 如果线程数>= corePoolSize 但少于 maximumPoolSize ,则将任务放入队列。
- 3. 如果队列已满,并且线程数小于maxPoolSize, 则创建一个新线程来运行任务。
[x] 4. 如果队列已满,并且线程数大于maxPoolSize,则使用设置的拒绝策略拒绝该任务。
3.2 增减线程的特点
[x] 1. 通过设置corePoolSize 和 maximumPoolSize 相同,就可以创建固定大小的线程池。
- 2. 线程池希望的是 永远保持较少的线程数,并且只有在负载变得很大的时候才去增大它。
- 3. 通过设置maximumPoolSize 为很高的值,例如Integer.MAX_VALUE , 可以允许线程池容纳任意数量的并发任务。但是实际上硬件上的限制不会出现这种情况
- 4. 只有在队列填满时才创建多余corePoolSize 的线程,所以如果你使用的是无界队列(LinkedBlockingQueue) , 那么线程数就不会超过corePoolSize;
4. 线程池创建的方式
4.1 Executors 自动创建线程池
4.1.1 Executors.newFixedThreadPool()
4.1.1.1 构造参数
```java public static ExecutorService newFixedThreadPool(int nThreads) {
}return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
corePoolSize 和 maxPoolSize 相同, 那么keepAliveTime 这个参数就无意义了,因为他不考虑回收多余corePoolSize的线程
<a name="1WTkU"></a>
#### 4.1.1.2 原理
```java
public class FixedThreadPoolOOM {
public static void main(String[] args) {
ExecutorService ex = Executors.newFixedThreadPool(1);
for (int i=0;i<Integer.MAX_VALUE;++i){
ex.execute(new Task());
}
}
}
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100000000000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 由于LinkedBlockingQueue 是没有容量上限的,所以当请求越来越多的时候,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,最终导致OOM.
4.1.2 Executors.newSingleThreadExecutor()
4.1.2.1 构造参数
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize 和 maxPoolSize 相同, 那么keepAliveTime 这个参数就无意义了,因为他不考虑回收多余 corePoolSize的线程;
4.1.2.2 原理
缺点和 newFixedThreadPool 一样.
4.1.3 Executors.newCachedThreadPool();
4.1.3.1 构造参数
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize 为0
maxPoolSize 为Int的最大值;
可回收线程;
队列采用的是Sync Queue ,它一种阻塞队列,其中每个 put 必须等待一个 take.
4.1.3.2 原理
maxPoolSize设置为Integer.MAX_VALUE, 这可能会创建数量很多的线程,甚至导致OOM.
4.1.4 Executors.newScheduledThreadPool()
定时执行
4.1.4.1 构造参数
4.1.4.2 原理
4.2 手动创建线程池
4.2.1 为啥要手动创建
4.2.2 手动创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
自定义输入各参数
/**
* 用于异步的提交连接请求的线程池
*/
private ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1 << 16));
4.2.3 设置线程池中线程数量的策略
POP 订单批量导入导出中,消耗内存。
配置 coolPoolSize 公式:CPU核数 / (1- 阻塞系数) ---> 阻塞系数在0.8 ~ 0.9之间
例如:8核CPU: 8/1-0.9 = 80个线程数