ThreadPoolExecutor 和ScheduledExecutorService 介绍

ThreadPoolExecutor :用于创建线程池,控制立即执行的线程
ScheduledExecutorService :创建延迟和循环线程池

  1. public class ThreadPoolManager {
  2. private static final String TAG = "ThreadPoolManager";
  3. private volatile static ThreadPoolManager manager = null;
  4. /**
  5. * 任务缓存队列,用来存放等待执行的任务
  6. */
  7. private BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
  8. public ThreadPoolManager() {
  9. }
  10. public static ThreadPoolManager getInstance() {
  11. if (manager == null) {
  12. synchronized (ThreadPoolManager.class) {
  13. if (manager == null) {
  14. manager = new ThreadPoolManager();
  15. }
  16. }
  17. }
  18. return manager;
  19. }
  20. private static ThreadPoolExecutor threadPoolExecutor = null;
  21. private static ScheduledExecutorService scheduledExecutorService = null;
  22. private ThreadPoolExecutor getThreadPoolExecutor() {
  23. //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
  24. int corePoolSize = 2;
  25. //线程池最大能容忍的线程数
  26. int maximumPoolSize = 5;
  27. //线程存货时间
  28. long keepAliveTime = 0L;
  29. //keepAliveTime的时间单位
  30. TimeUnit unit = TimeUnit.MICROSECONDS;
  31. if (!isThreadServiceEnable()) {
  32. threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  33. }
  34. return threadPoolExecutor;
  35. }
  36. public void addThreadExecutor(Runnable runnable) {
  37. if (isThreadServiceEnable()) {
  38. getThreadPoolExecutor().submit(runnable);
  39. }
  40. }
  41. public void shutDownThreadPool() {
  42. if (isThreadServiceEnable()) {
  43. getThreadPoolExecutor().shutdown();
  44. }
  45. }
  46. public void shutDownNowThreadPool() {
  47. if (isThreadServiceEnable()) {
  48. getThreadPoolExecutor().shutdownNow();
  49. }
  50. }
  51. private boolean isThreadServiceEnable() {
  52. boolean is = threadPoolExecutor != null
  53. && !threadPoolExecutor.isShutdown()
  54. && !threadPoolExecutor.isTerminated();
  55. LogUtils.i(TAG, "isThreadServiceEnable =" + is);
  56. return is;
  57. }
  58. private static final int CACHE_THREAD_SIZE = 5;
  59. public ScheduledExecutorService getScheduledExecutorService() {
  60. if (!isScheduledServiceEnable()) {
  61. scheduledExecutorService = new ScheduledThreadPoolExecutor(CACHE_THREAD_SIZE);
  62. }
  63. return scheduledExecutorService;
  64. }
  65. /**
  66. * 循环执行任务 - 以固定比率执行
  67. */
  68. public ScheduledFuture<?> addScheduledExecutor(TimerTask timerTask, long initialDelay, long period, TimeUnit timeUnit) {
  69. return getScheduledExecutorService().scheduleAtFixedRate(timerTask, initialDelay, period, timeUnit);
  70. }
  71. /**
  72. * 延迟执行
  73. *
  74. * @param runnable
  75. * @param delay
  76. * @param timeUnit
  77. * @return
  78. */
  79. public ScheduledFuture<?> addDelayScheduledExecutor(Runnable runnable, long delay, TimeUnit timeUnit) {
  80. return getScheduledExecutorService().schedule(runnable, delay, timeUnit);
  81. }
  82. /**
  83. * 先前提交的任务将会被工作线程执行,新的线程将会被拒绝。
  84. * 这个方法不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。
  85. */
  86. public void shutDownScheduledExecutor() {
  87. if (isScheduledServiceEnable()) {
  88. getScheduledExecutorService().shutdown();
  89. }
  90. }
  91. public void shutDownNowScheduledExecutor() {
  92. if (isScheduledServiceEnable()) {
  93. getScheduledExecutorService().shutdownNow();
  94. }
  95. }
  96. private boolean isScheduledServiceEnable() {
  97. boolean is = scheduledExecutorService != null
  98. && !scheduledExecutorService.isShutdown()
  99. && !scheduledExecutorService.isTerminated();
  100. LogUtils.i(TAG, "isScheduledServiceEnable =" + is);
  101. return is;
  102. }
  103. public static void cancel(ScheduledFuture<?> scheduledFuture) {
  104. if (scheduledFuture != null) {
  105. scheduledFuture.cancel(true);
  106. scheduledFuture.isCancelled();//避免有取消不掉的现象
  107. }
  108. }
  109. }

ScheduledExecutorService定时周期执行指定的任务

一:简单说明
ScheduleExecutorService接口中有四个重要的方法,其中scheduleAtFixedRate和scheduleWithFixedDelay在实现定时程序时比较方便。
下面是该接口的原型定义

接口scheduleAtFixedRate原型定义及参数说明

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay,
  3. long period,
  4. TimeUnit unit);

command:执行线程
initialDelay:初始化延时
period:两次开始执行最小间隔时间
unit:计时单位

接口scheduleWithFixedDelay原型定义及参数说明

  1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  2. long initialDelay,
  3. long delay,
  4. TimeUnit unit);

command:执行线程
initialDelay:初始化延时
period:前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
unit:计时单位

二:功能示例
1.按指定频率周期执行某个任务。
初始化延迟0ms开始执行,每隔100ms重新执行一次任务。

  1. /**
  2. * 以固定周期频率执行任务
  3. */
  4. public static void executeFixedRate() {
  5. ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  6. executor.scheduleAtFixedRate(
  7. new EchoServer(),
  8. 0,
  9. 100,
  10. TimeUnit.MILLISECONDS);
  11. }

间隔指的是连续两次任务开始执行的间隔。

2.按指定频率间隔执行某个任务。
初始化时延时0ms开始执行,本次执行结束后延迟100ms开始下次执行。

  1. /**
  2. * 以固定延迟时间进行执行
  3. * 本次任务执行完成后,需要延迟设定的延迟时间,才会执行新的任务
  4. */
  5. public static void executeFixedDelay() {
  6. ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  7. executor.scheduleWithFixedDelay(
  8. new EchoServer(),
  9. 0,
  10. 100,
  11. TimeUnit.MILLISECONDS);
  12. }

3.周期定时执行某个任务。
有时候我们希望一个任务被安排在凌晨3点(访问较少时)周期性的执行一个比较耗费资源的任务,可以使用下面方法设定每天在固定时间执行一次任务。

  1. /**
  2. * 每天晚上8点执行一次
  3. * 每天定时安排任务进行执行
  4. */
  5. public static void executeEightAtNightPerDay() {
  6. ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  7. long oneDay = 24 * 60 * 60 * 1000;
  8. long initDelay = getTimeMillis("20:00:00") - System.currentTimeMillis();
  9. initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;
  10. executor.scheduleAtFixedRate(
  11. new EchoServer(),
  12. initDelay,
  13. oneDay,
  14. TimeUnit.MILLISECONDS);
  15. }
  1. /**
  2. * 获取指定时间对应的毫秒数
  3. * @param time "HH:mm:ss"
  4. * @return
  5. */
  6. private static long getTimeMillis(String time) {
  7. try {
  8. DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
  9. DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
  10. Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
  11. return curDate.getTime();
  12. } catch (ParseException e) {
  13. e.printStackTrace();
  14. }
  15. return 0;
  16. }

4.辅助代码

  1. class EchoServer implements Runnable {
  2. @Override
  3. public void run() {
  4. try {
  5. Thread.sleep(50);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. System.out.println("This is a echo server. The current time is " +
  10. System.currentTimeMillis() + ".");
  11. }
  12. }

三:一些问题
上面写的内容有不严谨的地方,比如对于scheduleAtFixedRate方法,当我们要执行的任务大于我们指定的执行间隔时会怎么样呢?

对于中文API中的注释,我们可能会被忽悠,认为无论怎么样,它都会按照我们指定的间隔进行执行,其实当执行任务的时间大于我们指定的间隔时间时,它并不会在指定间隔时开辟一个新的线程并发执行这个任务。而是等待该线程执行完毕。


源码注释如下:

  1. /* Creates and executes a periodic action that becomes enabled first
  2. * after the given initial delay, and subsequently with the given
  3. * period; that is executions will commence after
  4. * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
  5. * <tt>initialDelay + 2 * period</tt>, and so on.
  6. * If any execution of the task
  7. * encounters an exception, subsequent executions are suppressed.
  8. * Otherwise, the task will only terminate via cancellation or
  9. * termination of the executor. If any execution of this task
  10. * takes longer than its period, then subsequent executions
  11. * may start late, but will not concurrently execute.
  12. */

根据注释中的内容,我们需要注意的时,我们需要捕获最上层的异常,防止出现异常中止执行,导致周期性的任务不再执行。

线程池之ThreadPoolExecutor使用

ThreadPoolExecutor提供了四个构造方法:
image.png

我们以最后一个构造方法(参数最多的那个),对其参数进行解释:

  1. public ThreadPoolExecutor(int corePoolSize, // 1
  2. int maximumPoolSize, // 2
  3. long keepAliveTime, // 3
  4. TimeUnit unit, // 4
  5. BlockingQueue<Runnable> workQueue, // 5
  6. ThreadFactory threadFactory, // 6
  7. RejectedExecutionHandler handler ) { //7
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }
序号 名称 类型 含义
1 corePoolSize int 核心线程池大小
2 maximumPoolSize int 最大线程池大小
3 keepAliveTime long 线程最大空闲时间
4 unit TimeUnit 时间单位
5 workQueue BlockingQueue 线程等待队列
6 threadFactory ThreadFactory 线程创建工厂
7 handler RejectedExecutionHandler 拒绝策略

知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:

一、预定义线程池
  1. FixedThreadPool
    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }
    • corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
    • keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
    • workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
    • FixedThreadPool的任务执行是无序的;

适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

  1. CachedThreadPool
    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }
    • corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
    • keepAliveTime = 60s,线程空闲60s后自动结束。
    • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
  1. SingleThreadExecutor

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }

    咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:

    1. public static void main(String[] args) {
    2. ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
    3. ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
    4. System.out.println(threadPoolExecutor.getMaximumPoolSize());
    5. threadPoolExecutor.setCorePoolSize(8);
    6. ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
    7. // 运行时异常 java.lang.ClassCastException
    8. // ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
    9. }

    对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。

  2. ScheduledThreadPool

    1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2. return new ScheduledThreadPoolExecutor(corePoolSize);
    3. }

    newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

二、自定义线程池

以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:

  1. public class ThreadTest {
  2. public static void main(String[] args) throws InterruptedException, IOException {
  3. int corePoolSize = 2;
  4. int maximumPoolSize = 4;
  5. long keepAliveTime = 10;
  6. TimeUnit unit = TimeUnit.SECONDS;
  7. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
  8. ThreadFactory threadFactory = new NameTreadFactory();
  9. RejectedExecutionHandler handler = new MyIgnorePolicy();
  10. ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
  11. workQueue, threadFactory, handler);
  12. executor.prestartAllCoreThreads(); // 预启动所有核心线程
  13. for (int i = 1; i <= 10; i++) {
  14. MyTask task = new MyTask(String.valueOf(i));
  15. executor.execute(task);
  16. }
  17. System.in.read(); //阻塞主线程
  18. }
  19. static class NameTreadFactory implements ThreadFactory {
  20. private final AtomicInteger mThreadNum = new AtomicInteger(1);
  21. @Override
  22. public Thread newThread(Runnable r) {
  23. Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
  24. System.out.println(t.getName() + " has been created");
  25. return t;
  26. }
  27. }
  28. public static class MyIgnorePolicy implements RejectedExecutionHandler {
  29. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  30. doLog(r, e);
  31. }
  32. private void doLog(Runnable r, ThreadPoolExecutor e) {
  33. // 可做日志记录等
  34. System.err.println( r.toString() + " rejected");
  35. // System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
  36. }
  37. }
  38. static class MyTask implements Runnable {
  39. private String name;
  40. public MyTask(String name) {
  41. this.name = name;
  42. }
  43. @Override
  44. public void run() {
  45. try {
  46. System.out.println(this.toString() + " is running!");
  47. Thread.sleep(3000); //让任务执行慢点
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. public String getName() {
  53. return name;
  54. }
  55. @Override
  56. public String toString() {
  57. return "MyTask [name=" + name + "]";
  58. }
  59. }
  60. }

输出结果如下:
image.png
其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。

总结,通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景。