使用 Spring boot 启动的 amqp 包实现手动配置队列、交换机、并使用延迟队列来实现一个回调策略功能

boot 版本:2.4.4

依赖包为

  1. implementation 'org.springframework.boot:spring-boot-starter-amqp'

背景

之前用 Spring cloud stream-rabbit,在大部分的场景下,简单配置配置就可以使用了,比较方便,没有怎么用过原生的,如今不在 cloud 环境下,仅仅是 boot 场景下,使用方式又不一样了
本次要实现的需求有:

  1. 同项目发送消息到 mq,同项目再自己消费 mq 消息
  2. 实现延迟队列,来实现 http 回调重试逻辑

这里同项目也要丢到 mq 中的考虑:生成消息可能远远大于消费消息的速度,另外数据库中未保存消息,一方面使用 mq 来充当持久化机制,保证项目重启数据不丢失,另一方面充当齿轮保证消费消息的稳定性

同项目产生消息和消费消息

这里演示的配置是最简单的,直接是队列,没有交换机

首先配置 mq 的数据源

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. listener:
  8. simple:
  9. prefetch: 2

listener.simple.prefetch :每个消费线程获取消息的数量(每个使用者可以处理的未确认消息的最大数量),默认是好像是 250,大概上它的作用

产生消息

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. rabbitTemplate.convertAndSend(Names.QUEUE, JSON.toJSONString(params));

产生消息是最简单的,直接拿到队列名称,发出消息就可以了

消费消息

  1. public static final String QUEUE = "send.mail";
  2. /**
  3. * <pre>
  4. * 并发消费者数量,m-n
  5. * m: 最小消费者数量
  6. * n:最大消费者数量
  7. * </pre>
  8. */
  9. public static final String CONCURRENCY = "3-7";
  10. /**
  11. 声明一个队列
  12. */
  13. @Bean
  14. public Queue sendMail() {
  15. return new Queue(QUEUE);
  16. }
  17. // 监听队列配置
  18. @RabbitListener(
  19. queues = QUEUE, // 监听队列
  20. concurrency = CONCURRENCY, // 配置消费者数量,也就是并发线程数量
  21. ackMode = "AUTO" // ack 模式,其他可选枚举看这个 listener 的注释,上面写得有
  22. )
  23. public void process(String message) {
  24. NoticeMailRequest request = JSON.parseObject(message, NoticeMailRequest.class);
  25. log.debug("队列开始处理:{}", request);
  26. try {
  27. // 处理消息
  28. } catch (Exception e) {
  29. // 看你自己的业务,如果有异常不捕获,会被 aop 判定为消息处理失败,不会 ack
  30. }
  31. }

延迟队列

实现思路

延迟队列实现的思路:

  1. 创建一个正常的队列,但是要设置过期时间,绑定到 死信队列的交换机上
  2. 创建一个死信队列
  3. 程序只消费死信队列

这样一来,当一个消息进入到正常队列中,当消息超时之后,就会自动进入死信队列,就可以被程序消费,达到延迟消费的效果,但是需要知道: RabbitMQ 的 ttl 消息生存时间,可以在队列上设置,还可以在消息上设置,但是强烈建议设置在队列上

比如:a 消息是超时 1 分钟,b 消息是超时 30 秒,a 先进队列,b 后进队列,那么你会发现,就算 30 秒到了,b 消息也不会进入到死信队列中,而是等待 a 消息出来之后,跟着出来的

本次要实现的延迟队列功能背景如下:接受第三方系统调用 API 后,结果会通过 HTTP 方式异步回调会第三方系统,要保证在如下的规则中回调

  1. 有回调结果时:立即回调给第三方系统
  2. 如果回调第三方系统异常,每次重试间隔时间翻倍;假如所有回调都是错误的,那么将在第:0 、1、2、4、8、16、32、64 分钟时,进行回调操作,也就是说,加上正常回调的那一次,每一次的通知回调,最多调用 8 次,总共用时 64 分钟

关于回调策略问题,你可以自己定制,使用一些小算法进行,下面是具体的实现思路:

  1. 创建一个正常的队列:用来接收第一次的正常回调,目的是:及时回调,而不是都要等待 1 分钟后再回调
  2. 创建一个延迟队列,也就是将消息存活期设置成 1 分钟,目的是:当正常队列回调失败后,就投递到该队列中
  3. 创建一个死信队列
    1. 延迟队列绑定该死信队列的交换机
    2. 程序也消费该死信队列,如果还是回调失败,则手动将消息投递到延迟队列中

在死信队列中的处理需要注意:由于延迟队列设置成 1 分钟过期一次,那么也就是说,死信队列中的消费者,一分钟会消费到消息,这个时候还需要根据回调的时间间隔策略,做一定的判定,如果没有到达间隔,则再次投递回延迟队列中,因为所有消息都是 1 分钟过期,也就保证了一分钟后,只要消费者足够,就能如期消费到消息

具体代码

首先声明 3 个队列的交换机和队列,与绑定关系

  1. package cn.mrcode.httpcallback;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * @author mrcode
  12. */
  13. @Configuration
  14. public class HttpCallbackMqConfig {
  15. //普通队列名称
  16. public static final String NORMAL_QUEUE = "http.callback";
  17. //普通交换机名称
  18. public static final String NORMAL_EXCHANGE = "http.callback";
  19. //延迟队列名称
  20. public static final String DELAY_QUEUE = "http.callback.delay";
  21. //延迟交换机名称
  22. public static final String DELAY_EXCHANGE = "http.callback.delay";
  23. // 延迟间隔,单位秒
  24. public static final Integer DELAY_INTERVAL = 10 * 6;
  25. //死信队列名称
  26. public static final String DEAD_QUEUE = "http.callback.dlq";
  27. //死信交换机名称
  28. public static final String DEAD_EXCHANGE = "http.callback.dlx";
  29. /**
  30. * 普通:队列
  31. *
  32. * @return
  33. */
  34. @Bean
  35. public Queue normalQueue() {
  36. return new Queue(NORMAL_QUEUE, true, false, false, null);
  37. }
  38. /**
  39. * 普通:交换机
  40. *
  41. * @return
  42. */
  43. @Bean
  44. public DirectExchange normalExchange() {
  45. //交换机名 是否持久化 是否自动删除
  46. return new DirectExchange(NORMAL_EXCHANGE, true, false);
  47. }
  48. /**
  49. * 普通:绑定交换机和队列
  50. *
  51. * @return
  52. */
  53. @Bean
  54. public Binding normalBinding() {
  55. return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("");
  56. }
  57. /**
  58. * 延迟:队列
  59. *
  60. * @return
  61. */
  62. @Bean
  63. public Queue delayQueue() {
  64. Map<String, Object> map = new HashMap<>();
  65. //绑定死信交换机
  66. map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  67. //绑定 key
  68. // map.put("x-dead-letter-routing-key", DeadKey);
  69. // 设置超时时间, 单位毫秒
  70. map.put("x-message-ttl", DELAY_INTERVAL * 1000); // 1 分钟
  71. //设置队列长度
  72. // map.put("x-max-length",5);
  73. //队列名 是否持久化 是否排他 是否自动删除 其他参数
  74. return new Queue(DELAY_QUEUE, true, false, false, map);
  75. }
  76. /**
  77. * 延迟:交换机
  78. *
  79. * @return
  80. */
  81. @Bean
  82. public DirectExchange delayExchange() {
  83. //交换机名 是否持久化 是否自动删除
  84. return new DirectExchange(DELAY_EXCHANGE, true, false);
  85. }
  86. /**
  87. * 延迟:绑定交换机和队列
  88. *
  89. * @return
  90. */
  91. @Bean
  92. public Binding delayBinding() {
  93. return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("");
  94. }
  95. /**
  96. * 死信:队列
  97. *
  98. * @return
  99. */
  100. @Bean
  101. public Queue deadQueue() {
  102. //队列名 是否持久化 是否排他 是否自动删除 其他参数
  103. return new Queue(DEAD_QUEUE, true, false, false, null);
  104. }
  105. /**
  106. * 死信交换机
  107. *
  108. * @return
  109. */
  110. @Bean
  111. public DirectExchange deadExchange() {
  112. //交换机名 是否持久化 是否自动删除
  113. return new DirectExchange(DEAD_EXCHANGE, true, false);
  114. }
  115. /**
  116. * 死信:绑定
  117. *
  118. * @return
  119. */
  120. @Bean
  121. public Binding deadBinding() {
  122. return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("");
  123. }
  124. }

回调处理逻辑

  1. package cn.mrcode.httpcallback;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import cn.mrcode.BaseMqResponse;
  5. import cn.mrcode.NoticeMailRequest;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.amqp.rabbit.annotation.Exchange;
  8. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.stereotype.Component;
  13. import java.util.Date;
  14. import cn.hutool.http.HttpRequest;
  15. import cn.hutool.http.HttpUtil;
  16. import lombok.extern.slf4j.Slf4j;
  17. /**
  18. * @author mrcode
  19. */
  20. @Component
  21. @Slf4j
  22. public class HttpCallbackReceiver {
  23. /**
  24. * 最大重试次数,每次重试间隔时间翻倍; 也就是说, 64 分钟内会回调 7 次
  25. */
  26. public static final int MAX_RETRY_COUNT = 7;
  27. /**
  28. * <pre>
  29. * 并发消费者数量,m-n
  30. * m: 最小消费者数量
  31. * n:最大消费者数量
  32. * </pre>
  33. */
  34. public static final String CONCURRENCY = "3-7";
  35. @Autowired
  36. private NoticeHttpCallbacklMqService noticeHttpCallbacklMqService;
  37. /**
  38. 正常消费,立即回调一次
  39. */
  40. @RabbitListener(
  41. queues = HttpCallbackMqConfig.NORMAL_QUEUE,
  42. concurrency = CONCURRENCY,
  43. ackMode = "AUTO"
  44. )
  45. public void process(String message) {
  46. log.info("首次回调:{}", JSONObject.toJSONString(message));
  47. HttpCallbackMqRequest request = JSON.parseObject(message, HttpCallbackMqRequest.class);
  48. try {
  49. final String url = request.getUrl();
  50. final String dadaJson = request.getDataJson();
  51. push(url, dadaJson);
  52. } catch (Exception e) {
  53. log.error("首次回调异常,进入延迟队列:message:{},Exception={} ", message, e.getMessage());
  54. request.setRetryCount(0);
  55. request.setLastRetryTime(new Date());
  56. noticeHttpCallbacklMqService.delay(request);
  57. }
  58. }
  59. private void push(String url, String dadaJson) {
  60. String response = HttpRequest.post(url)
  61. .body(dadaJson)
  62. .timeout(1000 * 5)
  63. .execute()
  64. .body();
  65. }
  66. /**
  67. 处理死信消息、处理间隔时间逻辑
  68. */
  69. @RabbitListener(
  70. queues = HttpCallbackMqConfig.DEAD_QUEUE,
  71. concurrency = CONCURRENCY,
  72. ackMode = "AUTO"
  73. )
  74. public void deadProcess(String message) {
  75. HttpCallbackMqRequest request = JSON.parseObject(message, HttpCallbackMqRequest.class);
  76. final String url = request.getUrl();
  77. final String dadaJson = request.getDataJson();
  78. final Integer retryCount = request.getRetryCount() + 1;
  79. final Integer reEnterCount = request.getReEnterCount() + 1;
  80. try {
  81. // 下一次执行的次数:1,2,4,8 ...
  82. int nextExecCount = (int) Math.pow(2, request.getRetryCount());
  83. if (nextExecCount == reEnterCount) {
  84. log.info("重试回调处理信息:{}", JSONObject.toJSONString(message));
  85. push(url, dadaJson);
  86. } else {
  87. request.setReEnterCount(reEnterCount);
  88. noticeHttpCallbacklMqService.delay(request);
  89. }
  90. } catch (Exception e) {
  91. log.error("重试回调异常,当前重试次数{}/{}:message:{},Exception={} ", retryCount, MAX_RETRY_COUNT, message, e.getMessage());
  92. // 超过重试次数,则不再继续,直接丢弃
  93. if (retryCount >= MAX_RETRY_COUNT) {
  94. return;
  95. }
  96. request.setRetryCount(retryCount);
  97. request.setReEnterCount(reEnterCount);
  98. noticeHttpCallbacklMqService.delay(request);
  99. }
  100. }
  101. public static void main(String[] args) {
  102. for (int i = 0; i < 10; i++) {
  103. System.out.println((int) Math.pow(2, i));
  104. }
  105. }
  106. }

辅助投递消息的工具服务

  1. package cn.mrcode.httpcallback;
  2. import com.alibaba.fastjson.JSON;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import java.util.Date;
  7. import lombok.extern.slf4j.Slf4j;
  8. /**
  9. * @author mrcode
  10. */
  11. @Service
  12. @Slf4j
  13. public class NoticeHttpCallbacklMqService {
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. /**
  17. * 投递到普通队列
  18. *
  19. * @param callbackDestination
  20. * @param toJSONString
  21. */
  22. public void send(String callbackDestination, String toJSONString) {
  23. final HttpCallbackMqRequest request = new HttpCallbackMqRequest();
  24. request.setDataJson(toJSONString);
  25. request.setTime(new Date());
  26. request.setUrl(callbackDestination);
  27. rabbitTemplate.convertAndSend(HttpCallbackMqConfig.NORMAL_QUEUE,
  28. "",
  29. JSON.toJSONString(request));
  30. }
  31. /**
  32. * 投递到延迟队列:一分钟后自动过期转移到死信队列
  33. *
  34. * @param request
  35. */
  36. public void delay(HttpCallbackMqRequest request) {
  37. rabbitTemplate.convertAndSend(HttpCallbackMqConfig.DELAY_QUEUE,
  38. "",
  39. JSON.toJSONString(request));
  40. }
  41. /**
  42. * 重新入延迟队列
  43. *
  44. * @param request
  45. */
  46. public void reDelay(HttpCallbackMqRequest request) {
  47. rabbitTemplate.convertAndSend(HttpCallbackMqConfig.DELAY_QUEUE,
  48. "",
  49. JSON.toJSONString(request));
  50. }
  51. }

包装原始响应消息的消息体

  1. package cn.mrcode.httpcallback;
  2. import cn.mrcode.BaseMqRequest;
  3. import cn.mrcode.BaseMqResponse;
  4. import java.util.Date;
  5. import lombok.Data;
  6. import lombok.EqualsAndHashCode;
  7. import lombok.ToString;
  8. /**
  9. * <pre>
  10. * 注意,本类不提供结果响应
  11. * 如果回调失败,直接丢弃
  12. * </pre>
  13. */
  14. @Data
  15. @ToString
  16. @EqualsAndHashCode(callSuper = true)
  17. public class HttpCallbackMqRequest extends BaseMqRequest {
  18. /**
  19. * 要调用的 地址,只支持 POST JSON 方式
  20. */
  21. private String url;
  22. /**
  23. * 要提交的 json 信息
  24. */
  25. private String dataJson;
  26. // 当前重试次数
  27. private Integer retryCount = 0;
  28. // 当前重新排队次数
  29. private Integer reEnterCount = 0;
  30. // 上一次重试时间
  31. private Date lastRetryTime;
  32. @Override
  33. public BaseMqResponse newInstance() {
  34. return null;
  35. }
  36. }
  37. @Data
  38. @ApiModel
  39. public abstract class BaseMqRequest {
  40. @ApiModelProperty("投递结果回调地址,必须是 POST URL, 接受 JSON 类型,如果回调失败,那么将会在 64 分钟内持续回调 7 次,直至成功,请在 5 秒内给出响应,否则会超时 ")
  41. private String callbackDestination;
  42. @ApiModelProperty("消息产生时间,格式 2020-03-05 02:06:07")
  43. @NotNull
  44. private Date time;
  45. @ApiModelProperty("自定义消息 ID,如果有,则在回调时原样返回,这里你可以自由发挥,不仅仅只能传递一个简单的 ID 字符串")
  46. private String cid;
  47. @ApiModelProperty(value = "简单的系统 ID", hidden = true)
  48. private String id;
  49. public BaseMqResponse buildBaseResponse() {
  50. BaseMqResponse response = newInstance();
  51. if (response == null) {
  52. return null;
  53. }
  54. response.setCid(cid);
  55. response.setId(id);
  56. response.setTime(new Date());
  57. response.setSuccess(true);
  58. return response;
  59. }
  60. public abstract BaseMqResponse newInstance();
  61. @Override
  62. public String toString() {
  63. return
  64. "id='" + id + '\'' +
  65. ", cid='" + cid + '\'' +
  66. ", time=" + time + '\'' +
  67. "callbackDestination='" + callbackDestination + '\''
  68. ;
  69. }
  70. }

获取队列中消息数量

记得有一篇文章说过,使用消息队列不只是会投递、消费消息,最重要的一个指标是消息积累数量,需要监控,如果堆积严重,需要进行改进,那么就是如何通过程序获取到队列中消息数量

先配置 RabbitAdmin ,通过它来的 API 来获取到消息数量

  1. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  2. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMqConfig {
  7. @Bean
  8. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  9. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  10. rabbitAdmin.setAutoStartup(true); // 服务启动时候开启自动启动
  11. return rabbitAdmin;
  12. }
  13. }

获取指定队列消息数量

  1. import org.springframework.amqp.core.QueueInformation;
  2. @Autowired
  3. private RabbitAdmin rabbitAdmin;
  4. @GetMapping("/queue-monitor")
  5. @ApiOperation(value = "队列监控", notes = "获取消息队列中的消息堆积数量等信息,请不要大量并发调用该接口")
  6. public Result<QueueInformation> queueMonitor() {
  7. final QueueInformation queueInfo = rabbitAdmin.getQueueInfo(Names.QUEUE);
  8. return ResultHelper.ok(queueInfo);
  9. }

该对象能获取到的信息有:队列名称、消息数量、消费者数量

  1. "name": "send.mail",
  2. "messageCount": 0,
  3. "consumerCount": 3