ThreadPoolExecutor 和ScheduledExecutorService 介绍
ThreadPoolExecutor :用于创建线程池,控制立即执行的线程
ScheduledExecutorService :创建延迟和循环线程池
public class ThreadPoolManager {private static final String TAG = "ThreadPoolManager";private volatile static ThreadPoolManager manager = null;/*** 任务缓存队列,用来存放等待执行的任务*/private BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();public ThreadPoolManager() {}public static ThreadPoolManager getInstance() {if (manager == null) {synchronized (ThreadPoolManager.class) {if (manager == null) {manager = new ThreadPoolManager();}}}return manager;}private static ThreadPoolExecutor threadPoolExecutor = null;private static ScheduledExecutorService scheduledExecutorService = null;private ThreadPoolExecutor getThreadPoolExecutor() {//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)int corePoolSize = 2;//线程池最大能容忍的线程数int maximumPoolSize = 5;//线程存货时间long keepAliveTime = 0L;//keepAliveTime的时间单位TimeUnit unit = TimeUnit.MICROSECONDS;if (!isThreadServiceEnable()) {threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}return threadPoolExecutor;}public void addThreadExecutor(Runnable runnable) {if (isThreadServiceEnable()) {getThreadPoolExecutor().submit(runnable);}}public void shutDownThreadPool() {if (isThreadServiceEnable()) {getThreadPoolExecutor().shutdown();}}public void shutDownNowThreadPool() {if (isThreadServiceEnable()) {getThreadPoolExecutor().shutdownNow();}}private boolean isThreadServiceEnable() {boolean is = threadPoolExecutor != null&& !threadPoolExecutor.isShutdown()&& !threadPoolExecutor.isTerminated();LogUtils.i(TAG, "isThreadServiceEnable =" + is);return is;}private static final int CACHE_THREAD_SIZE = 5;public ScheduledExecutorService getScheduledExecutorService() {if (!isScheduledServiceEnable()) {scheduledExecutorService = new ScheduledThreadPoolExecutor(CACHE_THREAD_SIZE);}return scheduledExecutorService;}/*** 循环执行任务 - 以固定比率执行*/public ScheduledFuture<?> addScheduledExecutor(TimerTask timerTask, long initialDelay, long period, TimeUnit timeUnit) {return getScheduledExecutorService().scheduleAtFixedRate(timerTask, initialDelay, period, timeUnit);}/*** 延迟执行** @param runnable* @param delay* @param timeUnit* @return*/public ScheduledFuture<?> addDelayScheduledExecutor(Runnable runnable, long delay, TimeUnit timeUnit) {return getScheduledExecutorService().schedule(runnable, delay, timeUnit);}/*** 先前提交的任务将会被工作线程执行,新的线程将会被拒绝。* 这个方法不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。*/public void shutDownScheduledExecutor() {if (isScheduledServiceEnable()) {getScheduledExecutorService().shutdown();}}public void shutDownNowScheduledExecutor() {if (isScheduledServiceEnable()) {getScheduledExecutorService().shutdownNow();}}private boolean isScheduledServiceEnable() {boolean is = scheduledExecutorService != null&& !scheduledExecutorService.isShutdown()&& !scheduledExecutorService.isTerminated();LogUtils.i(TAG, "isScheduledServiceEnable =" + is);return is;}public static void cancel(ScheduledFuture<?> scheduledFuture) {if (scheduledFuture != null) {scheduledFuture.cancel(true);scheduledFuture.isCancelled();//避免有取消不掉的现象}}}
ScheduledExecutorService定时周期执行指定的任务
一:简单说明
ScheduleExecutorService接口中有四个重要的方法,其中scheduleAtFixedRate和scheduleWithFixedDelay在实现定时程序时比较方便。
下面是该接口的原型定义
接口scheduleAtFixedRate原型定义及参数说明
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
command:执行线程
initialDelay:初始化延时
period:两次开始执行最小间隔时间
unit:计时单位
接口scheduleWithFixedDelay原型定义及参数说明
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
command:执行线程
initialDelay:初始化延时
period:前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
unit:计时单位
二:功能示例
1.按指定频率周期执行某个任务。
初始化延迟0ms开始执行,每隔100ms重新执行一次任务。
/*** 以固定周期频率执行任务*/public static void executeFixedRate() {ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);executor.scheduleAtFixedRate(new EchoServer(),0,100,TimeUnit.MILLISECONDS);}
间隔指的是连续两次任务开始执行的间隔。
2.按指定频率间隔执行某个任务。
初始化时延时0ms开始执行,本次执行结束后延迟100ms开始下次执行。
/*** 以固定延迟时间进行执行* 本次任务执行完成后,需要延迟设定的延迟时间,才会执行新的任务*/public static void executeFixedDelay() {ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);executor.scheduleWithFixedDelay(new EchoServer(),0,100,TimeUnit.MILLISECONDS);}
3.周期定时执行某个任务。
有时候我们希望一个任务被安排在凌晨3点(访问较少时)周期性的执行一个比较耗费资源的任务,可以使用下面方法设定每天在固定时间执行一次任务。
/*** 每天晚上8点执行一次* 每天定时安排任务进行执行*/public static void executeEightAtNightPerDay() {ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);long oneDay = 24 * 60 * 60 * 1000;long initDelay = getTimeMillis("20:00:00") - System.currentTimeMillis();initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;executor.scheduleAtFixedRate(new EchoServer(),initDelay,oneDay,TimeUnit.MILLISECONDS);}
/*** 获取指定时间对应的毫秒数* @param time "HH:mm:ss"* @return*/private static long getTimeMillis(String time) {try {DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);return curDate.getTime();} catch (ParseException e) {e.printStackTrace();}return 0;}
4.辅助代码
class EchoServer implements Runnable {@Overridepublic void run() {try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("This is a echo server. The current time is " +System.currentTimeMillis() + ".");}}
三:一些问题
上面写的内容有不严谨的地方,比如对于scheduleAtFixedRate方法,当我们要执行的任务大于我们指定的执行间隔时会怎么样呢?
对于中文API中的注释,我们可能会被忽悠,认为无论怎么样,它都会按照我们指定的间隔进行执行,其实当执行任务的时间大于我们指定的间隔时间时,它并不会在指定间隔时开辟一个新的线程并发执行这个任务。而是等待该线程执行完毕。
源码注释如下:
/* Creates and executes a periodic action that becomes enabled first* after the given initial delay, and subsequently with the given* period; that is executions will commence after* <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then* <tt>initialDelay + 2 * period</tt>, and so on.* If any execution of the task* encounters an exception, subsequent executions are suppressed.* Otherwise, the task will only terminate via cancellation or* termination of the executor. If any execution of this task* takes longer than its period, then subsequent executions* may start late, but will not concurrently execute.*/
根据注释中的内容,我们需要注意的时,我们需要捕获最上层的异常,防止出现异常中止执行,导致周期性的任务不再执行。
线程池之ThreadPoolExecutor使用
ThreadPoolExecutor提供了四个构造方法:
我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
public ThreadPoolExecutor(int corePoolSize, // 1int maximumPoolSize, // 2long keepAliveTime, // 3TimeUnit unit, // 4BlockingQueue<Runnable> workQueue, // 5ThreadFactory threadFactory, // 6RejectedExecutionHandler handler ) { //7if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
| 序号 | 名称 | 类型 | 含义 |
|---|---|---|---|
| 1 | corePoolSize | int | 核心线程池大小 |
| 2 | maximumPoolSize | int | 最大线程池大小 |
| 3 | keepAliveTime | long | 线程最大空闲时间 |
| 4 | unit | TimeUnit | 时间单位 |
| 5 | workQueue | BlockingQueue |
线程等待队列 |
| 6 | threadFactory | ThreadFactory | 线程创建工厂 |
| 7 | handler | RejectedExecutionHandler | 拒绝策略 |
知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:
一、预定义线程池
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
- corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
- keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
- workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
- FixedThreadPool的任务执行是无序的;
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
- corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
- keepAliveTime = 60s,线程空闲60s后自动结束。
- workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:
public static void main(String[] args) {ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;System.out.println(threadPoolExecutor.getMaximumPoolSize());threadPoolExecutor.setCorePoolSize(8);ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();// 运行时异常 java.lang.ClassCastException// ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;}
对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
二、自定义线程池
以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
public class ThreadTest {public static void main(String[] args) throws InterruptedException, IOException {int corePoolSize = 2;int maximumPoolSize = 4;long keepAliveTime = 10;TimeUnit unit = TimeUnit.SECONDS;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);ThreadFactory threadFactory = new NameTreadFactory();RejectedExecutionHandler handler = new MyIgnorePolicy();ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,workQueue, threadFactory, handler);executor.prestartAllCoreThreads(); // 预启动所有核心线程for (int i = 1; i <= 10; i++) {MyTask task = new MyTask(String.valueOf(i));executor.execute(task);}System.in.read(); //阻塞主线程}static class NameTreadFactory implements ThreadFactory {private final AtomicInteger mThreadNum = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());System.out.println(t.getName() + " has been created");return t;}}public static class MyIgnorePolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {doLog(r, e);}private void doLog(Runnable r, ThreadPoolExecutor e) {// 可做日志记录等System.err.println( r.toString() + " rejected");// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());}}static class MyTask implements Runnable {private String name;public MyTask(String name) {this.name = name;}@Overridepublic void run() {try {System.out.println(this.toString() + " is running!");Thread.sleep(3000); //让任务执行慢点} catch (InterruptedException e) {e.printStackTrace();}}public String getName() {return name;}@Overridepublic String toString() {return "MyTask [name=" + name + "]";}}}
输出结果如下:
其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。
