线程池状态
ThreadPoolExecutor
使用 int
的高 3 位来表示线程池状态,低 29 位表示线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY的前三位为0,后29位为1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高三位存储线程池运行状态该
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//计算线程状态,CAPACITY取反后再进行与计算
private static int runStateOf(int c) { return c & ~CAPACITY; }
//计算线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 将线程池状态与线程个数合二为一
private static int ctlOf(int rs, int wc) { return rs | wc; }
状态名 | 高3**位** | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余 任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列 任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入 终结 |
TERMINATED | 011 | - | - | 终结状态 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
,这些信息存储在一个原子变量 ctl
中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
核心线程数目 (最多保留的线程数)maximumPoolSize
最大线程数目keepAliveTime
生存时间 - 针对救急线程unit
时间单位 - 针对救急线程workQueue
阻塞队列threadFactory
线程工厂 - 可以为线程创建时起个好名字handler
拒绝策略
根据这个构造方法,JDK Executors
类中提供了众多工厂方法来创建各种用途的线程池
工作模式
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到
corePoolSize
并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue
队列排队,直到有空闲的线程。 - 如果队列选择了有界队列,那么任务超过了队列大小时,最多会创建
maximumPoolSize - corePoolSize
数目的线程来救急。 - 如果线程到达
maximumPoolSize
仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
AbortPolicy
让调用者抛出RejectedExecutionException
异常,这是默认策略CallerRunsPolicy
让调用者运行任务DiscardPolicy
放弃本次任务DiscardOldestPolicy
放弃队列中最早的任务,本任务取而代之Dubbo
的实现,在抛出RejectedExecutionException
异常之前会记录日志,并dump
线程栈信息,方
便定位问题Netty
的实现,是创建一个新线程来执行任务ActiveMQ
的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略PinPoint
的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略- 当高峰过去后,超过
corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime
和unit
来控制
- 当高峰过去后,超过
newFixedThreadPool
创建一个固定大小的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 特点
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
- 如果有一个线程挂了,那么线程池会重新创建一个线程来代替挂了的线程
- 评价
newCachedThreadPool
创建一个带缓冲区的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 特点
- 核心线程数是 0, 最大线程数是
Integer.MAX_VALUE
,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收) - 救急线程可以无限创建
- 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
- 评价
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况
**
newSingleThreadExecutor
创建只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 使用场景:
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。 - 区别:
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
Executors.newSingleThreadExecutor()
线程个数始终为1,不能修改,FinalizableDelegatedExecutorService
应用的是装饰器模式,只对外暴露了ExecutorService
接口,因此不能调用ThreadPoolExecutor
中特有的方法Executors.newFixedThreadPool(1)
初始时为1,以后还可以修改,它对外暴露的是ThreadPoolExecutor
对象,可以强转后调用setCorePoolSize
等方法进行修改提交任务
```java // 执行任务 void execute(Runnable command);
// 有返回值的提交任务 task,用返回值 Future 获得任务执行结果
// 提交 tasks 中所有任务
// 提交 tasks 中所有任务,带超时时间
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,只要有一个成功了就返回,其余的丢弃
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<a name="jaHjU"></a>
# 任务调度线程池
【任务调度线程池】表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务<br />在【任务调度线程池】功能加入之前,可以使用`java.util.Timer `来实现定时功能,`Timer`的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
```java
public static void testTime() {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.info("task 1");
sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.info("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
// 程序希望1s后两个任务同时执行,但是最终结果是任务1先执行完毕,才能执行任务2
2021-10-04 20:50:07.090 [Timer-0] INFO - task 1
2021-10-04 20:50:09.092 [Timer-0] INFO - task 2
使用
ScheduledExecutorService
改写public static void testScheduled() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
}
//任务1 和 任务2几乎同时执行
任务1,执行时间:Mon Oct 04 20:52:33 CST 2021
任务2,执行时间:Mon Oct 04 20:52:33 CST 2021
scheduleAtFixedRate
用法public static void testScheduleAtFixedRate() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.info("start...");
pool.scheduleAtFixedRate(() -> {
log.info("running...");
}, 1, 1, TimeUnit.SECONDS);
}
// 程序每隔1s执行一次,首次执行是在主程序启动后1s
2021-10-04 20:54:38.153 [main] INFO - start...
2021-10-04 20:54:39.185 [pool-1-thread-1] INFO - running...
2021-10-04 20:54:40.185 [pool-1-thread-1] INFO - running...
2021-10-04 20:54:41.184 [pool-1-thread-1] INFO - running...
2021-10-04 20:54:42.185 [pool-1-thread-1] INFO - running...
2021-10-04 20:54:43.184 [pool-1-thread-1] INFO - running...
如果任务执行耗时超过了间隔时间
public static void testScheduleAtFixedRateTimeOut() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.info("start...");
pool.scheduleAtFixedRate(() -> {
log.info("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
}
//程序本意是希望每隔1s执行一次,但是任务的执行耗时是2s,大于时间间隔,所以采用的是任务执行完毕后再执行
//输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
2021-10-04 20:56:08.081 [main] INFO - start...
2021-10-04 20:56:09.113 [pool-1-thread-1] INFO - running...
2021-10-04 20:56:11.116 [pool-1-thread-1] INFO - running...
2021-10-04 20:56:13.117 [pool-1-thread-1] INFO - running...
2021-10-04 20:56:15.117 [pool-1-thread-1] INFO - running...
scheduleWithFixedDelay
用法public static void testScheduleWithFixedDelay() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.info("start...");
pool.scheduleWithFixedDelay(() -> {
log.info("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
}
//输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <--> 延时 <--> 下一个任务开始 所以间隔都是 3s
2021-10-04 20:58:04.276 [main] INFO - start...
2021-10-04 20:58:05.307 [pool-1-thread-1] INFO - running...
2021-10-04 20:58:08.311 [pool-1-thread-1] INFO - running...
2021-10-04 20:58:11.312 [pool-1-thread-1] INFO - running...
2021-10-04 20:58:14.313 [pool-1-thread-1] INFO - running...
处理任务执行异常
主动捕捉异常
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
使用
Future
f.get
会抛出InterruptedException
和ExecutionException
异常
public void test() throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());
}