两阶段终止模式:如何优雅地终止线程?

优雅地终止线程, 不是自己终止自己,而是在一个线程 T1 中,终止线程 T2;这里所谓的“优雅”,指的是给 T2 一个机会料理后事,而不是被一剑封喉。

如何理解两阶段终止模式

前辈们经过认真对比分析,已经总结出了一套成熟的方案,叫做两阶段终止模式。顾名思义,就是将终止过程分成两个阶段,其中第一个阶段主要是线程 T1 向线程 T2发送终止指令,而第二阶段则是线程 T2响应终止指令。
并发设计模式(三) - 图1
Java 线程的状态转换图,如下图所示。

并发设计模式(三) - 图2
从这个图里你会发现,Java 线程进入终止状态的前提是线程进入 RUNNABLE 状态,而实际上线程也可能处在休眠状态,也就是说,我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。如何做到呢?这个要靠 Java Thread 类提供的 interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。

线程转换到 RUNNABLE 状态之后,我们如何再将其终止呢?RUNNABLE 状态转换到终止状态,优雅的方式是让 Java 线程自己执行完 run() 方法,所以一般我们采用的方法是设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。这个过程其实就是我们前面提到的第二阶段:响应终止指令。

综合上面这两点,我们能总结出终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。

用两阶段终止模式终止监控操作

实际工作中,有些监控系统需要动态地采集一些数据,一般都是监控系统发送采集指令给被监控系统的监控代理,监控代理接收到指令之后,从监控目标收集数据,然后回传给监控系统,详细过程如下图所示。出于对性能的考虑(有些监控项对系统性能影响很大,所以不能一直持续监控),动态采集功能一般都会有终止操作。

并发设计模式(三) - 图3
简化代码:

  1. class Proxy {
  2. boolean started = false;
  3. //采集线程
  4. Thread rptThread;
  5. //启动采集功能
  6. synchronized void start(){
  7. //不允许同时启动多个采集线程
  8. if (started) {
  9. return;
  10. }
  11. started = true;
  12. rptThread = new Thread(()->{
  13. while (true) {
  14. //省略采集、回传实现
  15. report();
  16. //每隔两秒钟采集、回传一次数据
  17. try {
  18. Thread.sleep(2000);
  19. } catch (InterruptedException e) {
  20. }
  21. }
  22. //执行到此处说明线程马上终止
  23. started = false;
  24. });
  25. rptThread.start();
  26. }
  27. //终止采集功能
  28. synchronized void stop(){
  29. //如何实现?
  30. }
  31. }

按照两阶段终止模式,我们首先需要做的就是将线程 rptThread 状态转换到 RUNNABLE,做法很简单,只需要在调用 rptThread.interrupt() 就可以了。线程 rptThread 的状态转换到 RUNNABLE 之后,如何优雅地终止呢?下面的示例代码中,我们选择的标志位是线程的中断状态:Thread.currentThread().isInterrupted() ,需要注意的是,我们在捕获 Thread.sleep() 的中断异常之后,通过 Thread.currentThread().interrupt() 重新设置了线程的中断状态,因为 JVM 的异常处理会清除线程的中断状态。

  1. class Proxy {
  2. boolean started = false;
  3. //采集线程
  4. Thread rptThread;
  5. //启动采集功能
  6. synchronized void start(){
  7. //不允许同时启动多个采集线程
  8. if (started) {
  9. return;
  10. }
  11. started = true;
  12. rptThread = new Thread(()->{
  13. while (!Thread.currentThread().isInterrupted()){
  14. //省略采集、回传实现
  15. report();
  16. //每隔两秒钟采集、回传一次数据
  17. try {
  18. Thread.sleep(2000);
  19. } catch (InterruptedException e){
  20. //重新设置线程中断状态
  21. Thread.currentThread().interrupt();
  22. }
  23. }
  24. //执行到此处说明线程马上终止
  25. started = false;
  26. });
  27. rptThread.start();
  28. }
  29. //终止采集功能
  30. synchronized void stop(){
  31. rptThread.interrupt();
  32. }
  33. }

上面的示例代码的确能够解决当前的问题,但是建议你在实际工作中谨慎使用。原因在于我们很可能在线程的 run() 方法中调用第三方类库提供的方法,而我们没有办法保证第三方类库正确处理了线程的中断异常,例如第三方类库在捕获到 Thread.sleep() 方法抛出的中断异常后,没有重新设置线程的中断状态,那么就会导致线程不能够正常终止。所以强烈建议你设置自己的线程终止标志位,例如在下面的代码中,使用 isTerminated 作为线程终止标志位,此时无论是否正确处理了线程的中断异常,都不会影响线程优雅地终止。

  1. class Proxy {
  2. //线程终止标志位
  3. volatile boolean terminated = false;
  4. boolean started = false;
  5. //采集线程
  6. Thread rptThread;
  7. //启动采集功能
  8. synchronized void start(){
  9. //不允许同时启动多个采集线程
  10. if (started) {
  11. return;
  12. }
  13. started = true;
  14. terminated = false;
  15. rptThread = new Thread(()->{
  16. while (!terminated){
  17. //省略采集、回传实现
  18. report();
  19. //每隔两秒钟采集、回传一次数据
  20. try {
  21. Thread.sleep(2000);
  22. } catch (InterruptedException e){
  23. //重新设置线程中断状态
  24. Thread.currentThread().interrupt();
  25. }
  26. }
  27. //执行到此处说明线程马上终止
  28. started = false;
  29. });
  30. rptThread.start();
  31. }
  32. //终止采集功能
  33. synchronized void stop(){
  34. //设置中断标志位
  35. terminated = true;
  36. //中断线程rptThread
  37. rptThread.interrupt();
  38. }
  39. }

如何优雅地终止线程池

Java 领域用的最多的还是线程池,而不是手动地创建线程。那我们该如何优雅地终止线程池呢?

线程池提供了两个方法:shutdown()和shutdownNow()。这两个方法有什么区别呢?要了解它们的区别,就先需要了解线程池的实现原理。

我们曾经讲过,Java 线程池是生产者 - 消费者模式的一种实现,提交给线程池的任务,首先是进入一个阻塞队列中,之后线程池中的线程从阻塞队列中取出任务执行。

shutdown() 方法是一种很保守的关闭线程池的方法。线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池。

而 shutdownNow() 方法,相对就激进一些了,线程池执行 shutdownNow() 后,会拒绝接收新的任务,同时还会中断线程池中正在执行的任务,已经进入阻塞队列的任务也被剥夺了执行的机会,不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回。因为 shutdownNow() 方法会中断正在执行的线程,所以提交到线程池的任务,如果需要优雅地结束,就需要正确地处理线程中断。

如果提交到线程池的任务不允许取消,那就不能使用 shutdownNow() 方法终止线程池。不过,如果提交到线程池的任务允许后续以补偿的方式重新执行,也是可以使用 shutdownNow() 方法终止线程池的。《Java 并发编程实战》这本书第 7 章《取消与关闭》的“shutdownNow 的局限性”一节中,提到一种将已提交但尚未开始执行的任务以及已经取消的正在执行的任务保存起来,以便后续重新执行的方案,你可以参考一下,方案很简单,这里就不详细介绍了。

其实分析完 shutdown() 和 shutdownNow() 方法你会发现,它们实质上使用的也是两阶段终止模式,只是终止指令的范围不同而已,前者只影响阻塞队列接收任务,后者范围扩大到线程池中所有的任务。

总结
两阶段终止模式是一种应用很广泛的并发设计模式,在 Java 语言中使用两阶段终止模式来优雅地终止线程,需要注意两个关键点:一个是仅检查终止标志位是不够的,因为线程的状态可能处于休眠态;另一个是仅检查线程的中断状态也是不够的,因为我们依赖的第三方类库很可能没有正确处理中断异常。

当你使用 Java 的线程池来管理线程的时候,需要依赖线程池提供的 shutdown() 和 shutdownNow() 方法来终止线程池。不过在使用时需要注意它们的应用场景,尤其是在使用 shutdownNow() 的时候,一定要谨慎。


生产者-消费者模式:用流水线思想提高效率

生产者 - 消费者模式的优点

生产者 - 消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。

并发设计模式(三) - 图4
从架构设计的角度来看,生产者 - 消费者模式有一个很重要的优点,就是解耦。解耦对于大型系统的设计非常重要,而解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。在生产者 - 消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者 - 消费者模式是一个不错的解耦方案。

除了架构设计上的优点之外,生产者 - 消费者模式还有一个重要的优点就是支持异步,并且能够平衡生产者和消费者的速度差异。在生产者 - 消费者模式中,生产者线程只需要将任务添加到任务队列而无需等待任务被消费者线程执行完,也就是说任务的生产和消费是异步的,这是与传统的方法之间调用的本质区别,传统的方法之间调用是同步的。

你或许会有这样的疑问,异步化处理最简单的方式就是创建一个新的线程去处理,那中间增加一个“任务队列”究竟有什么用呢?我觉得主要还是用于平衡生产者和消费者的速度差异。我们假设生产者的速率很慢,而消费者的速率很高,比如是 1:3,如果生产者有 3 个线程,采用创建新的线程的方式,那么会创建 3 个子线程,而采用生产者 - 消费者模式,消费线程只需要 1 个就可以了。Java 语言里,Java 线程和操作系统线程是一一对应的,线程创建得太多,会增加上下文切换的成本,所以 Java 线程不是越多越好,适量即可。而生产者 - 消费者模式恰好能支持你用适量的线程。

支持批量执行以提升性能

在《35 | 两阶段终止模式:如何优雅地终止线程?》文章中,我们提到一个监控系统动态采集的案例,其实最终回传的监控数据还是要存入数据库的(如下图)。但被监控系统往往有很多,如果每一条回传数据都直接 INSERT 到数据库,那么这个方案就是上面提到的第一种方案:每个线程 INSERT 一条数据。很显然,更好的方案是批量执行 SQL,那如何实现呢?这就要用到生产者 - 消费者模式了。

并发设计模式(三) - 图5
利用生产者 - 消费者模式实现批量执行 SQL 非常简单:将原来直接 INSERT 数据到数据库的线程作为生产者线程,生产者线程只需将数据添加到任务队列,然后消费者线程负责将任务从任务队列中批量取出并批量执行。

在下面的示例代码中,我们创建了 5 个消费者线程负责批量执行 SQL,这 5 个消费者线程以 while(true){} 循环方式批量地获取任务并批量地执行。需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。

  1. //任务队列
  2. BlockingQueue<Task> bq=new
  3. LinkedBlockingQueue<>(2000);
  4. //启动5个消费者线程
  5. //执行批量任务
  6. void start() {
  7. ExecutorService es=executors
  8. .newFixedThreadPool(5);
  9. for (int i=0; i<5; i++) {
  10. es.execute(()->{
  11. try {
  12. while (true) {
  13. //获取批量任务
  14. List<Task> ts=pollTasks();
  15. //执行批量任务
  16. execTasks(ts);
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. });
  22. }
  23. }
  24. //从任务队列中获取批量任务
  25. List<Task> pollTasks()
  26. throws InterruptedException{
  27. List<Task> ts=new LinkedList<>();
  28. //阻塞式获取一条任务
  29. Task t = bq.take();
  30. while (t != null) {
  31. ts.add(t);
  32. //非阻塞式获取一条任务
  33. t = bq.poll();
  34. }
  35. return ts;
  36. }
  37. //批量执行任务
  38. execTasks(List<Task> ts) {
  39. //省略具体代码无数
  40. }

支持分阶段提交以提升性能

利用生产者 - 消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。我曾经参与过一个项目,其中的日志组件是自己实现的,采用的就是异步刷盘方式,刷盘的时机是:
1.ERROR 级别的日志需要立即刷盘;
2.数据积累到 500 条需要立即刷盘;
3.存在未刷盘数据,且 5 秒钟内未曾刷盘,需要立即刷盘。

这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面我们具体看看用生产者 - 消费者模式如何实现。在下面的示例代码中,可以通过调用 info()和error() 方法写入日志,这两个方法都是创建了一个日志任务 LogMsg,并添加到阻塞队列中,调用 info()和error() 方法的线程是生产者;而真正将日志写入文件的是消费者线程,在 Logger 这个类中,我们只创建了 1 个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。

  1. class Logger {
  2. //任务队列
  3. final BlockingQueue<LogMsg> bq
  4. = new BlockingQueue<>();
  5. //flush批量
  6. static final int batchSize=500;
  7. //只需要一个线程写日志
  8. ExecutorService es =
  9. Executors.newFixedThreadPool(1);
  10. //启动写日志线程
  11. void start(){
  12. File file=File.createTempFile(
  13. "foo", ".log");
  14. final FileWriter writer=
  15. new FileWriter(file);
  16. this.es.execute(()->{
  17. try {
  18. //未刷盘日志数量
  19. int curIdx = 0;
  20. long preFT=System.currentTimeMillis();
  21. while (true) {
  22. LogMsg log = bq.poll(
  23. 5, TimeUnit.SECONDS);
  24. //写日志
  25. if (log != null) {
  26. writer.write(log.toString());
  27. ++curIdx;
  28. }
  29. //如果不存在未刷盘数据,则无需刷盘
  30. if (curIdx <= 0) {
  31. continue;
  32. }
  33. //根据规则刷盘
  34. if (log!=null && log.level==LEVEL.ERROR ||
  35. curIdx == batchSize ||
  36. System.currentTimeMillis()-preFT>5000){
  37. writer.flush();
  38. curIdx = 0;
  39. preFT=System.currentTimeMillis();
  40. }
  41. }
  42. }catch(Exception e){
  43. e.printStackTrace();
  44. } finally {
  45. try {
  46. writer.flush();
  47. writer.close();
  48. }catch(IOException e){
  49. e.printStackTrace();
  50. }
  51. }
  52. });
  53. }
  54. //写INFO级别日志
  55. void info(String msg) {
  56. bq.put(new LogMsg(
  57. LEVEL.INFO, msg));
  58. }
  59. //写ERROR级别日志
  60. void error(String msg) {
  61. bq.put(new LogMsg(
  62. LEVEL.ERROR, msg));
  63. }
  64. }
  65. //日志级别
  66. enum LEVEL {
  67. INFO, ERROR
  68. }
  69. class LogMsg {
  70. LEVEL level;
  71. String msg;
  72. //省略构造函数实现
  73. LogMsg(LEVEL lvl, String msg){}
  74. //省略toString()实现
  75. String toString(){}
  76. }

总结
Java 语言提供的线程池本身就是一种生产者 - 消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。