基础概念
什么是MQ?
什么是AMQP?
什么是RabbitMQ?
消息队列的通用架构
RabbitMQ基础概念
RabbitMQ的六种工作模式
1. 简单模式
2. Work queues工作队列模式
订阅模型
订阅模型需要引入交换机exchange
,消息从生产者先发送至exchange
交换机,交换机再根据配置将消息发送给对应的channel
。下面介绍的三种模式,主要差别就是exchange
根据怎么的规则发送消息给channel
3.1订阅模型-fanout(广播模式)
3.2 订阅模型-direct(Routing 路由模式)
exchange_type = direct
通过rounting key绑定队列和交换机
3.3订阅模型-topic(Topics 话题模式 )
exchange_type = topic
与direct模式相似,只不过direct模式的routing key是固定的字符串,而订阅模式可以有通配符
还有一种hearders模式,通过header的信息匹配,现在用的很少
4. RPC模式
SpringBoot结合RabbitMQ
RabbitMQ高级特性
TTL
time to live 消息存活时间
过期时间,顾名思义设定一个时间后,消息自动移除。
TTL有两种设置方式:
- 为队列设置过期时间
- 为消息设置过期时间
为队列设置过期时间
队列可以设置一个过期时间x-message-ttl
的属性,单位毫秒
x-message-ttl How long a queue can be unused for before it is automatically deleted (milliseconds). (Sets the “x-expires” argument.) (RabbitMQ官网)
@Bean
public Queue ttlQueue(){
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl",5000);
return new Queue(RabbitMqConstant.TEST_TTL_QUEUE,true,false,false,map);
}
为消息设置过期时间
MessagePostProcessor mpp = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMqConstant.TEST_FANOUT_EXCHANGE,null,"发送的消息",mpp);
二者消息过期后,都会被移除,有什么区别:
- 过期队列里的消息过期后,会被移植死信队列;直接设置的过期消息不会;
死信队列
不能正常被消费的消息,也不能随意丢弃,会将他们放入一个特殊的死信队列。
以下情况,消息会被投递至死信队列:
- 消息被否定确认,使用
basicNack
或basicReject
,并且此时requeue
属性被设置为false。 - 消息在队列的存活时间超过设置的
TTL
时间。 - 消息队列的消息数量已经超过最大队列长度。
死信交换机、死信队列本质上都是普通的交换机、队列,只是用处不同,下面看下配置过程
//1. 配置死信交换机、死信队列,并将两者绑定
// 定义一个死信交换机
@Bean
public DirectExchange deadDirectExchange(){
return new DirectExchange(RabbitMqConstant.TEST_DEAD_EXCHANGE);
}
// 定义一个死信队列
@Bean
public Queue deadQueue(){
return new Queue(RabbitMqConstant.TEST_DEAD_QUEUE,true,false,false);
}
// 绑定
// 这里用的是direct 交换机,配置了routingKey
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead");
}
// 为指定队列配置死信队列。配置后,队列满足条件时(此例中是消息过期)会将消息投递至死信队列
@Bean
public Queue ttlQueue(){
Map<String, Object> map = new HashMap<>();
// 设置队列过期时间
map.put("x-message-ttl",5000);
// 为队列设置死信交换机
map.put(RabbitMqConstant.X_DEAD_LETTER_EXCHANGE,RabbitMqConstant.TEST_DEAD_EXCHANGE);
// 设置routingKey(死信交换机为fanout时,不需要配置)
map.put(RabbitMqConstant.X_DEAD_LETTER_ROUTING_KEY,"dead");
return new Queue(RabbitMqConstant.TEST_TTL_QUEUE,true,false,false,map);
}
延迟队列
有时我们希望队列中的消息延迟一定时间再处理。RabbitMQ本身不支持这个功能,但是可以通过死信队列间接实现。将死信队列作为最终处理的目标队列就好,设置TTL的队列只作为中转。
消息确认
下图是简易的消息发送过程,共有三处需要进行消息确认:
- 生产者发送消息值Broker
- Broker中交换机收到消息后把消息投递给队列
- 队列将消息发给消费者
- 在 RabbitMQ 中 有两种事务机制来确保消息的安全送达,分别是事务机制和确认机制;
- 事务机制需要每个消息或一组消息发布、提交的通道设置为事务性的,因此会非常耗费性能,降低了 Rabbitmq 的消息吞吐量;
- 因此我们在实际生产中通常采用确认机制
生产者消息确认
提供了ConfirmCallback
、ReturnCallback
两个回调来进行确认。
要使用确认回调,需要在配置里手动设置一下:
spring.rabbitmq.publisher-confirm-type: correlated
spring.rabbitmq.publisher-returns: true
// 为RabbitTemplate 设置回调
@Bean
RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功:{}" + correlationData.getId());
} else {
log.info("消息发送食材:{}" + correlationData);
}
}
});
// confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。
// 在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式
// 这样如果未能投递到目标 queue 里将调用 returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//Message message表示的是消息体,int replyCode表示响应码,String replyText表示响应内容,String exchange表示发送消息时指定的交换机,String routingKey表示发送消息时指定的routing key
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info(MessageFormat.format("未能投递到目标队列,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode,replyText, exchange, routingKey));
}
});
return rabbitTemplate;
}
ConfirmCallback
- 消息只要被 broker 接收到就会执行
confirm()
- 如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback
被 broker 接收到只能表示 消息 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里
ReturnCallback
confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里,要确认是否投递到目标 queue 里,此时就需要用到 return 退回模式
- 这样如果未能投递到目标 queue 里将调用 returnCallback,可以记录下详细到投递数据, 记录下来再处理
消费者消息确认
RabbitMQ需要知道消费者是否正确收到/处理了消息(判断逻辑由业务业务决定),收到确认后在队列中删除消息。这就需要消费者给RabbitMQ发送应答消息。
有两种应答方式:自动应答、手动应答
根据情况确认
自动应答
acknowledge-mode: none
消费者在消费消息的时候,如果设定应答模式为自动,则消费者收到消息后,消息就会立即被 RabbitMQ 从 队列中删除掉。但往往我们的业务还需要执行,是否应答由业务决定,因此实际开发中,更多用手动应答
手动应答
acknowledge-mode: manual
- 可以在既定的正常情况下进行确认(告诉 RabbitMQ,我已经消费过该消息了,你可以删除该条数据了)
- 可以在既定的异常情况下不进行确认(RabbitMQ 会继续保留该条数据),这样下一次可以继续消费该条数据。
开启手动应答模式
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
手动应答:
/**
* 确认消息
* multiple参数:true 确认所有消息;false 只确认当前这一条消息
*/
channel.basicAck(tag,false);
/**
* 拒绝消息
* 第二个参数multiple:
* true 确认所有消息;
* false 只确认当前这一条消息
* 第三个参数requeue:
* true 将消息重新放回队列(且后面还有可能消费此消息)
* false 将消息丢弃,如果配置了死信队列则放入死信队列
*
*/
channel.basicNack(tag,false,false);
/**
* 重新投递此消息
* requeue参数:
* false 将消息重新投递给自己
* true 消息放回队列,并尽可能投递给其他消费者
*/
channel.basicRecover(false);
/**
* 拒绝消息
* 第二个参数requeue:true 将消息重新放回队列(且后面还有可能消费此消息)
* false 将消息丢弃,如果配置了死信队列则放入死信队列
*/
channel.basicReject(tag,false);
关于确认模式 acknowledge-mode 有三种模式
RabbitMQ集群
RabbitMQ实现分布式事务
RabbitMQ 有三种模式:单机模式,普通集群模式,镜像集群模式。
单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式
普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。
镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论
元数据
(元数据指RabbitMQ的配置数据)还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。