使用线程池的好处

  1. 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁所造成的的资源消耗。
  2. 提高响应速度:当任务到达时,可以不需要等到线程创建就立即执行。
  3. 提高线程的可管控性:线程是稀缺资源,不能不限复制,不仅会消耗系统资源,还会降低系统稳定性。使用线程池可以进行统一的分配,调优和监控。
  4. 线程池还有助于this逃避,this逃避指在构造函数返回之前其他线程就持有该对象的引用。调用尚未构造完全的对象方法。

    线程池原理

    预先启动一些线程,线程无限循环从队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行任务而发生异常并终止,那么重新创建一个新的线程,如此反复。

    线程池的五种状态

  • RUNNING(运行中):线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。线程池的初始化状态是RUNNING。
  • SHUTDOWN(关掉):调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  • STOP(停止):调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • tidying:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
  • terminated(终止):线程池彻底终止,就变成terminated状态。 线程池处在tidying状态时,执行完terminated()之后,就会由 tidying -> terminated

Executor框架使用

format.png

  1. 在主线程创建Runnable和Callable任务
  2. 将实现接口的对象实例交给ExecutorService,使用ExecutorService.submit()或ExecutorService.execute()。
  3. 如果使用了ExecutorService.submit(),则会返回一个实现Future接口的对象。
  4. 最后主线程可以执行FutureTask.get()来等待任务执行完成。也可以使用FutureTask.cancel(boolean mayInterruptIfRunning)来取消任务执行。

    Java提供的四种线程池

  5. FixedThreadPool:返回一个固定线程数量的线程池,数量为构造函数参数所指定,线程数量始终不变,且任务队列长度为Integer.MAX_VALUE。

  6. SingleThreadExecutor:只有一个线程的线程池,任务队列长度为Integer.MAX_VALUE,严格按照先进先出的顺序。
  7. CachedThreadPool:允许创建线程数量为Integer.MAX_VALUE,若实际任务数量超过线程数量则创建线程,无任务队列。
  8. ScheduledThreadPool 创建一个线程池,支持定时及周期性任务执行。

    TheadPoolExecutor

  1. /**
  2. * 用给定的初始参数创建一个新的ThreadPoolExecutor。
  3. */
  4. public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
  5. int maximumPoolSize,//线程池的最大线程数
  6. long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
  7. TimeUnit unit,//时间单位
  8. BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
  9. ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
  10. RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
  11. ) {
  12. if (corePoolSize < 0 ||
  13. maximumPoolSize <= 0 ||
  14. maximumPoolSize < corePoolSize ||
  15. keepAliveTime < 0)
  16. throw new IllegalArgumentException();
  17. if (workQueue == null || threadFactory == null || handler == null)
  18. throw new NullPointerException();
  19. this.corePoolSize = corePoolSize;
  20. this.maximumPoolSize = maximumPoolSize;
  21. this.workQueue = workQueue;
  22. this.keepAliveTime = unit.toNanos(keepAliveTime);
  23. this.threadFactory = threadFactory;
  24. this.handler = handler;
  25. }

重要参数:

  1. corePoolSize:核心线程数定义了最小可以同时运行的线程数量。
  2. maximum:当队列中的任务达到队列容量时,当前可以运行的线程数变为最大线程可运行数(即该参数。
  3. workQueue:先判断当前线程数量是否达到核心线程数,若已达到则将任务放入workQueue。

image.png

  • SynchronousQueue:每个插入操作必须等待另一个线程的相应移除操作的队列,该队列是不能存储元素的,在线程池中表现为直接创建一个线程去执行,CachedThreadPool使用该队列。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,FixedThreadPool和SingleThreadExcutor使用的是该队列。
  • ArrayBlockingQueue:有界队列。
  • PriorityBlockingQueue:一个具有优先级得无限阻塞队列

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit : keepAliveTime 参数的时间单位。
  3. threadFactory :executor 创建新线程的时候会用到。
  4. handler :饱和策略。
    1. abort Policy:终止策略,抛出一个RejectedExecutionException异常
    2. CallerRunsPolicy:交给调用者所在线程进行处理
    3. DiscardOldestPolicy:把队列里head丢弃,再执行这个task
    4. DiscardPolicy:直接抛弃,不执行

另外《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。·

ThreadPoolExecutor使用范例:

  1. import java.util.Date;
  2. /**
  3. * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
  4. * @author shuang.kou
  5. */
  6. public class MyRunnable implements Runnable {
  7. private String command;
  8. public MyRunnable(String s) {
  9. this.command = s;
  10. }
  11. @Override
  12. public void run() {
  13. System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
  14. processCommand();
  15. System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
  16. }
  17. private void processCommand() {
  18. try {
  19. Thread.sleep(5000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. @Override
  25. public String toString() {
  26. return this.command;
  27. }
  28. }
  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.ThreadPoolExecutor;
  3. import java.util.concurrent.TimeUnit;
  4. public class ThreadPoolExecutorDemo {
  5. private static final int CORE_POOL_SIZE = 5;
  6. private static final int MAX_POOL_SIZE = 10;
  7. private static final int QUEUE_CAPACITY = 100;
  8. private static final Long KEEP_ALIVE_TIME = 1L;
  9. public static void main(String[] args) {
  10. //使用阿里巴巴推荐的创建线程池的方式
  11. //通过ThreadPoolExecutor构造函数自定义参数创建
  12. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  13. CORE_POOL_SIZE,
  14. MAX_POOL_SIZE,
  15. KEEP_ALIVE_TIME,
  16. TimeUnit.SECONDS,
  17. new ArrayBlockingQueue<>(QUEUE_CAPACITY),
  18. new ThreadPoolExecutor.CallerRunsPolicy());
  19. for (int i = 0; i < 10; i++) {
  20. //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
  21. Runnable worker = new MyRunnable("" + i);
  22. //执行Runnable
  23. executor.execute(worker);
  24. }
  25. //终止线程池
  26. executor.shutdown();
  27. while (!executor.isTerminated()) {
  28. }
  29. System.out.println("Finished all threads");
  30. }
  31. }

线程池处理流程

  1. 判断线程池的核心线程是否都在执行任务,如果不是则创建新的工作线程直到核心线程都在工作。
  2. 若核心线程都在工作,则判断工作队列是否已满,若没有满则将新提交的任务存储在工作队列中,直到工作队列存储满工作
  3. 若工作队列已满,则判断线程池的数量是否达到最大线程数量,如果没有则创建一个新的工作线程来执行任务。如果达到了最大线程数量,则执行饱和策略来处理这个任务。

    线程池状态

    ThreadPoolExecutor类中定义了一个volatile变量runState来表示线程池的状态,线程池有四种状态,分别是RUNNING、SHUTDOWN、STOP、TERMINATED

  4. 线程池创建后处于RUNNING状态

  5. 对线程池调用shutdown后处于shutdown状态,线程不能接受新的任务,会等待缓存队列的任务完成。
  6. 对线程池调用shutdownNow后处于STOP状态,线程不能接受新的任务,并尝试终止正在执行的任务。
  7. 当线程池处于SHUTDOWN、STOP,且工作线程已销毁,队列已清空或执行结束后,线程被设置为TERMINATED状态。

    execute和submit的区别

  8. execute()用于提交不需要返回值的任务,无法得知任务是否执行成功与否。

  9. submit()方法用于提交有返回值的任务,线程池会返回一个Future类型的对象。通过future来判断成功与否。

注意:可以使用Future的get()方法获取返回值,get()会阻塞当前进程直至完成,而使用get(long timeout,TimeUnit unit)会阻塞当前线程一段时间后立即返回,即使任务没有执行完。

shutdown() VS shutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

线程池大小

首先要理解上下文切换:

当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换

对于线程池设置的过小:

如果一段时间内有大量的请求和任务要处理,线程池过小会导致大量任务队列中的任务等待,出现队列已满之后任务无法处理的现象或者出现OOM(out of memory),CPU根本没有得到充分的利用。

对于线程池设置的过大:

可能造成多个线程争抢CPU资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。