问题描述
需求信息
最近在工作中有个需求,先在A服务页面增加一条数据,然后去B服务查询此数据的详细信息
解决方案
为了使A服务的新增数据接口快速响应,在查询B服务数据详情的地方使用了线程池异步查询与更新。
问题现象
在验证时发现数据库中的数据字段不全,经分析缺少的都是需要从B服务查询并更新的字段
初步定位
猜测应该是查询B服务时出了一些异常,而由于不规范使用线程池导致异常没有抛出,直接打到了控制台,故A服务的日志系统并看不到错误日志。
问题解决
- 查询B服务数据详情时暂时去掉使用线程池,改为同步调用
- 增加容错定时任务,定时查询需要从B服务获取缺失字段的数据进行更新
问题复盘
A服务线程池的使用
线程池定义
/**
* 查询B服务数据详情的线程池
*/
private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
new LinkedBlockingDeque<>(), new ThreadFactoryBuilder().setNameFormat("queryDataDetail-%d").build());
线程池使用
@Override
@Transactional(rollbackFor = Exception.class)
public void add(String contract) {
// 0、检查合同必须不存在
checkIfExist(contract);
// 1、保存合同数据
Entity entity = saveData(contract);
// 2、从B服务查询缺失信息
executorService.execute(() -> queryDataDetail(entity));
}
如此使用有何问题
我重新写了一个测试方法如下:
1、定义一个会一直抛异常的方法
/**
* 引入SystemOutRule,监听程序日志输出
*/
@Rule
public SystemOutRule systemOutRule = new SystemOutRule().enableLog();
/**
* 引入SystemOutRule,监听程序日志输出
*/
private String runWithException() {
Thread thread = Thread.currentThread();
log.info("thread is {}", thread);
log.info("eh={}", thread.getUncaughtExceptionHandler());
throw new NicaiException("出错啦!");
}
2、使用线程池调用上面的方法
@Test
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(this::runWithException);
TimeUnit.MILLISECONDS.sleep(100L);
// 断言程序打印的日志不包含“出错啦!”
Assert.assertFalse(systemOutRule.getLog().contains("出错啦!"));
}
3、上面的单测断言是成功的,那么异常跑哪里去了?上面的单测在控制台的输出如下:
4、可以看出上面的异常信息是直接输出到了控制台,而不是由程序输出到控制台,主要原因是主程序没有捕获到此异常导致的。(具体原因还没有深入)
如何解决线程池的异常捕获问题
上面的测试可以说明到为什么日志里面查不到错误日志,那么如何捕获线程里的异常呢?
方法1:使用UncaughtExceptionHandler
1、在创建线程池的时候,设置传入的ThreadFactory的UncaughtExceptionHandler属性,此UncaughtExceptionHandler会处理线程中的异常;下面的例子我直接打印了出来异常原因和异常栈。
@Test
public void runWithUncaughtExceptionHandler() throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e))
.build());
executorService.execute(this::runWithException);
TimeUnit.MILLISECONDS.sleep(100L);
Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
}
2、上面的单测运行结果如下:(可以和上面的运行结果进行比对)
3、从上面的运行结果可以看出异常信息是由程序捕获后再输出出来,这样就不会导致查不到异常日志了。
方法2:使用guava扩展的FutureCallback
1、guava对jdk的线程做了一些扩展,其中一个就是FutureCallback,使用方法如下:
@Test
public void runWithGuavaThreadPool() throws InterruptedException {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException);
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("success! result = {}", result);
}
@Override
public void onFailure(Throwable t) {
log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t);
}
}, executorService);
TimeUnit.MILLISECONDS.sleep(100L);
Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
}
问题总结
1、通过上面的测试,优化A服务的线程池定义,使之在遇到异常时能够正常被捕获,能输出,方便问题定位;补偿定时任务也能对第一次查询异常进行容错,保证数据能够同步过来。
/**
* 查询B服务数据详情的线程池
*/
private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
new LinkedBlockingDeque<>(), new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((t, e) -> log.error("查询数据详情的线程池异常,error_message={}", e.getMessage(), e))
.setNameFormat("queryDataDetail-%d").build());
2、当然此问题更深层的问题还没有完全解答
- 为什么线程里的异常不会被捕获?
- UncaughtExceptionHandler的运行原理是什么?
- Guava的FutureCallback是如何运行的?
3、测试代码源码地址
package com.nicai.experience.concurrency;
import com.google.common.util.concurrent.*;
import com.nicai.exception.NicaiException;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.SystemOutRule;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ExceptionThreadTest {
@Rule
public SystemOutRule systemOutRule = new SystemOutRule().enableLog();
@Test
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(this::runWithException);
TimeUnit.MILLISECONDS.sleep(100L);
Assert.assertFalse(systemOutRule.getLog().contains("出错啦!"));
}
@Test
public void runWithUncaughtExceptionHandler() throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e))
.build());
executorService.execute(this::runWithException);
TimeUnit.MILLISECONDS.sleep(100L);
Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
}
@Test
public void runWithGuavaThreadPool() throws InterruptedException {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException);
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("success! result = {}", result);
}
@Override
public void onFailure(Throwable t) {
log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t);
}
}, executorService);
TimeUnit.MILLISECONDS.sleep(100L);
Assert.assertTrue(systemOutRule.getLog().contains("出错啦!"));
}
private String runWithException() {
Thread thread = Thread.currentThread();
log.info("thread is {}", thread);
log.info("eh={}", thread.getUncaughtExceptionHandler());
throw new NicaiException("出错啦!");
}
@Before
public void cleanLog() {
systemOutRule.clearLog();
}
}