问题场景

在使用 spring-cloud-stream 消息队列时,我们有一个业务场景是:

  1. 开启了 4 个消费线程消费队列中其他系统投递来的邮件发送任务
  2. 拿到邮件发送信息后,调用第三方平台 API 接口发送邮件

当我的服务运行很长一段时间后(最近发现是运行了 240 小时+,出现了问题),就会发现这个队列貌似一直不会 ACK,如下图这样
image.png
但是:

  1. 它的 total 数量会变化,一直会增多
  2. Unacked 不变化:也就是有 4 个线程消费了消息,但是一直没有 ack
  3. 查看了服务日志,也有日志产生,也能访问到这个服务

问题解决

先找到这个服务的 进程 ID

  1. # 使用 top 命令 -c 参数,查看运行命令的全部,就能靠着你的 jar 包名称分辨哪一个进程 ID 是你的微服务
  2. top -c

image.png
如上图是 56243.

然后使用 jstack 命令导出当前线程状态信息

  1. # 导出线程信息到文件中
  2. jstack 56243 > 56243.txt
  3. # 使用 less 命令查看文件信息,搜索我们的关键词
  4. # 进入之后,按 / 然后会触发一个输入操作,输入你要查找的关键词按回车可以搜索当前文件
  5. # 按字母 n 可以查看下一个搜索结果
  6. less 56243.txt

问题是:关键词从哪里来?这个就要从业务相关说起了,下面分两种方式来说:

  1. 你知道哪一个队列消费有问题,去找到这个队列的消费的地方;然后搜索这个类名
  2. cloud.stream 生成队列的时候是按照配置的 队列名 + group 名生成的,那么消费线程也是按照这个名称来起名的,如下的配置

image.png
那么我们搜索的关键词就是 cloud-tppExpertsender-send-st

image.png

  1. 首先这个线程状态是 RUNNABLE 但是一直是在 SocketInputStream.socketRead0 这个方法这里,这个是读取 socker 流的方法,一直在这里卡着。
  2. 然后看业务逻辑代码 463 行

image.png

可以看到上面代码使用了自己封装的一个 http 请求器,也就是这一句话导致这个线程假死了。所以需要跟踪这个方法的源码去查看。

  1. public CloseableHttpResponse requestPostJson(String url, String paramsJson, Map<String, String> headers, Integer timeout, Charset charset) throws IOException {
  2. HttpPost http = new HttpPost(url);
  3. charset = (charset == null ? defaultCharset : charset);
  4. StringEntity entity = new StringEntity(paramsJson, charset);
  5. entity.setContentType(ContentType.APPLICATION_JSON.getMimeType());
  6. http.setEntity(entity);
  7. setHeaders(http, headers);
  8. setTimeout(http, timeout);
  9. return execute(http);
  10. }
  11. private void setTimeout(HttpRequestBase http, Integer timeout) {
  12. if (timeout != null) {
  13. RequestConfig config = RequestConfig
  14. .custom()
  15. .setConnectTimeout(timeout)
  16. .setSocketTimeout(timeout * 2).build();
  17. http.setConfig(config);
  18. }
  19. }

最后发现这里没有传递 超时时间,导致在某些情况下,一直卡着不返回,从而导致整个消费线程假死。 MQ 消息累积。