2022年10月21日21:20:35

前言

注意点

(1)生产者将消息发给交换器的时候,,一般会指定一个RoutingKey (路由键),用来指定这个消息的路由规则,而这个RoutingKey 需要与交换器类型和绑定键(BindingKey) 联合使用,才能最终生效。

:::info 💡下面说的如果 服务器宕机了,指的是RabbitMQ服务器(或者说Broker服务器),可以通过关掉RabbitMQ的cmd窗口或者窗口执行停止命令来实现。
而消费者宕机可通过idea停止运行来实现。 :::

(2)MQ本质是个队列,遵从先入先出的规则。

(3)exchange交换机只路由转发消息至消息队列,不会存储消息。

(4)下图中:发布者和Broker服务器只有一个connection(即TCP连接),多个发布者可复用一个TCP,避免了TCP的开销和维护(因为TCP开销很昂贵的);而多个发布者之间又通过各自的信道(Channel,虚拟连接),起到了发布者之间隔离的作用。
消费者和Broker之间亦是同理。
image.png
(5)如果消费者接受的消息是实体类对象,需要将类序列化,如下所示:
image.png
image.png
image.png


一、RabbitMQ 消息确认机制介绍

场景

  • 当消息的投送方(即生产者)把消息投递出去,却不知道消息是否投递成功了?如果消息的投送方不管的话,势必对系统的可靠性 造成影响。
  • 可如果要保证系统的可靠性,那么消息的投送方,如何知道消息投递成功了?
  • 这个就需要消息的确认机制,我们来看下rabbitMQ的消息确认机制是如何实现的。

    原理图

    RabbitMQ为了防止消息不丢失的情况,可以使用事物消息,但是性能下降很多,为此引入消息确认机制。
    如下的RabbotMQ是一种中间件服务器,内部可分为:Exchange交换机(可多个)、Queue(可多个)等,我理解Broker就是RabbitMQ。
    image.png :::info 🍓消息确认机制又分:生产者消息确认机制、消费者消息确认机制、return消息机制 :::

    1、生产者消息确认机制(confirm)

    生产者消息确认机制:指生产者将消息投递给对应的Broker中的VHost里面的exchange后,产生的应答,如果exchange不存在返回false,投递成功则返回true。
    image.png

    1.1、如何使用生产者消息确认机制

    (1)application.yml中配置:
    1. spring:
    2. rabbitmq:
    3. host: localhost
    4. port: 5672
    5. virtual-host: /
    6. username: guest
    7. password: guest
    8. publisher-confirms: true # 启用生产者消息确认机制,默认false。即交换机收到发布者发来的消息后,不管成功还是失败,都会触发回调方法
    如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为springboot版本导致的配置项不起效,
    可以把publisher-confirms: true 替换为 publisher-confirm-type: correlated。

(2)代码:(Correlation [ˌkɒrəˈleɪʃn] 相关性) :::info 🎃发布者发布消息给交换机,不管成功还是失败,都会自动调用confirm()这个回调函数。 :::

  1. package com.thwcompany.service;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.rabbit.support.CorrelationData;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.PostConstruct;
  8. import java.io.UnsupportedEncodingException;
  9. import java.util.UUID;
  10. import org.springframework.amqp.core.Message;
  11. /**
  12. * Created by IntelliJ IDEA2021.3
  13. * @Author: Tenghw
  14. * @Date: 2022/10/23 00:03
  15. * @Description: 生产者消息确认机制配置:发布者发布消息给交换机
  16. */
  17. @SuppressWarnings("all")
  18. @Slf4j
  19. @Service
  20. public class ProducerMessageConfimService implements RabbitTemplate.ConfirmCallback {
  21. // 注入rabbitTemplate
  22. private RabbitTemplate rabbitTemplate;
  23. @Autowired
  24. public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
  25. this.rabbitTemplate = rabbitTemplate;
  26. }
  27. /**
  28. * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
  29. */
  30. @PostConstruct
  31. public void init(){
  32. rabbitTemplate.setConfirmCallback(this);
  33. }
  34. //exchangeKey交换机名,routingkey路由键;message为发送的消息
  35. //new CorrelationData(id)主要是用来给发送的消息设置一个唯一id,交换机收到消息和唯一id,2次id一致,即证明是同一条消息
  36. public void sendMessage(String exchangeKey, String routingkey, String message) throws UnsupportedEncodingException {
  37. String id= UUID.randomUUID().toString();
  38. CorrelationData correlationData = new CorrelationData(id);
  39. //需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
  40. //rabbitTemplate.setConfirmCallback(this);
  41. correlationData.setReturnedMessage(new Message(message.getBytes()));
  42. rabbitTemplate.convertAndSend(exchangeKey,routingkey, message,correlationData);
  43. log.info("交换机名称:{},routingkey名称:{},生产者发送给的交换机的消息: {},唯一ID:{}",
  44. exchangeKey,routingkey,message,correlationData.getId());
  45. }
  46. /**
  47. * 此方法用于监听消息是否发送到交换机
  48. * @param correlationData: 对象内部的id 属性,用来标识当前消息的唯一性。correlationData 内含消息内容
  49. * @param ack: 当交换机存在时为true,否则为false;
  50. * @param cause: 发布者发布消息给交换机:若失败,cause为失败原因;若成功,cause=null
  51. */
  52. @Override
  53. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  54. String message = new String(correlationData.getReturnedMessage().getBody());
  55. if(ack){
  56. log.info("消息成功发送到交换机");
  57. log.info("生产者消息推送给的交换机的状态:{}, 唯一ID::{}",ack,correlationData.getId());
  58. }else {
  59. log.info("消息发送到交换机失败");
  60. log.info("生产者消息推送给的交换机的状态:{}, 唯一ID::{},错误原因cause :{}",ack,correlationData.getId(),cause);
  61. log.info("消息内容为{}", message);//可以看做即使失败消息也不会丢失
  62. }
  63. }
  64. }

(3)设置交换机类型、名称,以及设置消息队列名称。两者暂时没有设置排他性、持久化、自动删除。

  1. package com.thwcompany.config1;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. //发布-订阅模式
  9. @Configuration
  10. @SuppressWarnings("all")
  11. public class ProducerMessageExchangeConfig {
  12. @Bean
  13. public DirectExchange directPmcExchange(){
  14. DirectExchange directExchange=new DirectExchange("direct-pmc-exchange");
  15. return directExchange;
  16. }
  17. @Bean
  18. public Queue directPmcQueue() {
  19. Queue queue=new Queue("direct-pmc-queue");
  20. return queue;
  21. }
  22. //binding将交换机和相应队列连起来
  23. @Bean
  24. public Binding bindingorange(){
  25. Binding binding=BindingBuilder.bind(directPmcQueue()).to(directPmcExchange()).with("tenghw123");
  26. return binding;
  27. }
  28. }

(4)测试接口:

  1. @Controller
  2. public class HtmlController {
  3. @Autowired
  4. ProducerMessageConfimService producerMessageConfimService;
  5. @GetMapping(path = "/test2")
  6. @ResponseBody
  7. public Object test2() {
  8. String exchange = "direct-pmc-exchange";
  9. String routingKey = "tenghw123";
  10. Object message = "message: rabbitMQ 生产者消息确认机制... ";
  11. producerMessageConfimService.sendMessage(exchange,routingKey,message);
  12. return message;
  13. }
  14. }

(5)测试
①测试前该队列的状态:Ready=0,Unacked=0,如下图所示:
image.png
发出测试请求:
image.png
此时idea控制台输出:可以看到自动执行了上面写的confirm()方法

  1. 交换机名称:direct-pmc-exchangeroutingkey名称:tenghw123,生产者发送给绑定的交换机的消息: message: rabbitMQ 生产者消息确认机制... ,唯一ID49261c71-d240-4ac5-87a0-40a80079e676
  2. 消息成功发送到交换机
  3. 生产者消息推送给的交换机的状态:true, 唯一ID::49261c71-d240-4ac5-87a0-40a80079e676

此时:Ready=1(因为没有设置消费者监听消息队列,所以消息状态为Raeady),Unacked=0,如下图所示:
image.png

②将交换机改成错误的,即交换机不存在时
image.png
再次发出测试请求,idea控制台输出:可以看到返回false。

  1. 交换机名称:direct-pmc-exchange123routingkey名称:tenghw123,生产者发送给绑定的交换机的消息: message: rabbitMQ 生产者消息确认机制... ,唯一ID19cbcf49-33ab-424c-b93d-168481a6b72c
  2. 消息发送到交换机失败
  3. 生产者消息推送给的交换机的状态:false, 唯一ID::19cbcf49-33ab-424c-b93d-168481a6b72c,错误原因cause channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct-pmc-exchange123' in vhost '/', class-id=60, method-id=40)
  4. Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct-pmc-exchange123' in vhost '/', class-id=60, method-id=40)

虽然路由键没错,但是交换机都没了,消息也就走不到交换机了,更到不了消息队列了。此时消息队列状态还是:Ready=1,Unacked=0,如下图所示:
image.png

1.2、生产者消息确认机制存在的问题

如果发布者发布消息到交换机时发生错误,则自动回调ConfirmCallback接口,但只可以保证消息到交换机这一步不会丢失。
但如果交换机路由消息到队列的过程中出现了问题,消息一样会丢失。比如上面生产者把routingKey写错了,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。就会导致消息从交换机路由不到队列中,而消息丢失。
解决方案:
在消息从交换机路由到队列中失败后,回调returnCallBack函数,在returnCallBack回调接口中把失败的消息保存下来,就可以避免消息丢失了。
在回调returnCallBack接口之前,还需为RabbitMQ设置Mandatory标志,只有当该标志为true时,消息由交换机路由到队列失败后,才会回调returnCallBack接口;如果该标志设置false时,消息由交换机路由到队列失败后自动丢弃消息,会导致消息丢失,且默认false;所以如需保证消息不丢失,要打开Mandatory标志。

  1. @PostConstruct
  2. public void init(){
  3. rabbitTemplate.setConfirmCallback(exchangeCallback);
  4. /**
  5. * true:交换机无法将消息进行路由时,会将该消息返回给生产者
  6. * false:如果发现消息无法进行路由,则直接丢弃该消息
  7. */
  8. rabbitTemplate.setMandatory(true);
  9. rabbitTemplate.setReturnCallback(ReturnConfirmMessageService);
  10. }

2、return消息确认机制(回退模式)

参考博文:https://www.jianshu.com/p/6e5a9e427afd
上面讲了生产者消息确认机制:即确认生产者是否成功发送消息到交换机。交换机是否发送到具体的消息队列那我们就不知道了
如果想知道交换机是否将消息发送到队列,就需要用到return消息确认机制:监控交换机是否将消息发送到消息队列。return消息确认机制:消息发送到Exchange后,若Exchange没有找到绑定的消息队列,路由消息失败,才执行return消息确认机制。
image.png
如何配置,参考如下:开启Return机制

  1. spring:
  2. #消息队列rabbitMQ配置
  3. rabbitmq:
  4. host: localhost
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. # 启用return消息确认机制(回退模式),默认false。回退消息,当找不到routing key对应的队列时,是否回退消息
  10. publisher-returns: true

承接上面的1.1的代码示列。
(1)核心代码: :::info 🎃交换机路由消息给消息队列,只有失败了,才会自动调用returnedMessage()这个回调函数(区别上面的生产者消息确认机制不管成功还是失败,都会调用回调函数confirm() ) :::

  1. package com.thwcompany.service;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.rabbit.support.CorrelationData;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import javax.annotation.PostConstruct;
  9. import java.util.UUID;
  10. /**
  11. * Created by IntelliJ IDEA2021.3
  12. * @Author: Tenghw
  13. * @Date: 2022/10/23 20:16
  14. * @Description: 交换机路由消息给消息队列
  15. */
  16. @Slf4j
  17. @Service
  18. @SuppressWarnings("all")
  19. public class ReturnConfirmMessageService implements RabbitTemplate.ReturnCallback{
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. //需要给ReturnCallback赋值 不然不会走回调方法,默认是null
  23. @PostConstruct
  24. public void init(){
  25. rabbitTemplate.setReturnCallback(this);
  26. }
  27. public void sendMessage(String exchange, String routingkey, Object message) {
  28. log.info("exchange交换机= {} ,路由键= {} ,要路由的消息message:{}",exchange,routingkey, message);
  29. CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
  30. rabbitTemplate.convertAndSend(exchange, routingkey, message,correlation);
  31. }
  32. //处理交换机发送消息到消息队列失败,则执行此方法。
  33. @Override
  34. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  35. log.info("交换机路由消息到消息队列失败=====》");
  36. log.info("exchange交换机名 = {}",exchange);
  37. log.info("routingKey路由键 = {}",routingKey);
  38. log.info("message要路由的消息 = {}",new String(message.getBody()));//可以看待即使失败消息也不会丢失
  39. log.info("replyCode返回code = {}",replyCode);
  40. log.info("replyText = {}",replyText);
  41. }
  42. }

(2)控制层测试接口:

  1. @GetMapping(path = "/test3")
  2. @ResponseBody
  3. public Object test3() {
  4. String exchange = "direct-pmc-exchange";
  5. String routingKey = "tenghw123";
  6. Object message = "message: rabbitMQ return消息确认机制... ";
  7. returnConfirmMessageService.sendMessage(exchange,routingKey,message);
  8. return message;
  9. }

(3)测试:
image.png
①测试正常时:
idea控制台输出:可以看到return正常时,不会去执行returnedMessage()方法;而且由于交换机也被监听了(发布者 —> 交换机 —> 消息队列,到达消息队列的消息也会经过交换机),所以日志也会输出 生产者消息确认机制的日志信息,如下所示:

  1. exchange交换机= direct-pmc-exchange ,路由键= tenghw123 ,要路由的消息message:message: rabbitMQ return消息确认机制... ,correlation.getId()= 297c9eb8-a7cb-4f6d-9a22-ee0d3577434c
  2. //也会输出 生产者消息确认机制的日志信息(即监听交换机的状态)
  3. 消息成功发送到交换机
  4. 生产者消息推送给的交换机的状态:true, 唯一ID::297c9eb8-a7cb-4f6d-9a22-ee0d3577434c

消息队列:Ready=1,Unacked=0,如下所示:
image.png

测试异常时:为了模拟消息发送到Exchange后,没有找到绑定的消息队列,将下图的路由键由原来的“tenghw123”改为“tenghw123456”,如下图所示:
image.png
看下idea控制台信息:

  1. exchange交换机= direct-pmc-exchange ,路由键= tenghw123456 ,要路由的消息message:message: rabbitMQ return消息确认机制... ,correlation.getId()= 41192e4a-ae08-4cd6-b62f-ee0bfed3b575
  2. 交换机路由消息到消息队列失败=====》
  3. exchange交换机名 = direct-pmc-exchange
  4. routingKey路由键 = tenghw123456
  5. message要路由的消息 = message: rabbitMQ return消息确认机制...
  6. replyCode返回code = 312
  7. replyText = NO_ROUTE
  8. //也会输出 生产者消息确认机制的日志信息(即监听交换机的状态)
  9. 消息成功发送到交换机
  10. 生产者消息推送给的交换机的状态:true, 唯一ID::41192e4a-ae08-4cd6-b62f-ee0bfed3b575

消息队列:Ready=1,Unacked=0,因为路由消息失败,所以消息队列不变;如下所示:
image.png


3、消费者消息确认机制

消费者消息确认机制:消费者ack消息后【不管是自动还是手工,只要执行了ack操作】,broker服务器中存放该条消息的消息队列会自动删除该条消息;删除是为了防止已消费的消息,被重复消费)。
消费者消息确认机制又分2种:

  • 默认的自动消息确认。
  • 手工消息确认。

图示的p->b是producer或publish->broker的意思;e->q和q->c同理。
image.png

3.1、消费者自动确认消息

消费者消息机制默认是自动确认(所以application.yml中不用特殊设置的,一旦消费者接收到了消息,就视为自动确认了消息(此时可能消费者还没真正ack该条消息),随即broker服务器就会立即自动移除这个消息,但是该情况下存在如下问题:

🎃如果消费者在处理消息的过程中,出了错,导致最后没有自动执行ack,就没有什么办法重新获取这条消息【因为此时队列已经删除了该条消息】。 🍓如果服务器突然宕机的情况下,此时的消费者接收到消息,但是消费者并没有ack确认签收该消息,这个时候这条消息就会丢失。

上述问题解决方案:所以很多时候,消费者需要手动确认消息:即消费者在消息处理成功后,再手工ack确认消息。


3.2、消费者手工确认消息

如何将消费者的消息自动确认设置成手动确认消息,如下所示:

  1. spring:
  2. #消息队列rabbitMQ配置
  3. rabbitmq:
  4. host: localhost
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. listener:
  10. #设置消费者需要手动确认消息
  11. simple:
  12. acknowledge-mode: manual
  13. # concurrency: 1 # 指定最小的消费者数量
  14. # max-concurrency: 1 #指定最大的消费者数量
  15. # retry:
  16. # enabled: true # 是否支持重试,默认false
  17. # direct:
  18. # acknowledge-mode: manual,这个我亲测不用加

acknowledge-mode确认模式有3种:( [əkˈnɒlɪdʒ] 单词释义“确认”

  • acknowledge-mode: none ,即自动模式(默认)。
  • acknowledge-mode: manual ,即手动模式。
  • acknowledge-mode: auto ,即自动模式 (根据侦听器检测是正常返回、还是抛出异常来发出 ack/nack)。

    🐳手动模式可以确保我们消费者在没有签收消息的情况下,保证消息不会丢失,只要没有手动ack,则消息始终都是unacked状态。 假若此时服务器宕机的情况下,这时候会将这条消息重新放回队列,变成ready状态


3.2.1、手工确认消息和手工拒绝消息示列

我们打开RabbitMQ的Channel.class源码,basicAck两个参数分别为:
image.png
(1)手动确认签收消息,示列代码
image.png :::info 参数说明:hannel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。false时只确认本次监听到的消息。 :::

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Service;
  6. @RabbitListener(queques={"你要监听的队列名"})
  7. @Service
  8. public class RabbitMQServiceImpl implements RabbitMQService
  9. @RabbitHandler//MessageUtil是自定义的工具类
  10. public void receiveMessage(Message message,MessageUtil messageUtil, Channel channel){
  11. //channel内按顺序自增
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. try {
  14. //是否签收货物,false为非批量签收
  15. channel.basicAck(deliveryTag,false);
  16. } catch (IOException e) {
  17. System.out.println(e);
  18. }
  19. System.out.println("接受到的消息为:"+messageUtil);
  20. }
  21. }

也可写成:上下亲测等价。
image.png

(2)手动拒绝签收消息示列代码:下图第3个参数requeue设置为false,即该消息不允许再次放回队列,那么这个消息,即该消息会变成死信;requeue设置为true时,则把被拒绝的消息再次放回消息队列。
image.png
或者

  1. channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝

4、Ready和unacked状态区别

unacked:单词释义“未确认”。 :::info 比如消息队列A中有一条消息message1,但是此消息队列没有任何一个消费者监听(或者监听队列A的消费者都宕机了),那么此时所有发到消息队列A上的消息的状态都是Ready状态【之前Unacked状态的消息也会变成Ready状态】
或者有消费者监听该队列,消费者接受消息,但还没来得及ack(不管是设置成自动ack还是手工ack,都有可能没及时ack),此时状态是unacked;然后broker服务器突然宕机了, 这时候会将这条消息重新放回队列,该消息从unacked状态变成ready状态
——————————————————————————————————-
比如消息队列B中有一条消息message2,此时有若干个消费者监听此消息队列B,但是消费者这边一直未确认签收此message2(可能网络原因,也可能设置成消费者手工ack确认,等待ack中),那么此时message2消息的状态就是unacked。
——————————————————————————————————-
小结:可以笼统的说,Ready状态的消息是没有消费者来监听此消息队列。 :::

4.1、代码示列

在Springboot项目中,设置了消费者手工确认消息模式;现有2个消息队列directqueue1(消费者1号监听)、directqueue2(消费者2号监听),初始状态:Ready:0,Unacked:0,如下所示:
image.png
(1)application.yml:

  1. spring:
  2. #消息队列rabbitMQ配置
  3. rabbitmq:
  4. host: localhost
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. listener:
  10. #设置消费者需要手动确认消息
  11. simple:
  12. acknowledge-mode: manual

(2)DirectExchangeConfig什么消息队列、交换机类型、绑定的路由键:

  1. package com.thwcompany.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. //发布-订阅模式
  9. //direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
  10. @Configuration
  11. @SuppressWarnings("all")
  12. public class DirectExchangeConfig {
  13. @Bean
  14. public DirectExchange directExchange(){
  15. DirectExchange directExchange=new DirectExchange("direct");
  16. return directExchange;
  17. }
  18. @Bean
  19. public Queue directQueue1() {
  20. Queue queue=new Queue("directqueue1");
  21. return queue;
  22. }
  23. @Bean
  24. public Queue directQueue2() {
  25. Queue queue=new Queue("directqueue2");
  26. return queue;
  27. }
  28. //3个binding将交换机和相应队列连起来
  29. @Bean
  30. public Binding bindingorange(){
  31. Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
  32. return binding;
  33. }
  34. @Bean
  35. public Binding bindingblack(){
  36. Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
  37. return binding;
  38. }
  39. @Bean
  40. public Binding bindinggreen(){
  41. Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
  42. return binding;
  43. }
  44. }

(3)监听器—消费者1号和2号分别监听消息队列directqueue1、directqueue2,这里用的发布-订阅模式(即观察者模式)。
消费者1号后台手工ack消息,如下所示:

  1. package com.thwcompany.rabbitMQ.listener;
  2. import com.rabbitmq.client.Channel;
  3. import com.thwcompany.pojo.Mail;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @SuppressWarnings("all")
  9. public class DirectListener1 {
  10. //发布-订阅模式,这里配置消费者1号监听消息队列directqueue1
  11. @RabbitListener(queues = "directqueue1")
  12. public void displayMail(Mail mail, Channel channel, Message message) throws Exception {
  13. System.out.println("direct消费者1号,从directqueue1消息队列监听到的消息是:"+mail.toString());
  14. //消费者1号进行手工确认签收消息
  15. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  16. channel.basicAck(deliveryTag,false);
  17. }
  18. }

消费者2号没有ack代码逻辑,如下所示:

  1. package com.thwcompany.rabbitMQ.listener;
  2. import com.thwcompany.pojo.Mail;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class DirectListener2 {
  7. //发布-订阅模式,这里配置消费者2号监听消息队列directqueue2
  8. @RabbitListener(queues = "directqueue2")
  9. public void displayMail(Mail mail) throws Exception {
  10. System.out.println("direct消费者2号,从directqueue2消息队列监听到的消息是:"+mail.toString());
  11. }
  12. }

(4)发布者发布消息 —>Exchange,然后Exchange路由消息给与交换机绑定的相应消息队列。

  1. package com.thwcompany.service.impl;
  2. import com.thwcompany.pojo.Mail;
  3. import com.thwcompany.service.PublisherModeService;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. @SuppressWarnings("all")
  9. public class PublisherModeServiceImpl implements PublisherModeService {
  10. @Autowired
  11. RabbitTemplate rabbitTemplate;
  12. //参数1:交换机名,参数2:路由键,参数3:发布者发布的消息;mail是我自定义的entity
  13. @Override
  14. public void senddirectMail(Mail mail, String routingkey) {
  15. rabbitTemplate.convertAndSend("direct", routingkey, mail);
  16. }
  17. }

(5)控制层接口:

  1. package com.thwcompany.controller;
  2. import com.thwcompany.pojo.Mail;
  3. import com.thwcompany.pojo.TopicMail;
  4. import com.thwcompany.service.PublisherModeService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Controller;
  8. import org.springframework.web.bind.annotation.ModelAttribute;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.ResponseBody;
  11. @Controller
  12. @Slf4j
  13. @SuppressWarnings("all")
  14. public class RabbitMQController {
  15. @Autowired
  16. PublisherModeService publisherModeService;
  17. //发布-订阅模式,用的是directt交换机
  18. @RequestMapping(value="/direct",produces = {"application/json;charset=UTF-8"})
  19. @ResponseBody
  20. public void direct(@ModelAttribute("mail") TopicMail topicMail){
  21. Mail m=new Mail(topicMail.getMailId(),topicMail.getCountry(),topicMail.getWeight());
  22. publisherModeService.senddirectMail(m, topicMail.getRoutingkey());
  23. }
  24. }

(6)前端页面代码就不展示了,反之就是点击页面,发送消息。底层调用的是Controller层的direct()方法。
image.png
测试:
向directqueue1(orange)发送1条消息,可以看到消费者1号是从directqueue1中读取的消息,并且由于消费者1号进行了手工ack消息确认,此时RabbitMQ UI面板,Unacked=0,Ready=0;操作如下图所示:
1gif.gif
分别向directqueue2(black)、directqueue2(green)发送1条消息,由于消费者2号虽然设置手工确认消息,但是没有手工ack消息确认的代码逻辑(当然也没自动确认消息),此时RabbitMQ UI面板,Unacked=2,Ready=0;操作如下图所示:
1gif.gif

测试1:假若此时Broker服务器宕机的情况下:
效果:idea控制台一直报错:监听不到消息队列的信息。
因为Broker服务器宕机,所以RabbitMQ web ui页面访问不了,但是要知道此时:Ready=2,Unacked=0。

当我们再次启动Broker服务器时,观察idea控制台:可以看到消费者2号自动又 监听、拿取到了信息。
image.png
此时:Ready=0,Unacked=2,如下图所示:
image.png
这里的蓝底大D表示:durable: true,即消息是否持久化。 :::info 小结:验证了手工ack模式下【即使还没执行ack操作】,即使Broker服务器宕机了,消息也不会丢失! ::: —————————————————————————————————————————————————
测试2:假若此时消费者断开连接的情况下,即停掉IDEA:
消费者断开连接前:Unacked=2,Ready=0,如下图所示:
image.png
消费者断开连接后:Unacked=0,Ready=2,如下图所示:
image.png
再次启动IDEA(即启动消费者),观察控制台:可以看到消费者2号自动又 监听、拿取到了信息。
image.png
此时:Unacked=2,Ready=0,如下图所示:
image.png

:::info 小结:验证了手工ack模式下【即使还没执行ack操作】,即使消费者宕机了,消息同样也不会丢失!
即如果消费者断开连接后又连接上了,消息队列会重新投递消息给消费者。 :::


二、死信、死信队列介绍

1、什么是死信、死信队列?

死信,顾名思义就是无法被消费的消息。官方将其翻译为单词Dead Letter。死信,其实这是 RabbitMQ 中一种消息类型,和普通的消息在本质上没有什么区别,更多的是一种业务上的划分。 RabblitMQ的死信队列还是一个普通的消息队列,只不过它是用来接收一些特殊的消息,而这些消息,官网称之为“死信”。 而我们经常说的队列,如无特殊说明,一般就是消息队列(而非死信队列)。


2、消息如何变成死信?

一个消息想要变成死信,官网提供了3种方案:

  • 1)如果给消息队列设置了最大容量(x-max-length),当消息队列已经满了,后续再进来的消息就会溢出,无法被消息队列接收,消息就会变成死信。
  • 2)消息被消费者reject(拒绝)或者nack(nack单词释义“未确认、无应答”;例如调用 channel.basicReject 或channel.basicNack ),并且requeue设置为false(即该消息不允许再次放回消息队列),那么这个消息,也变成死信。
  • 3)如果给消息队列设置了消息的过期时间(x-message-ttl),或者发送消息时设置了当前消息的过期时间,但是在这期间内,消息没有被消费者消费,就会变成死信。

如果消息变成死信后:

  • 1)如果消息队列绑定了死信交换机,那么死信就会被扔到咱们的死信交换机,并且会路由到死信队列。
  • 2)如果消息队列没有绑定了死信交换机,那么死信就会被直接丢弃。

3、死信的应用

比如大量的定时任务,如果用Quartz这种定时任务框架去实现,是比较麻烦的,咱们需要有一堆的定时任务。
那么如果用死信队列,效果会更加优雅;再或者一些消息:消费失败或未被消费者消费到的消息,也可以基于死信,去做一个留存或者一些补偿的操作。


4、代码示列

4.1、准备工作

(1)创建一个 SpringBoot 项目,添加 RabbitMQ 依赖,并添加需要的配置:

  1. <!--通过spring-rabbit来支持AMQP协议-->
  2. <!--Spring提供的一个统一消息服务的应用层标准高级消息队列协议,使用RabbitMQ只引入这个依赖就够了-->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-amqp</artifactId>
  6. </dependency>

(2)application.yml的RabbitMQ相关配置:

  1. spring:
  2. #消息队列rabbitMQ配置
  3. rabbitmq:
  4. host: localhost
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. listener:
  10. #设置消费者需要手动确认消息
  11. simple:
  12. acknowledge-mode: manual
  13. # direct:
  14. # acknowledge-mode: manual,这个我亲测不用加

4.2、具体代码

(1)创建死信交换机和死信队列,并通过路由键绑定:

  1. package com.thwcompany.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * Created by IntelliJ IDEA2021.3
  10. * @Author: Tenghw
  11. * @Date: 2022/10/21 20:00
  12. * @Description:
  13. */
  14. @Configuration
  15. @SuppressWarnings("all")
  16. public class DeadLetterRabbitMQConfig {
  17. //创建死信交换机
  18. @Bean
  19. public DirectExchange deadLetterExchange(){
  20. return new DirectExchange("dead.letter.exchange",true,false);
  21. }
  22. //创建死信队列
  23. @Bean
  24. public Queue deadLetterQueue() {
  25. return new Queue("dead.letter.queue");
  26. }
  27. // 死信队列和死信交换机通过路由键绑定
  28. @Bean
  29. Binding deadLetterBinding() {
  30. return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
  31. }
  32. }

上面的代码也可参考下面别人的写法,我感觉下面的比较优雅一点:
image.png
image.png
image.png
(2)创建正常的业务交换机和业务队列,并绑定;同时为业务队列绑定死信交换机,这样业务队列就可以把死信丢给死信交换机,进而死信交换机会把死信路由给死信队列:

  1. package com.thwcompany.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. /**
  10. * Created by IntelliJ IDEA2021.3
  11. * @Author: Tenghw
  12. * @Date: 2022/10/21 20:32
  13. * @Description:
  14. */
  15. @Configuration
  16. @SuppressWarnings("all")
  17. public class BusinessRabbitMQConfig {
  18. // 创建业务直连交换机
  19. @Bean
  20. DirectExchange businessExchange() {
  21. return new DirectExchange("business.exchange", true, false);
  22. }
  23. // 创建业务消息队列
  24. @Bean
  25. Queue businessQueue1() {
  26. HashMap<String, Object> args = new HashMap<>();
  27. // 设置死信交换机
  28. args.put("x-dead-letter-exchange", "dead.letter.exchange");
  29. // 设置死信交换机绑定队列的routingKey
  30. args.put("x-dead-letter-routing-key", "dead.letter");
  31. return new Queue("business.queue1", true, false, false, args);
  32. }
  33. // 业务消息队列和业务交换机通过路由键绑定
  34. @Bean
  35. Binding businessBinding1() {
  36. return BindingBuilder.bind(businessQueue1()).to(businessExchange()).with("business1");
  37. }
  38. }

(3)发布者发布消息给业务交换机,业务交换机路由消息给业务队列:

  1. package com.thwcompany.service;
  2. import cn.hutool.core.date.DateUtil;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. /**
  7. * Created by IntelliJ IDEA2021.3
  8. * @Author: Tenghw
  9. * @Date: 2022/10/21 20:44
  10. * @Description:
  11. */
  12. @Service
  13. public class BusinessSendService {
  14. @Autowired
  15. RabbitTemplate rabbitTemplate;
  16. public void send(String routingKey, String message) {
  17. rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
  18. String format = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss.SSS");
  19. System.out.println(format+" >>> 生产者发送的业务消息:" + message);
  20. }
  21. }

(4)监听业务队列的消费者对消息进行reject或nack,使该消息成为死信:

  1. package com.thwcompany.service;
  2. import cn.hutool.core.date.DateUtil;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.SneakyThrows;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Service;
  9. /**
  10. * Created by IntelliJ IDEA2021.3
  11. * @Author: Tenghw
  12. * @Date: 2022/10/21 20:38
  13. * @Description:
  14. */
  15. @RabbitListener(queues = "business.queue1")
  16. @Service
  17. public class BusinessReceiveService {
  18. @SneakyThrows
  19. @RabbitHandler
  20. public void receive(String msg, Channel channel, Message message) {
  21. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  22. //接收消息
  23. //channel.basicAck(deliveryTag,false);
  24. //System.out.println(" >>> 消费者拒绝的业务消息:" + msg);
  25. // 拒绝消息
  26. channel.basicNack(deliveryTag, false, false);
  27. String format = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss.SSS");
  28. System.out.println(format+" >>> 消费者拒绝的业务消息:" + msg);
  29. }
  30. }

最终死信,经死信交换机,最后到了死信队列,而死信队列我没设置监听此队列的消费者。
所以此时死信队列状态:Ready:1,Unack:0。

END