前言
注意点
(1)生产者将消息发给交换器的时候,,一般会指定一个RoutingKey (路由键),用来指定这个消息的路由规则,而这个RoutingKey 需要与交换器类型和绑定键(BindingKey) 联合使用,才能最终生效。
:::info
💡下面说的如果 服务器宕机了,指的是RabbitMQ服务器(或者说Broker服务器),可以通过关掉RabbitMQ的cmd窗口或者窗口执行停止命令来实现。
而消费者宕机可通过idea停止运行来实现。
:::
(2)MQ本质是个队列,遵从先入先出的规则。
(3)exchange交换机只路由转发消息至消息队列,不会存储消息。
(4)下图中:发布者和Broker服务器只有一个connection(即TCP连接),多个发布者可复用一个TCP,避免了TCP的开销和维护(因为TCP开销很昂贵的);而多个发布者之间又通过各自的信道(Channel,虚拟连接),起到了发布者之间隔离的作用。
消费者和Broker之间亦是同理。
(5)如果消费者接受的消息是实体类对象,需要将类序列化,如下所示:
一、RabbitMQ 消息确认机制介绍
场景
- 当消息的投送方(即生产者)把消息投递出去,却不知道消息是否投递成功了?如果消息的投送方不管的话,势必对系统的可靠性 造成影响。
- 可如果要保证系统的可靠性,那么消息的投送方,如何知道消息投递成功了?
- 这个就需要消息的确认机制,我们来看下rabbitMQ的消息确认机制是如何实现的。
原理图
RabbitMQ为了防止消息不丢失的情况,可以使用事物消息,但是性能下降很多,为此引入消息确认机制。
如下的RabbotMQ是一种中间件服务器,内部可分为:Exchange交换机(可多个)、Queue(可多个)等,我理解Broker就是RabbitMQ。:::info 🍓消息确认机制又分:生产者消息确认机制、消费者消息确认机制、return消息机制 :::
1、生产者消息确认机制(confirm)
生产者消息确认机制:指生产者将消息投递给对应的Broker中的VHost里面的exchange后,产生的应答,如果exchange不存在返回false,投递成功则返回true。1.1、如何使用生产者消息确认机制
(1)application.yml中配置:
如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为springboot版本导致的配置项不起效,spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
publisher-confirms: true # 启用生产者消息确认机制,默认false。即交换机收到发布者发来的消息后,不管成功还是失败,都会触发回调方法
可以把publisher-confirms: true 替换为 publisher-confirm-type: correlated。
(2)代码:(Correlation [ˌkɒrəˈleɪʃn] 相关性) :::info 🎃发布者发布消息给交换机,不管成功还是失败,都会自动调用confirm()这个回调函数。 :::
package com.thwcompany.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
import org.springframework.amqp.core.Message;
/**
* Created by IntelliJ IDEA2021.3
* @Author: Tenghw
* @Date: 2022/10/23 00:03
* @Description: 生产者消息确认机制配置:发布者发布消息给交换机
*/
@SuppressWarnings("all")
@Slf4j
@Service
public class ProducerMessageConfimService implements RabbitTemplate.ConfirmCallback {
// 注入rabbitTemplate
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
//exchangeKey交换机名,routingkey路由键;message为发送的消息
//new CorrelationData(id)主要是用来给发送的消息设置一个唯一id,交换机收到消息和唯一id,2次id一致,即证明是同一条消息
public void sendMessage(String exchangeKey, String routingkey, String message) throws UnsupportedEncodingException {
String id= UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
//需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
//rabbitTemplate.setConfirmCallback(this);
correlationData.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(exchangeKey,routingkey, message,correlationData);
log.info("交换机名称:{},routingkey名称:{},生产者发送给的交换机的消息: {},唯一ID:{}",
exchangeKey,routingkey,message,correlationData.getId());
}
/**
* 此方法用于监听消息是否发送到交换机
* @param correlationData: 对象内部的id 属性,用来标识当前消息的唯一性。correlationData 内含消息内容
* @param ack: 当交换机存在时为true,否则为false;
* @param cause: 发布者发布消息给交换机:若失败,cause为失败原因;若成功,cause=null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String message = new String(correlationData.getReturnedMessage().getBody());
if(ack){
log.info("消息成功发送到交换机");
log.info("生产者消息推送给的交换机的状态:{}, 唯一ID::{}",ack,correlationData.getId());
}else {
log.info("消息发送到交换机失败");
log.info("生产者消息推送给的交换机的状态:{}, 唯一ID::{},错误原因cause :{}",ack,correlationData.getId(),cause);
log.info("消息内容为{}", message);//可以看做即使失败消息也不会丢失
}
}
}
(3)设置交换机类型、名称,以及设置消息队列名称。两者暂时没有设置排他性、持久化、自动删除。
package com.thwcompany.config1;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//发布-订阅模式
@Configuration
@SuppressWarnings("all")
public class ProducerMessageExchangeConfig {
@Bean
public DirectExchange directPmcExchange(){
DirectExchange directExchange=new DirectExchange("direct-pmc-exchange");
return directExchange;
}
@Bean
public Queue directPmcQueue() {
Queue queue=new Queue("direct-pmc-queue");
return queue;
}
//binding将交换机和相应队列连起来
@Bean
public Binding bindingorange(){
Binding binding=BindingBuilder.bind(directPmcQueue()).to(directPmcExchange()).with("tenghw123");
return binding;
}
}
(4)测试接口:
@Controller
public class HtmlController {
@Autowired
ProducerMessageConfimService producerMessageConfimService;
@GetMapping(path = "/test2")
@ResponseBody
public Object test2() {
String exchange = "direct-pmc-exchange";
String routingKey = "tenghw123";
Object message = "message: rabbitMQ 生产者消息确认机制... ";
producerMessageConfimService.sendMessage(exchange,routingKey,message);
return message;
}
}
(5)测试
①测试前该队列的状态:Ready=0,Unacked=0,如下图所示:
发出测试请求:
此时idea控制台输出:可以看到自动执行了上面写的confirm()方法。
交换机名称:direct-pmc-exchange,routingkey名称:tenghw123,生产者发送给绑定的交换机的消息: message: rabbitMQ 生产者消息确认机制... ,唯一ID:49261c71-d240-4ac5-87a0-40a80079e676
消息成功发送到交换机
生产者消息推送给的交换机的状态:true, 唯一ID::49261c71-d240-4ac5-87a0-40a80079e676
此时:Ready=1(因为没有设置消费者监听消息队列,所以消息状态为Raeady),Unacked=0,如下图所示:
②将交换机改成错误的,即交换机不存在时
再次发出测试请求,idea控制台输出:可以看到返回false。
交换机名称:direct-pmc-exchange123,routingkey名称:tenghw123,生产者发送给绑定的交换机的消息: message: rabbitMQ 生产者消息确认机制... ,唯一ID:19cbcf49-33ab-424c-b93d-168481a6b72c
消息发送到交换机失败
生产者消息推送给的交换机的状态:false, 唯一ID::19cbcf49-33ab-424c-b93d-168481a6b72c,错误原因cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct-pmc-exchange123' in vhost '/', class-id=60, method-id=40)
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct-pmc-exchange123' in vhost '/', class-id=60, method-id=40)
虽然路由键没错,但是交换机都没了,消息也就走不到交换机了,更到不了消息队列了。此时消息队列状态还是:Ready=1,Unacked=0,如下图所示:
1.2、生产者消息确认机制存在的问题
如果发布者发布消息到交换机时发生错误,则自动回调ConfirmCallback接口,但只可以保证消息到交换机这一步不会丢失。
但如果交换机路由消息到队列的过程中出现了问题,消息一样会丢失。比如上面生产者把routingKey写错了,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。就会导致消息从交换机路由不到队列中,而消息丢失。
解决方案:
在消息从交换机路由到队列中失败后,回调returnCallBack函数,在returnCallBack回调接口中把失败的消息保存下来,就可以避免消息丢失了。
在回调returnCallBack接口之前,还需为RabbitMQ设置Mandatory标志,只有当该标志为true时,消息由交换机路由到队列失败后,才会回调returnCallBack接口;如果该标志设置false时,消息由交换机路由到队列失败后自动丢弃消息,会导致消息丢失,且默认false;所以如需保证消息不丢失,要打开Mandatory标志。
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(exchangeCallback);
/**
* true:交换机无法将消息进行路由时,会将该消息返回给生产者
* false:如果发现消息无法进行路由,则直接丢弃该消息
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(ReturnConfirmMessageService);
}
2、return消息确认机制(回退模式)
参考博文:https://www.jianshu.com/p/6e5a9e427afd
上面讲了生产者消息确认机制:即确认生产者是否成功发送消息到交换机。交换机是否发送到具体的消息队列那我们就不知道了。
如果想知道交换机是否将消息发送到队列,就需要用到return消息确认机制:监控交换机是否将消息发送到消息队列。return消息确认机制:消息发送到Exchange后,若Exchange没有找到绑定的消息队列,路由消息失败,才执行return消息确认机制。
如何配置,参考如下:开启Return机制
spring:
#消息队列rabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 启用return消息确认机制(回退模式),默认false。回退消息,当找不到routing key对应的队列时,是否回退消息
publisher-returns: true
承接上面的1.1的代码示列。
(1)核心代码:
:::info
🎃交换机路由消息给消息队列,只有失败了,才会自动调用returnedMessage()这个回调函数(区别上面的生产者消息确认机制不管成功还是失败,都会调用回调函数confirm() )。
:::
package com.thwcompany.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.UUID;
/**
* Created by IntelliJ IDEA2021.3
* @Author: Tenghw
* @Date: 2022/10/23 20:16
* @Description: 交换机路由消息给消息队列
*/
@Slf4j
@Service
@SuppressWarnings("all")
public class ReturnConfirmMessageService implements RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
//需要给ReturnCallback赋值 不然不会走回调方法,默认是null
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
}
public void sendMessage(String exchange, String routingkey, Object message) {
log.info("exchange交换机= {} ,路由键= {} ,要路由的消息message:{}",exchange,routingkey, message);
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange, routingkey, message,correlation);
}
//处理交换机发送消息到消息队列失败,则执行此方法。
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("交换机路由消息到消息队列失败=====》");
log.info("exchange交换机名 = {}",exchange);
log.info("routingKey路由键 = {}",routingKey);
log.info("message要路由的消息 = {}",new String(message.getBody()));//可以看待即使失败消息也不会丢失
log.info("replyCode返回code = {}",replyCode);
log.info("replyText = {}",replyText);
}
}
(2)控制层测试接口:
@GetMapping(path = "/test3")
@ResponseBody
public Object test3() {
String exchange = "direct-pmc-exchange";
String routingKey = "tenghw123";
Object message = "message: rabbitMQ return消息确认机制... ";
returnConfirmMessageService.sendMessage(exchange,routingKey,message);
return message;
}
(3)测试:
①测试正常时:
idea控制台输出:可以看到return正常时,不会去执行returnedMessage()方法;而且由于交换机也被监听了(发布者 —> 交换机 —> 消息队列,到达消息队列的消息也会经过交换机),所以日志也会输出 生产者消息确认机制的日志信息,如下所示:
exchange交换机= direct-pmc-exchange ,路由键= tenghw123 ,要路由的消息message:message: rabbitMQ return消息确认机制... ,correlation.getId()= 297c9eb8-a7cb-4f6d-9a22-ee0d3577434c
//也会输出 生产者消息确认机制的日志信息(即监听交换机的状态)
消息成功发送到交换机
生产者消息推送给的交换机的状态:true, 唯一ID::297c9eb8-a7cb-4f6d-9a22-ee0d3577434c
消息队列:Ready=1,Unacked=0,如下所示:
②测试异常时:为了模拟消息发送到Exchange后,没有找到绑定的消息队列,将下图的路由键由原来的“tenghw123”改为“tenghw123456”,如下图所示:
看下idea控制台信息:
exchange交换机= direct-pmc-exchange ,路由键= tenghw123456 ,要路由的消息message:message: rabbitMQ return消息确认机制... ,correlation.getId()= 41192e4a-ae08-4cd6-b62f-ee0bfed3b575
交换机路由消息到消息队列失败=====》
exchange交换机名 = direct-pmc-exchange
routingKey路由键 = tenghw123456
message要路由的消息 = message: rabbitMQ return消息确认机制...
replyCode返回code = 312
replyText = NO_ROUTE
//也会输出 生产者消息确认机制的日志信息(即监听交换机的状态)
消息成功发送到交换机
生产者消息推送给的交换机的状态:true, 唯一ID::41192e4a-ae08-4cd6-b62f-ee0bfed3b575
消息队列:Ready=1,Unacked=0,因为路由消息失败,所以消息队列不变;如下所示:
3、消费者消息确认机制
消费者消息确认机制:消费者ack消息后【不管是自动还是手工,只要执行了ack操作】,broker服务器中存放该条消息的消息队列会自动删除该条消息;删除是为了防止已消费的消息,被重复消费)。
消费者消息确认机制又分2种:
- 默认的自动消息确认。
- 手工消息确认。
图示的p->b是producer或publish->broker的意思;e->q和q->c同理。
3.1、消费者自动确认消息
消费者消息机制默认是自动确认(所以application.yml中不用特殊设置)的,一旦消费者接收到了消息,就视为自动确认了消息(此时可能消费者还没真正ack该条消息),随即broker服务器就会立即自动移除这个消息,但是该情况下存在如下问题:
🎃如果消费者在处理消息的过程中,出了错,导致最后没有自动执行ack,就没有什么办法重新获取这条消息【因为此时队列已经删除了该条消息】。 🍓如果服务器突然宕机的情况下,此时的消费者接收到消息,但是消费者并没有ack确认签收该消息,这个时候这条消息就会丢失。
上述问题解决方案:所以很多时候,消费者需要手动确认消息:即消费者在消息处理成功后,再手工ack确认消息。
3.2、消费者手工确认消息
如何将消费者的消息自动确认设置成手动确认消息,如下所示:
spring:
#消息队列rabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
#设置消费者需要手动确认消息
simple:
acknowledge-mode: manual
# concurrency: 1 # 指定最小的消费者数量
# max-concurrency: 1 #指定最大的消费者数量
# retry:
# enabled: true # 是否支持重试,默认false
# direct:
# acknowledge-mode: manual,这个我亲测不用加
acknowledge-mode确认模式有3种:( [əkˈnɒlɪdʒ] 单词释义“确认”)
- acknowledge-mode: none ,即自动模式(默认)。
- acknowledge-mode: manual ,即手动模式。
- acknowledge-mode: auto ,即自动模式 (根据侦听器检测是正常返回、还是抛出异常来发出 ack/nack)。
🐳手动模式可以确保我们消费者在没有签收消息的情况下,保证消息不会丢失,只要没有手动ack,则消息始终都是unacked状态。 假若此时服务器宕机的情况下,这时候会将这条消息重新放回队列,变成ready状态。
3.2.1、手工确认消息和手工拒绝消息示列
我们打开RabbitMQ的Channel.class源码,basicAck两个参数分别为:
(1)手动确认签收消息,示列代码:
:::info
参数说明:hannel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。false时只确认本次监听到的消息。
:::
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@RabbitListener(queques={"你要监听的队列名"})
@Service
public class RabbitMQServiceImpl implements RabbitMQService
@RabbitHandler//MessageUtil是自定义的工具类
public void receiveMessage(Message message,MessageUtil messageUtil, Channel channel){
//channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//是否签收货物,false为非批量签收
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
System.out.println(e);
}
System.out.println("接受到的消息为:"+messageUtil);
}
}
也可写成:上下亲测等价。
(2)手动拒绝签收消息示列代码:下图第3个参数requeue设置为false,即该消息不允许再次放回队列,那么这个消息,即该消息会变成死信;requeue设置为true时,则把被拒绝的消息再次放回消息队列。
或者
channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
4、Ready和unacked状态区别
unacked:单词释义“未确认”。
:::info
比如消息队列A中有一条消息message1,但是此消息队列没有任何一个消费者监听(或者监听队列A的消费者都宕机了),那么此时所有发到消息队列A上的消息的状态都是Ready状态【之前Unacked状态的消息也会变成Ready状态】。
或者有消费者监听该队列,消费者接受消息,但还没来得及ack(不管是设置成自动ack还是手工ack,都有可能没及时ack),此时状态是unacked;然后broker服务器突然宕机了, 这时候会将这条消息重新放回队列,该消息从unacked状态变成ready状态。
——————————————————————————————————-
比如消息队列B中有一条消息message2,此时有若干个消费者监听此消息队列B,但是消费者这边一直未确认签收此message2(可能网络原因,也可能设置成消费者手工ack确认,等待ack中),那么此时message2消息的状态就是unacked。
——————————————————————————————————-
小结:可以笼统的说,Ready状态的消息是没有消费者来监听此消息队列。
:::
4.1、代码示列
在Springboot项目中,设置了消费者手工确认消息模式;现有2个消息队列directqueue1(消费者1号监听)、directqueue2(消费者2号监听),初始状态:Ready:0,Unacked:0,如下所示:
(1)application.yml:
spring:
#消息队列rabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
#设置消费者需要手动确认消息
simple:
acknowledge-mode: manual
(2)DirectExchangeConfig什么消息队列、交换机类型、绑定的路由键:
package com.thwcompany.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//发布-订阅模式
//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfig {
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange=new DirectExchange("direct");
return directExchange;
}
@Bean
public Queue directQueue1() {
Queue queue=new Queue("directqueue1");
return queue;
}
@Bean
public Queue directQueue2() {
Queue queue=new Queue("directqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingorange(){
Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
return binding;
}
@Bean
public Binding bindingblack(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
return binding;
}
@Bean
public Binding bindinggreen(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
return binding;
}
}
(3)监听器—消费者1号和2号分别监听消息队列directqueue1、directqueue2,这里用的发布-订阅模式(即观察者模式)。
消费者1号后台手工ack消息,如下所示:
package com.thwcompany.rabbitMQ.listener;
import com.rabbitmq.client.Channel;
import com.thwcompany.pojo.Mail;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
public class DirectListener1 {
//发布-订阅模式,这里配置消费者1号监听消息队列directqueue1
@RabbitListener(queues = "directqueue1")
public void displayMail(Mail mail, Channel channel, Message message) throws Exception {
System.out.println("direct消费者1号,从directqueue1消息队列监听到的消息是:"+mail.toString());
//消费者1号进行手工确认签收消息
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
}
消费者2号没有ack代码逻辑,如下所示:
package com.thwcompany.rabbitMQ.listener;
import com.thwcompany.pojo.Mail;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectListener2 {
//发布-订阅模式,这里配置消费者2号监听消息队列directqueue2
@RabbitListener(queues = "directqueue2")
public void displayMail(Mail mail) throws Exception {
System.out.println("direct消费者2号,从directqueue2消息队列监听到的消息是:"+mail.toString());
}
}
(4)发布者发布消息 —>Exchange,然后Exchange路由消息给与交换机绑定的相应消息队列。
package com.thwcompany.service.impl;
import com.thwcompany.pojo.Mail;
import com.thwcompany.service.PublisherModeService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@SuppressWarnings("all")
public class PublisherModeServiceImpl implements PublisherModeService {
@Autowired
RabbitTemplate rabbitTemplate;
//参数1:交换机名,参数2:路由键,参数3:发布者发布的消息;mail是我自定义的entity
@Override
public void senddirectMail(Mail mail, String routingkey) {
rabbitTemplate.convertAndSend("direct", routingkey, mail);
}
}
(5)控制层接口:
package com.thwcompany.controller;
import com.thwcompany.pojo.Mail;
import com.thwcompany.pojo.TopicMail;
import com.thwcompany.service.PublisherModeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@Slf4j
@SuppressWarnings("all")
public class RabbitMQController {
@Autowired
PublisherModeService publisherModeService;
//发布-订阅模式,用的是directt交换机
@RequestMapping(value="/direct",produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void direct(@ModelAttribute("mail") TopicMail topicMail){
Mail m=new Mail(topicMail.getMailId(),topicMail.getCountry(),topicMail.getWeight());
publisherModeService.senddirectMail(m, topicMail.getRoutingkey());
}
}
(6)前端页面代码就不展示了,反之就是点击页面,发送消息。底层调用的是Controller层的direct()方法。
测试:
向directqueue1(orange)发送1条消息,可以看到消费者1号是从directqueue1中读取的消息,并且由于消费者1号进行了手工ack消息确认,此时RabbitMQ UI面板,Unacked=0,Ready=0;操作如下图所示:
分别向directqueue2(black)、directqueue2(green)发送1条消息,由于消费者2号虽然设置手工确认消息,但是没有手工ack消息确认的代码逻辑(当然也没自动确认消息),此时RabbitMQ UI面板,Unacked=2,Ready=0;操作如下图所示:
测试1:假若此时Broker服务器宕机的情况下:
效果:idea控制台一直报错:监听不到消息队列的信息。
因为Broker服务器宕机,所以RabbitMQ web ui页面访问不了,但是要知道此时:Ready=2,Unacked=0。
当我们再次启动Broker服务器时,观察idea控制台:可以看到消费者2号自动又 监听、拿取到了信息。
此时:Ready=0,Unacked=2,如下图所示:
这里的蓝底大D表示:durable: true,即消息是否持久化。
:::info
小结:验证了手工ack模式下【即使还没执行ack操作】,即使Broker服务器宕机了,消息也不会丢失!
:::
—————————————————————————————————————————————————
测试2:假若此时消费者断开连接的情况下,即停掉IDEA:
消费者断开连接前:Unacked=2,Ready=0,如下图所示:
消费者断开连接后:Unacked=0,Ready=2,如下图所示:
再次启动IDEA(即启动消费者),观察控制台:可以看到消费者2号自动又 监听、拿取到了信息。
此时:Unacked=2,Ready=0,如下图所示:
:::info
小结:验证了手工ack模式下【即使还没执行ack操作】,即使消费者宕机了,消息同样也不会丢失!
即如果消费者断开连接后又连接上了,消息队列会重新投递消息给消费者。
:::
二、死信、死信队列介绍
1、什么是死信、死信队列?
死信,顾名思义就是无法被消费的消息。官方将其翻译为单词Dead Letter。死信,其实这是 RabbitMQ 中一种消息类型,和普通的消息在本质上没有什么区别,更多的是一种业务上的划分。 RabblitMQ的死信队列还是一个普通的消息队列,只不过它是用来接收一些特殊的消息,而这些消息,官网称之为“死信”。 而我们经常说的队列,如无特殊说明,一般就是消息队列(而非死信队列)。
2、消息如何变成死信?
一个消息想要变成死信,官网提供了3种方案:
- 1)如果给消息队列设置了最大容量(x-max-length),当消息队列已经满了,后续再进来的消息就会溢出,无法被消息队列接收,消息就会变成死信。
- 2)消息被消费者reject(拒绝)或者nack(nack单词释义“未确认、无应答”;例如调用 channel.basicReject 或channel.basicNack ),并且requeue设置为false(即该消息不允许再次放回消息队列),那么这个消息,也变成死信。
- 3)如果给消息队列设置了消息的过期时间(x-message-ttl),或者发送消息时设置了当前消息的过期时间,但是在这期间内,消息没有被消费者消费,就会变成死信。
如果消息变成死信后:
- 1)如果消息队列绑定了死信交换机,那么死信就会被扔到咱们的死信交换机,并且会路由到死信队列。
- 2)如果消息队列没有绑定了死信交换机,那么死信就会被直接丢弃。
3、死信的应用
比如大量的定时任务,如果用Quartz这种定时任务框架去实现,是比较麻烦的,咱们需要有一堆的定时任务。
那么如果用死信队列,效果会更加优雅;再或者一些消息:消费失败或未被消费者消费到的消息,也可以基于死信,去做一个留存或者一些补偿的操作。
4、代码示列
4.1、准备工作
(1)创建一个 SpringBoot 项目,添加 RabbitMQ 依赖,并添加需要的配置:
<!--通过spring-rabbit来支持AMQP协议-->
<!--Spring提供的一个统一消息服务的应用层标准高级消息队列协议,使用RabbitMQ只引入这个依赖就够了-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.yml的RabbitMQ相关配置:
spring:
#消息队列rabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
#设置消费者需要手动确认消息
simple:
acknowledge-mode: manual
# direct:
# acknowledge-mode: manual,这个我亲测不用加
4.2、具体代码
(1)创建死信交换机和死信队列,并通过路由键绑定:
package com.thwcompany.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by IntelliJ IDEA2021.3
* @Author: Tenghw
* @Date: 2022/10/21 20:00
* @Description:
*/
@Configuration
@SuppressWarnings("all")
public class DeadLetterRabbitMQConfig {
//创建死信交换机
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange("dead.letter.exchange",true,false);
}
//创建死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue("dead.letter.queue");
}
// 死信队列和死信交换机通过路由键绑定
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
}
}
上面的代码也可参考下面别人的写法,我感觉下面的比较优雅一点:
(2)创建正常的业务交换机和业务队列,并绑定;同时为业务队列绑定死信交换机,这样业务队列就可以把死信丢给死信交换机,进而死信交换机会把死信路由给死信队列:
package com.thwcompany.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* Created by IntelliJ IDEA2021.3
* @Author: Tenghw
* @Date: 2022/10/21 20:32
* @Description:
*/
@Configuration
@SuppressWarnings("all")
public class BusinessRabbitMQConfig {
// 创建业务直连交换机
@Bean
DirectExchange businessExchange() {
return new DirectExchange("business.exchange", true, false);
}
// 创建业务消息队列
@Bean
Queue businessQueue1() {
HashMap<String, Object> args = new HashMap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", "dead.letter.exchange");
// 设置死信交换机绑定队列的routingKey
args.put("x-dead-letter-routing-key", "dead.letter");
return new Queue("business.queue1", true, false, false, args);
}
// 业务消息队列和业务交换机通过路由键绑定
@Bean
Binding businessBinding1() {
return BindingBuilder.bind(businessQueue1()).to(businessExchange()).with("business1");
}
}
(3)发布者发布消息给业务交换机,业务交换机路由消息给业务队列:
package com.thwcompany.service;
import cn.hutool.core.date.DateUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Created by IntelliJ IDEA2021.3
* @Author: Tenghw
* @Date: 2022/10/21 20:44
* @Description:
*/
@Service
public class BusinessSendService {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(String routingKey, String message) {
rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
String format = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss.SSS");
System.out.println(format+" >>> 生产者发送的业务消息:" + message);
}
}
(4)监听业务队列的消费者对消息进行reject或nack,使该消息成为死信:
package com.thwcompany.service;
import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* Created by IntelliJ IDEA2021.3
* @Author: Tenghw
* @Date: 2022/10/21 20:38
* @Description:
*/
@RabbitListener(queues = "business.queue1")
@Service
public class BusinessReceiveService {
@SneakyThrows
@RabbitHandler
public void receive(String msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//接收消息
//channel.basicAck(deliveryTag,false);
//System.out.println(" >>> 消费者拒绝的业务消息:" + msg);
// 拒绝消息
channel.basicNack(deliveryTag, false, false);
String format = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss.SSS");
System.out.println(format+" >>> 消费者拒绝的业务消息:" + msg);
}
}
最终死信,经死信交换机,最后到了死信队列,而死信队列我没设置监听此队列的消费者。
所以此时死信队列状态:Ready:1,Unack:0。
END