4种线程池
Executor:Java里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
- newCachedThreadPool:创建一个可缓存的线程池,如果线程可用,则调用 execute 将重用以前构造的线程。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。创建的都是救急线程
- 缺点:SynchronousQueue 是一个不存储元素的队列,可以理解为队里永远是满的,因此会最终创建非核心线程来执行任务,对于非核心线程空闲60s时将被回收。因为Integer.MAX_VALUE非常大,可以认为是无限创建线程的,在资源有限的情况下容易一起OOM;
- newFixedThreadPool:创建一个固定线程数的线程池,如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。核心线程数==最大线程数,没有救急线程被创建。
- 缺点:LinkedBlockingQueue阻塞队列没有设置大小,默认容易打爆内存Integer.MAX_VALUE,可以无限存放任务,容易发生OOM;
- newScheduledThreadPool:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
- newSingleThreadExecutor: Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
- 缺点:LinkedBlockingQueue阻塞队列没有设置大小,默认容易打爆内存Integer.MAX_VALUE,可以无限存放任务,容易发生OOM;
Executor executor = Executors.newCachedThreadPool();
ExecutorService executorService = Executors.newFixedThreadPool(int nThreads);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(int corePoolSize);
ExecutorService executorService = Executors.newSingleThreadExecutor();
线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量;
- 缺点:LinkedBlockingQueue阻塞队列没有设置大小,默认容易打爆内存Integer.MAX_VALUE,可以无限存放任务,容易发生OOM;
状态名 | 高3位 | 接受新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接受新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为0即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
ThreadPoolExecutor构造方法
// ThreadPoolExecutor 类的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
- corePoolSize:核心线程数目(最多保留的线程数)
- maximumPoolSize:最大线程数目
- keepAliveTime:生存时间,针对救急线程
- unit:时间单位,针对救急线程
- workQueue:阻塞队列
- threadFactory:线程工厂,创建线程,为线程起名字
- handler:拒绝策略
步骤
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个线程来执行任务。
- 当前线程数达到
corePoolSize
并且没有线程空闲,这时再加入任务,新加的任务被加入workQueue
队列排队,直到有空闲的线程。 - 如果队列选择了有界队列,那么任务超过了队列大小时,会创建
maximumPoolSize - corePoolSize
数目的线程来救急。 - 如果线程达到
maximumPoolSize
仍有新任务,这时会执行拒绝策略。JDK 提供了4种拒绝策略,4种拒绝策略实现接口RejectedExecutionHandler
- AbortPolicy:让调用者抛出 RejectedExecutionException 异常,也是默认策略
- CallerRunsPolicy:让调用者运行任务
- DiscardPolicy:放弃本次任务
- DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
- 当高峰过去后,超过的
corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 控制提交任务给线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 执行任务
void execute(Runnable command);
// 提交任务,用返回值 Future 获取任务执行结果
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 提交 task 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 提交 task 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
executorService.invokeAny();
shutdown 方法
shutdown 将线程池状态变为 SHUTDOWN 状态
- 不会接受新任务
- 但已提交任务会执行完成
- 此方法不会阻塞调用线程的执行
创建多少线程池合适
注意点
a. 过小会导致程序不能充分的利用系统资源,容易导致饥饿
b. 过大会导致更多的线程上下文切换,占用更多内存CPU密集型运算服务器
通常采用CPU核数+1
能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或者其他原因导致暂停时,额外的整个线程就能顶上去,保证CPU时钟周期不被浪费。I/O密集型运算服务器
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当执行IO操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。
经营公式如下:线程数= 核数 * 期望CPU利用率 * 总时间(CPU计算时间+等待时间)/ CPU计算时间
任务调度线程池
在【任务调度线程池】功能加入之前,可以使用java.util.Timer
来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或者异常都讲会影响之后的任务。Timer 简单示例
```java package top.simba1949;
import java.util.Timer; import java.util.TimerTask;
/**
@author Anthony */ public class Application {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
// 处理调度逻辑
System.out.println(System.currentTimeMillis());
}
};
// 延迟1000毫秒后执行
System.out.println(System.currentTimeMillis());
timer.schedule(timerTask, 1000);
ScheduledExecutorService 简单示例
延迟执行
```java package top.simba1949;
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;
/**
@author Anthony */ public class Application {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
System.out.println(System.currentTimeMillis());
scheduledExecutorService.schedule(() -> {
// 任务调度业务逻辑
System.out.println(System.currentTimeMillis());
}, 1, TimeUnit.SECONDS); // 时间延迟单位和延迟时间数
定时执行
```java package top.simba1949;
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;
/**
@author Anthony */ public class Application {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
System.out.println(System.currentTimeMillis());
// 定时执行任务
scheduledExecutorService.scheduleAtFixedRate(() -> {
// 任务调度处理逻辑
System.out.println(System.currentTimeMillis());
}, 1, 2, TimeUnit.SECONDS); // 1表示延迟时间数,2表示每间隔时间数,时间单位
Fork/Join
概念
Fork/Join 是 JDK1.7 加入的新的线程池实现,它体现的是以种分治四项,适用于能够进行任务拆分的 CPU 密集型运算。
所谓的任务拆分,是将一个大任务拆分为算法相同的小任务,直至不能拆分可以直接求解。
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork/Join 默认会创建与 CPU 核心数大小相同的线程池。使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或者 RecursiveAction(无返回值)
demo
```java package top.simba1949;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask;
/**
- @author Anthony
- @date 2020/11/1 9:00
*/
public class TaskApp1 {
public static void main(String[] args) {
} }// 创建Fork/Join线程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 打印最终结果
System.out.println(forkJoinPool.invoke(new MyTask(5)));
/**
计算 1-n 之间的整数之和 */ class MyTask extends RecursiveTask
{ private int val;
public MyTask(int val) {
this.val = val;
}
/**
- 做任务拆分
@return */ @Override protected Integer compute() { if (val == 1){
return val;
} MyTask t1 = new MyTask(val - 1); t1.fork(); // 让一个线程去执行任务
return val + t1.join(); } } ```