问题场景
在使用 spring-cloud-stream 消息队列时,我们有一个业务场景是:
- 开启了 4 个消费线程消费队列中其他系统投递来的邮件发送任务
- 拿到邮件发送信息后,调用第三方平台 API 接口发送邮件
当我的服务运行很长一段时间后(最近发现是运行了 240 小时+,出现了问题),就会发现这个队列貌似一直不会 ACK,如下图这样
但是:
- 它的 total 数量会变化,一直会增多
- Unacked 不变化:也就是有 4 个线程消费了消息,但是一直没有 ack
- 查看了服务日志,也有日志产生,也能访问到这个服务
问题解决
先找到这个服务的 进程 ID
# 使用 top 命令 -c 参数,查看运行命令的全部,就能靠着你的 jar 包名称分辨哪一个进程 ID 是你的微服务
top -c
如上图是 56243.
然后使用 jstack 命令导出当前线程状态信息
# 导出线程信息到文件中
jstack 56243 > 56243.txt
# 使用 less 命令查看文件信息,搜索我们的关键词
# 进入之后,按 / 然后会触发一个输入操作,输入你要查找的关键词按回车可以搜索当前文件
# 按字母 n 可以查看下一个搜索结果
less 56243.txt
问题是:关键词从哪里来?这个就要从业务相关说起了,下面分两种方式来说:
- 你知道哪一个队列消费有问题,去找到这个队列的消费的地方;然后搜索这个类名
- cloud.stream 生成队列的时候是按照配置的 队列名 + group 名生成的,那么消费线程也是按照这个名称来起名的,如下的配置
那么我们搜索的关键词就是 cloud-tppExpertsender-send-st
- 首先这个线程状态是 RUNNABLE 但是一直是在 SocketInputStream.socketRead0 这个方法这里,这个是读取 socker 流的方法,一直在这里卡着。
- 然后看业务逻辑代码 463 行
可以看到上面代码使用了自己封装的一个 http 请求器,也就是这一句话导致这个线程假死了。所以需要跟踪这个方法的源码去查看。
public CloseableHttpResponse requestPostJson(String url, String paramsJson, Map<String, String> headers, Integer timeout, Charset charset) throws IOException {
HttpPost http = new HttpPost(url);
charset = (charset == null ? defaultCharset : charset);
StringEntity entity = new StringEntity(paramsJson, charset);
entity.setContentType(ContentType.APPLICATION_JSON.getMimeType());
http.setEntity(entity);
setHeaders(http, headers);
setTimeout(http, timeout);
return execute(http);
}
private void setTimeout(HttpRequestBase http, Integer timeout) {
if (timeout != null) {
RequestConfig config = RequestConfig
.custom()
.setConnectTimeout(timeout)
.setSocketTimeout(timeout * 2).build();
http.setConfig(config);
}
}
最后发现这里没有传递 超时时间,导致在某些情况下,一直卡着不返回,从而导致整个消费线程假死。 MQ 消息累积。