问题描述
需求信息
最近在工作中有个需求,先在A服务页面增加一条数据,然后去B服务查询此数据的详细信息
解决方案
为了使A服务的新增数据接口快速响应,在查询B服务数据详情的地方使用了线程池异步查询与更新。
问题现象
在验证时发现数据库中的数据字段不全,经分析缺少的都是需要从B服务查询并更新的字段
初步定位
猜测应该是查询B服务时出了一些异常,而由于不规范使用线程池导致异常没有抛出,直接打到了控制台,故A服务的日志系统并看不到错误日志。
问题解决
- 查询B服务数据详情时暂时去掉使用线程池,改为同步调用
- 增加容错定时任务,定时查询需要从B服务获取缺失字段的数据进行更新
问题复盘
A服务线程池的使用
线程池定义
/*** 查询B服务数据详情的线程池*/private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,new LinkedBlockingDeque<>(), new ThreadFactoryBuilder().setNameFormat("queryDataDetail-%d").build());
线程池使用
@Override@Transactional(rollbackFor = Exception.class)public void add(String contract) {// 0、检查合同必须不存在checkIfExist(contract);// 1、保存合同数据Entity entity = saveData(contract);// 2、从B服务查询缺失信息executorService.execute(() -> queryDataDetail(entity));}
如此使用有何问题
我重新写了一个测试方法如下:
1、定义一个会一直抛异常的方法
/*** 引入SystemOutRule,监听程序日志输出*/@Rulepublic SystemOutRule systemOutRule = new SystemOutRule().enableLog();/*** 引入SystemOutRule,监听程序日志输出*/private String runWithException() {Thread thread = Thread.currentThread();log.info("thread is {}", thread);log.info("eh={}", thread.getUncaughtExceptionHandler());throw new NicaiException("出错啦!");}
2、使用线程池调用上面的方法
@Testpublic void run() throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(this::runWithException);TimeUnit.MILLISECONDS.sleep(100L);// 断言程序打印的日志不包含“出错啦!”Assert.assertFalse(systemOutRule.getLog().contains("出错啦!"));}
3、上面的单测断言是成功的,那么异常跑哪里去了?上面的单测在控制台的输出如下:
4、可以看出上面的异常信息是直接输出到了控制台,而不是由程序输出到控制台,主要原因是主程序没有捕获到此异常导致的。(具体原因还没有深入)
如何解决线程池的异常捕获问题
上面的测试可以说明到为什么日志里面查不到错误日志,那么如何捕获线程里的异常呢?
方法1:使用UncaughtExceptionHandler
1、在创建线程池的时候,设置传入的ThreadFactory的UncaughtExceptionHandler属性,此UncaughtExceptionHandler会处理线程中的异常;下面的例子我直接打印了出来异常原因和异常栈。
@Testpublic void runWithUncaughtExceptionHandler() throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e)).build());executorService.execute(this::runWithException);TimeUnit.MILLISECONDS.sleep(100L);Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));}
2、上面的单测运行结果如下:(可以和上面的运行结果进行比对)
3、从上面的运行结果可以看出异常信息是由程序捕获后再输出出来,这样就不会导致查不到异常日志了。
方法2:使用guava扩展的FutureCallback
1、guava对jdk的线程做了一些扩展,其中一个就是FutureCallback,使用方法如下:
@Testpublic void runWithGuavaThreadPool() throws InterruptedException {ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException);Futures.addCallback(listenableFuture, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {log.info("success! result = {}", result);}@Overridepublic void onFailure(Throwable t) {log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t);}}, executorService);TimeUnit.MILLISECONDS.sleep(100L);Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));}
问题总结
1、通过上面的测试,优化A服务的线程池定义,使之在遇到异常时能够正常被捕获,能输出,方便问题定位;补偿定时任务也能对第一次查询异常进行容错,保证数据能够同步过来。
/*** 查询B服务数据详情的线程池*/private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,new LinkedBlockingDeque<>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler((t, e) -> log.error("查询数据详情的线程池异常,error_message={}", e.getMessage(), e)).setNameFormat("queryDataDetail-%d").build());
2、当然此问题更深层的问题还没有完全解答
- 为什么线程里的异常不会被捕获?
- UncaughtExceptionHandler的运行原理是什么?
- Guava的FutureCallback是如何运行的?
3、测试代码源码地址
package com.nicai.experience.concurrency;import com.google.common.util.concurrent.*;import com.nicai.exception.NicaiException;import lombok.extern.slf4j.Slf4j;import org.junit.Assert;import org.junit.Before;import org.junit.Rule;import org.junit.Test;import org.junit.contrib.java.lang.system.SystemOutRule;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;@Slf4jpublic class ExceptionThreadTest {@Rulepublic SystemOutRule systemOutRule = new SystemOutRule().enableLog();@Testpublic void run() throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(this::runWithException);TimeUnit.MILLISECONDS.sleep(100L);Assert.assertFalse(systemOutRule.getLog().contains("出错啦!"));}@Testpublic void runWithUncaughtExceptionHandler() throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e)).build());executorService.execute(this::runWithException);TimeUnit.MILLISECONDS.sleep(100L);Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));}@Testpublic void runWithGuavaThreadPool() throws InterruptedException {ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException);Futures.addCallback(listenableFuture, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {log.info("success! result = {}", result);}@Overridepublic void onFailure(Throwable t) {log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t);}}, executorService);TimeUnit.MILLISECONDS.sleep(100L);Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));}private String runWithException() {Thread thread = Thread.currentThread();log.info("thread is {}", thread);log.info("eh={}", thread.getUncaughtExceptionHandler());throw new NicaiException("出错啦!");}@Beforepublic void cleanLog() {systemOutRule.clearLog();}}
