使用 Spring boot 启动的 amqp 包实现手动配置队列、交换机、并使用延迟队列来实现一个回调策略功能
boot 版本:2.4.4
依赖包为
implementation 'org.springframework.boot:spring-boot-starter-amqp'
背景
之前用 Spring cloud stream-rabbit,在大部分的场景下,简单配置配置就可以使用了,比较方便,没有怎么用过原生的,如今不在 cloud 环境下,仅仅是 boot 场景下,使用方式又不一样了
本次要实现的需求有:
- 同项目发送消息到 mq,同项目再自己消费 mq 消息
- 实现延迟队列,来实现 http 回调重试逻辑
这里同项目也要丢到 mq 中的考虑:生成消息可能远远大于消费消息的速度,另外数据库中未保存消息,一方面使用 mq 来充当持久化机制,保证项目重启数据不丢失,另一方面充当齿轮保证消费消息的稳定性
同项目产生消息和消费消息
这里演示的配置是最简单的,直接是队列,没有交换机
首先配置 mq 的数据源
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
prefetch: 2
listener.simple.prefetch
:每个消费线程获取消息的数量(每个使用者可以处理的未确认消息的最大数量),默认是好像是 250,大概上它的作用
产生消息
@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend(Names.QUEUE, JSON.toJSONString(params));
消费消息
public static final String QUEUE = "send.mail";
/**
* <pre>
* 并发消费者数量,m-n
* m: 最小消费者数量
* n:最大消费者数量
* </pre>
*/
public static final String CONCURRENCY = "3-7";
/**
声明一个队列
*/
@Bean
public Queue sendMail() {
return new Queue(QUEUE);
}
// 监听队列配置
@RabbitListener(
queues = QUEUE, // 监听队列
concurrency = CONCURRENCY, // 配置消费者数量,也就是并发线程数量
ackMode = "AUTO" // ack 模式,其他可选枚举看这个 listener 的注释,上面写得有
)
public void process(String message) {
NoticeMailRequest request = JSON.parseObject(message, NoticeMailRequest.class);
log.debug("队列开始处理:{}", request);
try {
// 处理消息
} catch (Exception e) {
// 看你自己的业务,如果有异常不捕获,会被 aop 判定为消息处理失败,不会 ack
}
}
延迟队列
实现思路
延迟队列实现的思路:
- 创建一个正常的队列,但是要设置过期时间,绑定到 死信队列的交换机上
- 创建一个死信队列
- 程序只消费死信队列
这样一来,当一个消息进入到正常队列中,当消息超时之后,就会自动进入死信队列,就可以被程序消费,达到延迟消费的效果,但是需要知道: RabbitMQ 的 ttl 消息生存时间,可以在队列上设置,还可以在消息上设置,但是强烈建议设置在队列上
比如:a 消息是超时 1 分钟,b 消息是超时 30 秒,a 先进队列,b 后进队列,那么你会发现,就算 30 秒到了,b 消息也不会进入到死信队列中,而是等待 a 消息出来之后,跟着出来的
本次要实现的延迟队列功能背景如下:接受第三方系统调用 API 后,结果会通过 HTTP 方式异步回调会第三方系统,要保证在如下的规则中回调
- 有回调结果时:立即回调给第三方系统
- 如果回调第三方系统异常,每次重试间隔时间翻倍;假如所有回调都是错误的,那么将在第:0 、1、2、4、8、16、32、64 分钟时,进行回调操作,也就是说,加上正常回调的那一次,每一次的通知回调,最多调用 8 次,总共用时 64 分钟
关于回调策略问题,你可以自己定制,使用一些小算法进行,下面是具体的实现思路:
- 创建一个正常的队列:用来接收第一次的正常回调,目的是:及时回调,而不是都要等待 1 分钟后再回调
- 创建一个延迟队列,也就是将消息存活期设置成 1 分钟,目的是:当正常队列回调失败后,就投递到该队列中
- 创建一个死信队列
- 延迟队列绑定该死信队列的交换机
- 程序也消费该死信队列,如果还是回调失败,则手动将消息投递到延迟队列中
在死信队列中的处理需要注意:由于延迟队列设置成 1 分钟过期一次,那么也就是说,死信队列中的消费者,一分钟会消费到消息,这个时候还需要根据回调的时间间隔策略,做一定的判定,如果没有到达间隔,则再次投递回延迟队列中,因为所有消息都是 1 分钟过期,也就保证了一分钟后,只要消费者足够,就能如期消费到消息
具体代码
首先声明 3 个队列的交换机和队列,与绑定关系
package cn.mrcode.httpcallback;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author mrcode
*/
@Configuration
public class HttpCallbackMqConfig {
//普通队列名称
public static final String NORMAL_QUEUE = "http.callback";
//普通交换机名称
public static final String NORMAL_EXCHANGE = "http.callback";
//延迟队列名称
public static final String DELAY_QUEUE = "http.callback.delay";
//延迟交换机名称
public static final String DELAY_EXCHANGE = "http.callback.delay";
// 延迟间隔,单位秒
public static final Integer DELAY_INTERVAL = 10 * 6;
//死信队列名称
public static final String DEAD_QUEUE = "http.callback.dlq";
//死信交换机名称
public static final String DEAD_EXCHANGE = "http.callback.dlx";
/**
* 普通:队列
*
* @return
*/
@Bean
public Queue normalQueue() {
return new Queue(NORMAL_QUEUE, true, false, false, null);
}
/**
* 普通:交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
//交换机名 是否持久化 是否自动删除
return new DirectExchange(NORMAL_EXCHANGE, true, false);
}
/**
* 普通:绑定交换机和队列
*
* @return
*/
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("");
}
/**
* 延迟:队列
*
* @return
*/
@Bean
public Queue delayQueue() {
Map<String, Object> map = new HashMap<>();
//绑定死信交换机
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//绑定 key
// map.put("x-dead-letter-routing-key", DeadKey);
// 设置超时时间, 单位毫秒
map.put("x-message-ttl", DELAY_INTERVAL * 1000); // 1 分钟
//设置队列长度
// map.put("x-max-length",5);
//队列名 是否持久化 是否排他 是否自动删除 其他参数
return new Queue(DELAY_QUEUE, true, false, false, map);
}
/**
* 延迟:交换机
*
* @return
*/
@Bean
public DirectExchange delayExchange() {
//交换机名 是否持久化 是否自动删除
return new DirectExchange(DELAY_EXCHANGE, true, false);
}
/**
* 延迟:绑定交换机和队列
*
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("");
}
/**
* 死信:队列
*
* @return
*/
@Bean
public Queue deadQueue() {
//队列名 是否持久化 是否排他 是否自动删除 其他参数
return new Queue(DEAD_QUEUE, true, false, false, null);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
//交换机名 是否持久化 是否自动删除
return new DirectExchange(DEAD_EXCHANGE, true, false);
}
/**
* 死信:绑定
*
* @return
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("");
}
}
回调处理逻辑
package cn.mrcode.httpcallback;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import cn.mrcode.BaseMqResponse;
import cn.mrcode.NoticeMailRequest;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.Date;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @author mrcode
*/
@Component
@Slf4j
public class HttpCallbackReceiver {
/**
* 最大重试次数,每次重试间隔时间翻倍; 也就是说, 64 分钟内会回调 7 次
*/
public static final int MAX_RETRY_COUNT = 7;
/**
* <pre>
* 并发消费者数量,m-n
* m: 最小消费者数量
* n:最大消费者数量
* </pre>
*/
public static final String CONCURRENCY = "3-7";
@Autowired
private NoticeHttpCallbacklMqService noticeHttpCallbacklMqService;
/**
正常消费,立即回调一次
*/
@RabbitListener(
queues = HttpCallbackMqConfig.NORMAL_QUEUE,
concurrency = CONCURRENCY,
ackMode = "AUTO"
)
public void process(String message) {
log.info("首次回调:{}", JSONObject.toJSONString(message));
HttpCallbackMqRequest request = JSON.parseObject(message, HttpCallbackMqRequest.class);
try {
final String url = request.getUrl();
final String dadaJson = request.getDataJson();
push(url, dadaJson);
} catch (Exception e) {
log.error("首次回调异常,进入延迟队列:message:{},Exception={} ", message, e.getMessage());
request.setRetryCount(0);
request.setLastRetryTime(new Date());
noticeHttpCallbacklMqService.delay(request);
}
}
private void push(String url, String dadaJson) {
String response = HttpRequest.post(url)
.body(dadaJson)
.timeout(1000 * 5)
.execute()
.body();
}
/**
处理死信消息、处理间隔时间逻辑
*/
@RabbitListener(
queues = HttpCallbackMqConfig.DEAD_QUEUE,
concurrency = CONCURRENCY,
ackMode = "AUTO"
)
public void deadProcess(String message) {
HttpCallbackMqRequest request = JSON.parseObject(message, HttpCallbackMqRequest.class);
final String url = request.getUrl();
final String dadaJson = request.getDataJson();
final Integer retryCount = request.getRetryCount() + 1;
final Integer reEnterCount = request.getReEnterCount() + 1;
try {
// 下一次执行的次数:1,2,4,8 ...
int nextExecCount = (int) Math.pow(2, request.getRetryCount());
if (nextExecCount == reEnterCount) {
log.info("重试回调处理信息:{}", JSONObject.toJSONString(message));
push(url, dadaJson);
} else {
request.setReEnterCount(reEnterCount);
noticeHttpCallbacklMqService.delay(request);
}
} catch (Exception e) {
log.error("重试回调异常,当前重试次数{}/{}:message:{},Exception={} ", retryCount, MAX_RETRY_COUNT, message, e.getMessage());
// 超过重试次数,则不再继续,直接丢弃
if (retryCount >= MAX_RETRY_COUNT) {
return;
}
request.setRetryCount(retryCount);
request.setReEnterCount(reEnterCount);
noticeHttpCallbacklMqService.delay(request);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
System.out.println((int) Math.pow(2, i));
}
}
}
辅助投递消息的工具服务
package cn.mrcode.httpcallback;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
/**
* @author mrcode
*/
@Service
@Slf4j
public class NoticeHttpCallbacklMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 投递到普通队列
*
* @param callbackDestination
* @param toJSONString
*/
public void send(String callbackDestination, String toJSONString) {
final HttpCallbackMqRequest request = new HttpCallbackMqRequest();
request.setDataJson(toJSONString);
request.setTime(new Date());
request.setUrl(callbackDestination);
rabbitTemplate.convertAndSend(HttpCallbackMqConfig.NORMAL_QUEUE,
"",
JSON.toJSONString(request));
}
/**
* 投递到延迟队列:一分钟后自动过期转移到死信队列
*
* @param request
*/
public void delay(HttpCallbackMqRequest request) {
rabbitTemplate.convertAndSend(HttpCallbackMqConfig.DELAY_QUEUE,
"",
JSON.toJSONString(request));
}
/**
* 重新入延迟队列
*
* @param request
*/
public void reDelay(HttpCallbackMqRequest request) {
rabbitTemplate.convertAndSend(HttpCallbackMqConfig.DELAY_QUEUE,
"",
JSON.toJSONString(request));
}
}
包装原始响应消息的消息体
package cn.mrcode.httpcallback;
import cn.mrcode.BaseMqRequest;
import cn.mrcode.BaseMqResponse;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
/**
* <pre>
* 注意,本类不提供结果响应
* 如果回调失败,直接丢弃
* </pre>
*/
@Data
@ToString
@EqualsAndHashCode(callSuper = true)
public class HttpCallbackMqRequest extends BaseMqRequest {
/**
* 要调用的 地址,只支持 POST JSON 方式
*/
private String url;
/**
* 要提交的 json 信息
*/
private String dataJson;
// 当前重试次数
private Integer retryCount = 0;
// 当前重新排队次数
private Integer reEnterCount = 0;
// 上一次重试时间
private Date lastRetryTime;
@Override
public BaseMqResponse newInstance() {
return null;
}
}
@Data
@ApiModel
public abstract class BaseMqRequest {
@ApiModelProperty("投递结果回调地址,必须是 POST URL, 接受 JSON 类型,如果回调失败,那么将会在 64 分钟内持续回调 7 次,直至成功,请在 5 秒内给出响应,否则会超时 ")
private String callbackDestination;
@ApiModelProperty("消息产生时间,格式 2020-03-05 02:06:07")
@NotNull
private Date time;
@ApiModelProperty("自定义消息 ID,如果有,则在回调时原样返回,这里你可以自由发挥,不仅仅只能传递一个简单的 ID 字符串")
private String cid;
@ApiModelProperty(value = "简单的系统 ID", hidden = true)
private String id;
public BaseMqResponse buildBaseResponse() {
BaseMqResponse response = newInstance();
if (response == null) {
return null;
}
response.setCid(cid);
response.setId(id);
response.setTime(new Date());
response.setSuccess(true);
return response;
}
public abstract BaseMqResponse newInstance();
@Override
public String toString() {
return
"id='" + id + '\'' +
", cid='" + cid + '\'' +
", time=" + time + '\'' +
"callbackDestination='" + callbackDestination + '\''
;
}
}
获取队列中消息数量
记得有一篇文章说过,使用消息队列不只是会投递、消费消息,最重要的一个指标是消息积累数量,需要监控,如果堆积严重,需要进行改进,那么就是如何通过程序获取到队列中消息数量
先配置 RabbitAdmin ,通过它来的 API 来获取到消息数量
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true); // 服务启动时候开启自动启动
return rabbitAdmin;
}
}
获取指定队列消息数量
import org.springframework.amqp.core.QueueInformation;
@Autowired
private RabbitAdmin rabbitAdmin;
@GetMapping("/queue-monitor")
@ApiOperation(value = "队列监控", notes = "获取消息队列中的消息堆积数量等信息,请不要大量并发调用该接口")
public Result<QueueInformation> queueMonitor() {
final QueueInformation queueInfo = rabbitAdmin.getQueueInfo(Names.QUEUE);
return ResultHelper.ok(queueInfo);
}
该对象能获取到的信息有:队列名称、消息数量、消费者数量
"name": "send.mail",
"messageCount": 0,
"consumerCount": 3