一、延时队列简介
1.1 延时队列简介
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
1.2 延时队列应用场景
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到智能设备。
Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
第二种方式:利用rabbitmq中的插件x-delay-message
**
1.3 DLX(死信队列)
死信队列,英文缩写: DLX 。Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
二、利用TTL,DLX(死信队列)实现延时队列
2.1 RabbitMQConfig
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
//延时队列
@Bean
public Queue queue2() {
Map<String,Object> args = new HashMap<>();
//延时,单位ms
args.put("x-message-ttl",10000);
//延时结束后将消息发给指定死信队列
args.put("x-dead-letter-exchange","fanoutExchange");
//参数3:需设为false,不然运行结束,队列就会被删除
return new Queue("queue2",true,false,false,args);
}
//延时队列交换机
@Bean
public DirectExchange directExchange2() {
return new DirectExchange("directExchange2",true,false);
}
//延时队列绑定路由键
@Bean
public Binding bindingDirect2() {
return BindingBuilder.bind(queue2()).to(directExchange2()).with("two");
}
//死信队列
@Bean
public Queue queue3() {
return new Queue("queue3");
}
//死信队列交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange",true,false);
}
//死信队列绑定交换机
@Bean
public Binding bindingFanout() {
return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
}
2.2 测试类
@SpringBootTest
@RunWith(SpringRunner.class)
class ProducerApplicationTests {
//使用RabbitTemplate,这提供了接收/发送等等方法
@Resource
RabbitTemplate rabbitTemplate;
@Test
void test2() {
rabbitTemplate.convertAndSend("directExchange2","two","Hello2");
}
}
2.3 查看结果
消息发过去10S内:
消息在延时队列中
10S过后:
消息自动转发给死信队列,所以消费者监听死信队列就能够达到效果
三、消息确认(消息可靠投递)
使用了 RabbitMQ 以后,业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。
例如:
- 消息生产者 - > rabbitmq服务器(消息发送失败)
- rabbitmq服务器自身故障导致消息丢失
- 消息消费者 - > rabbitmq服务(消费消息失败)
开启消息确认机制以后,尽管很大程度上保证了消息的准确送达,但由于频繁的确认交互,rabbitmq 整体效率变低,吞吐量下降严重。
3.1 环境搭建
3.1.1 配置中开启 发送端和 消费端 的消息确认
消费端application.yml:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
生产端application.yml:
spring:
rabbitmq:
publisher-confirm-type: simple
publisher-returns: true
template:
mandatory: true
3.1.2 生产端RabbitMQConfig
@Configuration
public class RabbitMQConfig {
//队列
@Bean
public Queue queue1() {
return new Queue("queue1");
}
//Direct交换机,可指定其他类型
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置路由键:one
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(queue1()).to(directExchange()).with("one");
}
注意:DirectExchange(“directExchange”,true,false) 第三个参数autoDelete需要设置为false
**
rabbitmq的消息确认分为两部分:发送消息确认 和 消息接收确认。
3.2 发送消息确认
发送消息确认:用来确认生产者 Producer 将消息发送到 Broker ,Broker 上的交换机 Exchange 再投递给队列 Queue的过程中,消息是否成功投递。
- 消息从 Producer 到 Rabbitmq Broker有一个 ConfirmCallback 确认模式。
- 消息从 Exchange 到 Queue 投递失败有一个 ReturnCallback 退回模式。
我们可以利用这两个Callback来确保消息的100%送达。
3.2.1 ConfirmCallback确认模式
消息只要被 Rabbitmq Broker 接收到就会触发 ConfirmCallback 回调 。**
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ack){
log.info("消息发送异常!");
}else {
log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。
3.2.2 ReturnCallback 退回模式
如果消息未能投递到目标 Queue 里将触发回调 ReturnCallback ,一旦向 Queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。
3.2.3 测试类
@SpringBootTest
@RunWith(SpringRunner.class)
class ProducerApplicationTests {
//使用RabbitTemplate,这提供了接收/发送等等方法
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private ConfirmCallbackService callbackService;
@Resource
private ReturnCallbackService returnCallbackService;
@Test
void sendMessage(){
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消费者确认收到消息后,手动ack回执回调处理
*/
rabbitTemplate.setConfirmCallback(callbackService);
/**
* 消息投递到队列失败回调处理
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 发送消息
*/
rabbitTemplate.convertAndSend("directExchange","one","Hello",message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},new CorrelationData(UUID.randomUUID().toString()));
}
}
3.3 消费端确认消息
3.3.1 消费端未确认消息
如果消费端不修改代码,消息接收后,消息会进入Unacked(未确认)中,并且,每一次重启消费端都能够再次接收该消息。
3.3.2 修改消费端代码
使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数
Receiver:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = "queue1")
public class Receiver {
@RabbitHandler
public void process(Message message, Channel channel, String str) throws IOException {
//获得投递序号
long tag = message.getMessageProperties().getDeliveryTag();
try{
log.info("收到的消息为:" + str);
//业务代码
//手动ack
channel.basicAck(tag,false);
}catch (Exception e){
if(message.getMessageProperties().getRedelivered()){
log.info("消息已重复处理失败,拒绝再次接收...");
//拒绝消息
channel.basicReject(tag,false);
}else {
log.info("消息即将再次返回队列处理");
channel.basicNack(tag,false,true);
}
}
}
}
此时重新启动消费端,就会接收消息并确认,然后将消息从队列中删除
消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker删除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
2、basicNack
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag:表示消息投递序号。
multiple:是否批量确认。
requeue:值为 true 消息将重新入队列。
3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag:表示消息投递序号。
requeue:值为 true 消息将重新入队列。
四、消费端削峰限流
4.1 举例
用户端发起下单操作
服务端完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
用户端下单业务简单,每秒发起了5000个请求,服务端秒杀业务复杂,每秒只能处理1000个请求,很有可能用户端不限速的下单,导致服务端A系统被压垮,引发雪崩。
为了避免雪崩,常见的优化方案有两种:
1)业务用户端队列缓冲,限速发送
2)业务服务端队列缓冲,限速执行
这里讨论的是服务端限速执行
4.2 RabbitMQ如何实现
rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
Consumer限流机制: