问题描述

需求信息

最近在工作中有个需求,先在A服务页面增加一条数据,然后去B服务查询此数据的详细信息

解决方案

为了使A服务的新增数据接口快速响应,在查询B服务数据详情的地方使用了线程池异步查询与更新。

问题现象

在验证时发现数据库中的数据字段不全,经分析缺少的都是需要从B服务查询并更新的字段

初步定位

猜测应该是查询B服务时出了一些异常,而由于不规范使用线程池导致异常没有抛出,直接打到了控制台,故A服务的日志系统并看不到错误日志。

问题解决

  1. 查询B服务数据详情时暂时去掉使用线程池,改为同步调用
  2. 增加容错定时任务,定时查询需要从B服务获取缺失字段的数据进行更新

上线后观察,新增的数据不再有部分字段缺失的情况;问题解决。

问题复盘

A服务线程池的使用

线程池定义

  1. /**
  2. * 查询B服务数据详情的线程池
  3. */
  4. private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
  5. new LinkedBlockingDeque<>(), new ThreadFactoryBuilder().setNameFormat("queryDataDetail-%d").build());

线程池使用

  1. @Override
  2. @Transactional(rollbackFor = Exception.class)
  3. public void add(String contract) {
  4. // 0、检查合同必须不存在
  5. checkIfExist(contract);
  6. // 1、保存合同数据
  7. Entity entity = saveData(contract);
  8. // 2、从B服务查询缺失信息
  9. executorService.execute(() -> queryDataDetail(entity));
  10. }

如此使用有何问题

我重新写了一个测试方法如下:
1、定义一个会一直抛异常的方法

  1. /**
  2. * 引入SystemOutRule,监听程序日志输出
  3. */
  4. @Rule
  5. public SystemOutRule systemOutRule = new SystemOutRule().enableLog();
  6. /**
  7. * 引入SystemOutRule,监听程序日志输出
  8. */
  9. private String runWithException() {
  10. Thread thread = Thread.currentThread();
  11. log.info("thread is {}", thread);
  12. log.info("eh={}", thread.getUncaughtExceptionHandler());
  13. throw new NicaiException("出错啦!");
  14. }

2、使用线程池调用上面的方法

  1. @Test
  2. public void run() throws InterruptedException {
  3. ExecutorService executorService = Executors.newCachedThreadPool();
  4. executorService.execute(this::runWithException);
  5. TimeUnit.MILLISECONDS.sleep(100L);
  6. // 断言程序打印的日志不包含“出错啦!”
  7. Assert.assertFalse(systemOutRule.getLog().contains("出错啦!"));
  8. }

3、上面的单测断言是成功的,那么异常跑哪里去了?上面的单测在控制台的输出如下:
线程池无法捕获线程的异常踩坑复盘 - 图1
4、可以看出上面的异常信息是直接输出到了控制台,而不是由程序输出到控制台,主要原因是主程序没有捕获到此异常导致的。(具体原因还没有深入)

如何解决线程池的异常捕获问题

上面的测试可以说明到为什么日志里面查不到错误日志,那么如何捕获线程里的异常呢?

方法1:使用UncaughtExceptionHandler

1、在创建线程池的时候,设置传入的ThreadFactory的UncaughtExceptionHandler属性,此UncaughtExceptionHandler会处理线程中的异常;下面的例子我直接打印了出来异常原因和异常栈。

  1. @Test
  2. public void runWithUncaughtExceptionHandler() throws InterruptedException {
  3. ExecutorService executorService = Executors.newCachedThreadPool(
  4. new ThreadFactoryBuilder()
  5. .setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e))
  6. .build());
  7. executorService.execute(this::runWithException);
  8. TimeUnit.MILLISECONDS.sleep(100L);
  9. Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
  10. }

2、上面的单测运行结果如下:(可以和上面的运行结果进行比对)
线程池无法捕获线程的异常踩坑复盘 - 图2
3、从上面的运行结果可以看出异常信息是由程序捕获后再输出出来,这样就不会导致查不到异常日志了。

方法2:使用guava扩展的FutureCallback

1、guava对jdk的线程做了一些扩展,其中一个就是FutureCallback,使用方法如下:

  1. @Test
  2. public void runWithGuavaThreadPool() throws InterruptedException {
  3. ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  4. ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException);
  5. Futures.addCallback(listenableFuture, new FutureCallback<String>() {
  6. @Override
  7. public void onSuccess(String result) {
  8. log.info("success! result = {}", result);
  9. }
  10. @Override
  11. public void onFailure(Throwable t) {
  12. log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t);
  13. }
  14. }, executorService);
  15. TimeUnit.MILLISECONDS.sleep(100L);
  16. Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
  17. }

2、上面的单测运行结果如下:
线程池无法捕获线程的异常踩坑复盘 - 图3

问题总结

1、通过上面的测试,优化A服务的线程池定义,使之在遇到异常时能够正常被捕获,能输出,方便问题定位;补偿定时任务也能对第一次查询异常进行容错,保证数据能够同步过来。

  1. /**
  2. * 查询B服务数据详情的线程池
  3. */
  4. private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
  5. new LinkedBlockingDeque<>(), new ThreadFactoryBuilder()
  6. .setUncaughtExceptionHandler((t, e) -> log.error("查询数据详情的线程池异常,error_message={}", e.getMessage(), e))
  7. .setNameFormat("queryDataDetail-%d").build());

2、当然此问题更深层的问题还没有完全解答

  • 为什么线程里的异常不会被捕获?
  • UncaughtExceptionHandler的运行原理是什么?
  • Guava的FutureCallback是如何运行的?

3、测试代码源码地址

  1. package com.nicai.experience.concurrency;
  2. import com.google.common.util.concurrent.*;
  3. import com.nicai.exception.NicaiException;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.junit.Assert;
  6. import org.junit.Before;
  7. import org.junit.Rule;
  8. import org.junit.Test;
  9. import org.junit.contrib.java.lang.system.SystemOutRule;
  10. import java.util.concurrent.ExecutorService;
  11. import java.util.concurrent.Executors;
  12. import java.util.concurrent.TimeUnit;
  13. @Slf4j
  14. public class ExceptionThreadTest {
  15. @Rule
  16. public SystemOutRule systemOutRule = new SystemOutRule().enableLog();
  17. @Test
  18. public void run() throws InterruptedException {
  19. ExecutorService executorService = Executors.newCachedThreadPool();
  20. executorService.execute(this::runWithException);
  21. TimeUnit.MILLISECONDS.sleep(100L);
  22. Assert.assertFalse(systemOutRule.getLog().contains("出错啦!"));
  23. }
  24. @Test
  25. public void runWithUncaughtExceptionHandler() throws InterruptedException {
  26. ExecutorService executorService = Executors.newCachedThreadPool(
  27. new ThreadFactoryBuilder()
  28. .setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e))
  29. .build());
  30. executorService.execute(this::runWithException);
  31. TimeUnit.MILLISECONDS.sleep(100L);
  32. Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
  33. }
  34. @Test
  35. public void runWithGuavaThreadPool() throws InterruptedException {
  36. ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  37. ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException);
  38. Futures.addCallback(listenableFuture, new FutureCallback<String>() {
  39. @Override
  40. public void onSuccess(String result) {
  41. log.info("success! result = {}", result);
  42. }
  43. @Override
  44. public void onFailure(Throwable t) {
  45. log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t);
  46. }
  47. }, executorService);
  48. TimeUnit.MILLISECONDS.sleep(100L);
  49. Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
  50. }
  51. private String runWithException() {
  52. Thread thread = Thread.currentThread();
  53. log.info("thread is {}", thread);
  54. log.info("eh={}", thread.getUncaughtExceptionHandler());
  55. throw new NicaiException("出错啦!");
  56. }
  57. @Before
  58. public void cleanLog() {
  59. systemOutRule.clearLog();
  60. }
  61. }