一、使用线程池的目的

  • 减少系统维护线程的开销
  • 解耦,把运行和创建分开处理
  • 线程可以复用

二、线程池的使用

接口 Executor

执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start()

  • execute(Runnable command)**:在未来某个时间执行给定的命令。**

**

接口 ExecutorService

继承了 Executor 接口,并增加了一些管理线程以及线程池的一些方法

image.png

类 Executors(静态类)

此包中所定义的 ExecutorExecutorServiceScheduledExecutorServiceThreadFactoryCallable 类的工厂和实用方法。此类支持以下各种方法:

  • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
  • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
  • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
  • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
  • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。

1、线程池的分类

a、创建固定大小的线程池

底层队列:LinkedBlockingQueue,无界(Integer.MAX_VALUE)队列

  1. public static ExecutorService newFixedThreadPool(int nThreads)

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

  1. public class FixedPoolDemo {
  2. public static void main(String[] args) {
  3. // 固定大小的线程池
  4. ExecutorService pool = Executors.newFixedThreadPool(5);
  5. // 创建 10 个任务给线程池
  6. for (int i = 0; i < 10; i++) {
  7. // 把任务交给线程池执行
  8. pool.execute(new Task());
  9. }
  10. // 线程池关闭后,后面的代码就无法使用线程池了
  11. // 不再接受新的任务,等待已执行的任务完成
  12. pool.shutdown();
  13. // 立即关闭线程池,也会尝试中断正在执行的线程,返回等待执行的线程任务列表
  14. List<Runnable> waitingForExecute = pool.shutdownNow();
  15. System.out.println("waitingForExecute = " + waitingForExecute.size());
  16. for (int i = 0; i < 10; i++) {
  17. // 把任务交给线程池执行
  18. pool.execute(new Task());
  19. }
  20. }
  21. }

b、创建可变大小的线程池(线程复用)

底层队列:SynchronousQueue,只保存一个任务

  1. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将复用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将复用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。

  1. public class CachedPoolDemo {
  2. public static void main(String[] args) {
  3. // 可变大小的线程池
  4. ExecutorService pool = Executors.newCachedThreadPool();
  5. // 创建 10 个任务给线程池
  6. for (int i = 0; i < 10; i++) {
  7. // 把任务交给线程池执行
  8. pool.execute(new Task());
  9. }
  10. pool.shutdown();
  11. }
  12. }

c、创建单一线程的线程池

底层队列:LinkedBlockingQueue,无界(Integer.MAX_VALUE)队列

  1. public static ExecutorService newSingleThreadExecutor()

创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

  1. public class SingleThreadPoolDemo {
  2. public static void main(String[] args) {
  3. // 单一线程的线程池
  4. ExecutorService pool = Executors.newSingleThreadExecutor();
  5. // 创建 10 个任务给线程池
  6. for (int i = 0; i < 10; i++) {
  7. // 把任务交给线程池执行
  8. pool.execute(new Task());
  9. }
  10. pool.shutdown();
  11. }
  12. }

d、创建可调度的线程池

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行

  1. public class ScheduledPoolDemo {
  2. public static void main(String[] args) {
  3. // 可调度的线程池
  4. ExecutorService pool = Executors.newScheduledThreadPool(5);
  5. // 创建 10 个任务给线程池
  6. for (int i = 0; i < 10; i++) {
  7. // 把任务交给线程池执行
  8. pool.execute(new Task());
  9. }
  10. pool.shutdown();
  11. }
  12. }

三、线程池的原理

image.png

1、构造方法

固定大小的线程池

  1. ExecutorService pool = Executors.newFixedThreadPool(5);
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>());
  6. }

可变大小的线程池

  1. ExecutorService pool = Executors.newCachedThreadPool();
  2. public static ExecutorService newCachedThreadPool() {
  3. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  4. 60L, TimeUnit.SECONDS,
  5. new SynchronousQueue<Runnable>());
  6. }

单一线程的线程池

  1. ExecutorService pool = Executors.newSingleThreadExecutor();
  2. public static ExecutorService newSingleThreadExecutor() {
  3. return new FinalizableDelegatedExecutorService
  4. (new ThreadPoolExecutor(1, 1,
  5. 0L, TimeUnit.MILLISECONDS,
  6. new LinkedBlockingQueue<Runnable>()));
  7. }

可调度的线程池

  1. ExecutorService pool = Executors.newScheduledThreadPool(5);
  2. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  3. return new ScheduledThreadPoolExecutor(corePoolSize);
  4. }
  5. public ScheduledThreadPoolExecutor(int corePoolSize) {
  6. super(corePoolSize, Integer.MAX_VALUE,
  7. 0, NANOSECONDS,
  8. new DelayedWorkQueue());
  9. }

2、核心参数

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }
  • int corePoolSize :创建线程池时初始的核心线程数量,永远不会被回收
  • int maximumPoolSize :最大线程数量
  • long keepAliveTime :线程可保持活动的时间
  • TimeUnit unit :活动时间的单位(毫秒、秒)
  • BlockingQueue workQueue :任务阻塞队列
  • defaultHandler :拒绝策略


3、线程池的五种状态

image.png

  • private static final int COUNT_BITS = Integer.SIZE - 3;
    • 用 32 - 3 = 29 位,来表示线程数量
  • private static final int CAPACITY = (1 << COUNT_BITS) - 1;


  • private static final int RUNNING = -1 << COUNT_BITS;
    • 说明:线程池一旦创建,就处于 RUNNING 状态
    • 切换:线程池一旦创建,就处于 RUNNING 状态
  • private static final int SHUTDOWN = 0 << COUNT_BITS;
    • 说明:关闭线程池,不再接受新任务,但是队列中的任务等待其执行完成
    • 切换:线程池执行 shutdown 方法
  • private static final int STOP = 1 << COUNT_BITS;
    • 说明:不接受新任务,也不执行队列中的任务
    • 切换:线程池执行 shutdownNow 方法
  • private static final int TIDYING = 2 << COUNT_BITS;
    • 说明:TIDYING 状态时,会执行钩子函数 terminated()。terminated() 在 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated() 函数来实现。
    • 切换:调用 shutdown 或 shutdownNow 后,线程都已经执行完毕或停止
  • private static final int TERMINATED = 3 << COUNT_BITS;
    • 说明:终止状态,表示线程池的生命周期就结束了
    • 切换:线程池处在 TIDYING 状态时,执行完 terminated() 之后


4、Execute 方法


  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. /*
  5. * Proceed in 3 steps:
  6. *
  7. * 1. If fewer than corePoolSize threads are running, try to
  8. * start a new thread with the given command as its first
  9. * task. The call to addWorker atomically checks runState and
  10. * workerCount, and so prevents false alarms that would add
  11. * threads when it shouldn't, by returning false.
  12. *
  13. * 2. If a task can be successfully queued, then we still need
  14. * to double-check whether we should have added a thread
  15. * (because existing ones died since last checking) or that
  16. * the pool shut down since entry into this method. So we
  17. * recheck state and if necessary roll back the enqueuing if
  18. * stopped, or start a new thread if there are none.
  19. *
  20. * 3. If we cannot queue task, then we try to add a new
  21. * thread. If it fails, we know we are shut down or saturated
  22. * and so reject the task.
  23. */
  24. int c = ctl.get(); // 获取线程池的状态码
  25. if (workerCountOf(c) < corePoolSize) { // 如果工作线程数量小于核心线程数量
  26. if (addWorker(command, true)) // 添加到工作线程中去执行
  27. return;
  28. c = ctl.get();
  29. }
  30. // 如果是运行状态,写入阻塞队列中
  31. if (isRunning(c) && workQueue.offer(command)) {
  32. int recheck = ctl.get(); // 再次获取状态值
  33. // 如果不是运行状态,就从队列中移除任务
  34. if (!isRunning(recheck) && remove(command)) // DCL 双重检测机制
  35. reject(command); // 根据策略拒绝任务
  36. else if (workerCountOf(recheck) == 0)
  37. addWorker(null, false);
  38. }
  39. // 尝试新建一个线程,如果失败了,就执行拒绝策略
  40. else if (!addWorker(command, false))
  41. reject(command); // 根据策略拒绝任务
  42. }

5、线程池关闭

  1. pool.shutdown();
  2. // 空循环检测线程池是否完全关闭
  3. while (!pool.isTerminated()) {
  4. TimeUnit.MICROSECONDS.sleep(500);
  5. }
  6. // 执行其他业务逻辑 ...