ThreadPoolExecutor 和ScheduledExecutorService 介绍
ThreadPoolExecutor :用于创建线程池,控制立即执行的线程
ScheduledExecutorService :创建延迟和循环线程池
public class ThreadPoolManager {
private static final String TAG = "ThreadPoolManager";
private volatile static ThreadPoolManager manager = null;
/**
* 任务缓存队列,用来存放等待执行的任务
*/
private BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
public ThreadPoolManager() {
}
public static ThreadPoolManager getInstance() {
if (manager == null) {
synchronized (ThreadPoolManager.class) {
if (manager == null) {
manager = new ThreadPoolManager();
}
}
}
return manager;
}
private static ThreadPoolExecutor threadPoolExecutor = null;
private static ScheduledExecutorService scheduledExecutorService = null;
private ThreadPoolExecutor getThreadPoolExecutor() {
//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
int corePoolSize = 2;
//线程池最大能容忍的线程数
int maximumPoolSize = 5;
//线程存货时间
long keepAliveTime = 0L;
//keepAliveTime的时间单位
TimeUnit unit = TimeUnit.MICROSECONDS;
if (!isThreadServiceEnable()) {
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
return threadPoolExecutor;
}
public void addThreadExecutor(Runnable runnable) {
if (isThreadServiceEnable()) {
getThreadPoolExecutor().submit(runnable);
}
}
public void shutDownThreadPool() {
if (isThreadServiceEnable()) {
getThreadPoolExecutor().shutdown();
}
}
public void shutDownNowThreadPool() {
if (isThreadServiceEnable()) {
getThreadPoolExecutor().shutdownNow();
}
}
private boolean isThreadServiceEnable() {
boolean is = threadPoolExecutor != null
&& !threadPoolExecutor.isShutdown()
&& !threadPoolExecutor.isTerminated();
LogUtils.i(TAG, "isThreadServiceEnable =" + is);
return is;
}
private static final int CACHE_THREAD_SIZE = 5;
public ScheduledExecutorService getScheduledExecutorService() {
if (!isScheduledServiceEnable()) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(CACHE_THREAD_SIZE);
}
return scheduledExecutorService;
}
/**
* 循环执行任务 - 以固定比率执行
*/
public ScheduledFuture<?> addScheduledExecutor(TimerTask timerTask, long initialDelay, long period, TimeUnit timeUnit) {
return getScheduledExecutorService().scheduleAtFixedRate(timerTask, initialDelay, period, timeUnit);
}
/**
* 延迟执行
*
* @param runnable
* @param delay
* @param timeUnit
* @return
*/
public ScheduledFuture<?> addDelayScheduledExecutor(Runnable runnable, long delay, TimeUnit timeUnit) {
return getScheduledExecutorService().schedule(runnable, delay, timeUnit);
}
/**
* 先前提交的任务将会被工作线程执行,新的线程将会被拒绝。
* 这个方法不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。
*/
public void shutDownScheduledExecutor() {
if (isScheduledServiceEnable()) {
getScheduledExecutorService().shutdown();
}
}
public void shutDownNowScheduledExecutor() {
if (isScheduledServiceEnable()) {
getScheduledExecutorService().shutdownNow();
}
}
private boolean isScheduledServiceEnable() {
boolean is = scheduledExecutorService != null
&& !scheduledExecutorService.isShutdown()
&& !scheduledExecutorService.isTerminated();
LogUtils.i(TAG, "isScheduledServiceEnable =" + is);
return is;
}
public static void cancel(ScheduledFuture<?> scheduledFuture) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
scheduledFuture.isCancelled();//避免有取消不掉的现象
}
}
}
ScheduledExecutorService定时周期执行指定的任务
一:简单说明
ScheduleExecutorService接口中有四个重要的方法,其中scheduleAtFixedRate和scheduleWithFixedDelay在实现定时程序时比较方便。
下面是该接口的原型定义
接口scheduleAtFixedRate原型定义及参数说明
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
command:执行线程
initialDelay:初始化延时
period:两次开始执行最小间隔时间
unit:计时单位
接口scheduleWithFixedDelay原型定义及参数说明
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
command:执行线程
initialDelay:初始化延时
period:前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
unit:计时单位
二:功能示例
1.按指定频率周期执行某个任务。
初始化延迟0ms开始执行,每隔100ms重新执行一次任务。
/**
* 以固定周期频率执行任务
*/
public static void executeFixedRate() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(
new EchoServer(),
0,
100,
TimeUnit.MILLISECONDS);
}
间隔指的是连续两次任务开始执行的间隔。
2.按指定频率间隔执行某个任务。
初始化时延时0ms开始执行,本次执行结束后延迟100ms开始下次执行。
/**
* 以固定延迟时间进行执行
* 本次任务执行完成后,需要延迟设定的延迟时间,才会执行新的任务
*/
public static void executeFixedDelay() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new EchoServer(),
0,
100,
TimeUnit.MILLISECONDS);
}
3.周期定时执行某个任务。
有时候我们希望一个任务被安排在凌晨3点(访问较少时)周期性的执行一个比较耗费资源的任务,可以使用下面方法设定每天在固定时间执行一次任务。
/**
* 每天晚上8点执行一次
* 每天定时安排任务进行执行
*/
public static void executeEightAtNightPerDay() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
long oneDay = 24 * 60 * 60 * 1000;
long initDelay = getTimeMillis("20:00:00") - System.currentTimeMillis();
initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;
executor.scheduleAtFixedRate(
new EchoServer(),
initDelay,
oneDay,
TimeUnit.MILLISECONDS);
}
/**
* 获取指定时间对应的毫秒数
* @param time "HH:mm:ss"
* @return
*/
private static long getTimeMillis(String time) {
try {
DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
return curDate.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
}
4.辅助代码
class EchoServer implements Runnable {
@Override
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("This is a echo server. The current time is " +
System.currentTimeMillis() + ".");
}
}
三:一些问题
上面写的内容有不严谨的地方,比如对于scheduleAtFixedRate方法,当我们要执行的任务大于我们指定的执行间隔时会怎么样呢?
对于中文API中的注释,我们可能会被忽悠,认为无论怎么样,它都会按照我们指定的间隔进行执行,其实当执行任务的时间大于我们指定的间隔时间时,它并不会在指定间隔时开辟一个新的线程并发执行这个任务。而是等待该线程执行完毕。
源码注释如下:
/* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the given
* period; that is executions will commence after
* <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
* <tt>initialDelay + 2 * period</tt>, and so on.
* If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor. If any execution of this task
* takes longer than its period, then subsequent executions
* may start late, but will not concurrently execute.
*/
根据注释中的内容,我们需要注意的时,我们需要捕获最上层的异常,防止出现异常中止执行,导致周期性的任务不再执行。
线程池之ThreadPoolExecutor使用
ThreadPoolExecutor提供了四个构造方法:
我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
public ThreadPoolExecutor(int corePoolSize, // 1
int maximumPoolSize, // 2
long keepAliveTime, // 3
TimeUnit unit, // 4
BlockingQueue<Runnable> workQueue, // 5
ThreadFactory threadFactory, // 6
RejectedExecutionHandler handler ) { //7
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue |
线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:
一、预定义线程池
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
- keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
- workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
- FixedThreadPool的任务执行是无序的;
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
- keepAliveTime = 60s,线程空闲60s后自动结束。
- workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:
public static void main(String[] args) {
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
System.out.println(threadPoolExecutor.getMaximumPoolSize());
threadPoolExecutor.setCorePoolSize(8);
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
// 运行时异常 java.lang.ClassCastException
// ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}
对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
二、自定义线程池
以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
public class ThreadTest {
public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
executor.prestartAllCoreThreads(); // 预启动所有核心线程
for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
System.in.read(); //阻塞主线程
}
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
Thread.sleep(3000); //让任务执行慢点
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
输出结果如下:
其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。