生产者确认

事务机制 :发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降
RabbitMQ提供了事务机制保证消息投递,RabbitMQ客户端中与事务机制相关的方法有三个:

  • channel.txSelect : 将当前的channel通道设置为事务模式;
  • channel.txCommit :用于提交事务;
  • channel.txRollback :用于事务回滚;

但是使用事务会大大降低RabbitMQ的性能,在一些较小的吞吐量情况下,也可以采用事务方式,具体情况视各自的系统来决定,这里仅以一段代码来让大家了解事务的机制

  1. try {
  2. channel.txSelect();
  3. channel.basicPublish(exchange , routingKey ,
  4. MessageProperties.PERSISTENT_TEXT_PLAIN , msg.getBytes());
  5. int result = 1 / 0 ;
  6. channel.txCommit();
  7. }catch (Exception e) {
  8. e.printStackTrace();
  9. channel.txRollback();
  10. }

生产者确认机制
image.png

  1. 生产者将Channel设置成Confirm模式,当设置Confirm模式后所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始,ID在同个Channel范围是唯一的),一旦消息被投递到所有匹配的队列之后Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
  2. 如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出;
  3. RabbitMQ回调消息的deliveryTag包含了确认消息的ID,此外RabbitMQ也可以设置channel.basicAck 方法中的multiple参数,表示到这个序号之前的所有消息都己经得到了处理;稍后介绍handleNack 和 handleAck两个方法我们再举个说明;
  4. confirm的机制是异步的,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息,confirm的时间没有明确说明,并且同一个消息只会被confirm一次;

接下来介绍两种confirm方法

  1. 批量confirm方法 : 每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;

先看代码样例,注意看注释

  1. //开启confirm模式
  2. channel.confirmSelect();
  3. //模拟发送50条消息
  4. for(int i =0;i<1000;i++){
  5. String message = "Hello World RabbitMQ";
  6. //发送消息
  7. channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
  8. //每发送2条判断一次是否回复
  9. if(i%2==0){
  10. //waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间
  11. if(channel.waitForConfirms()){
  12. System.out.println("Message send success.");
  13. }
  14. }
  15. }

批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是批量confmn方式的问题在于遇到RabbitMQ服务端返回Basic.Nack 需要重发批量消息而导致的性能降低

  1. 异步confirm方法(推荐) :提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理;

依旧还是先看代码:生产者

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.ConfirmListener;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class ConfirmProducer {
  6. public static void main(String[] args) throws Exception {
  7. //1 创建ConnectionFactory
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost("localhost");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setVirtualHost("huang");
  12. connectionFactory.setUsername("guest");
  13. connectionFactory.setPassword("guest");
  14. //2 创建Connection
  15. Connection connection = connectionFactory.newConnection();
  16. //3 创建Channel
  17. Channel channel = connection.createChannel();
  18. //4 指定我们的消息投递模式: 消息的确认模式
  19. channel.confirmSelect();
  20. //5 声明交换机 以及 路由KEY
  21. String exchangeName = "test_confirm_exchange";
  22. String queueName = "test_confirm_queue";
  23. //指定类型为topic
  24. String exchangeType = "topic";
  25. String routingKey = "confirm.send";
  26. //表示声明了一个交换机
  27. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
  28. //表示声明了一个队列
  29. channel.queueDeclare(queueName, true, false, false, null);
  30. //建立一个绑定关系:
  31. channel.queueBind(queueName, exchangeName, routingKey);
  32. //6 发送一条消息
  33. String msg = "Test Confirm Message";
  34. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  35. //7 添加确认监听
  36. channel.addConfirmListener(new ConfirmListener(){
  37. @Override
  38. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  39. System.err.println("收到NACK应答");
  40. }
  41. @Override
  42. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  43. System.err.println("收到ACK应答");
  44. }
  45. });
  46. }
  47. }

消费者:

  1. public class ConfirmConsumer {
  2. public static void main(String[] args) throws Exception {
  3. //1 创建ConnectionFactory
  4. ConnectionFactory connectionFactory = new ConnectionFactory() ;
  5. connectionFactory.setHost("192.168.1.28");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setVirtualHost("/");
  8. connectionFactory.setUsername("toher");
  9. connectionFactory.setPassword("toher888");
  10. //2 创建Connection
  11. Connection connection = connectionFactory.newConnection();
  12. //3 创建Channel
  13. Channel channel = connection.createChannel();
  14. //4 声明
  15. String exchangeName = "test_confirm_exchange";
  16. //指定类型为topic
  17. String exchangeType = "topic";
  18. String queueName = "test_confirm_queue";
  19. //因为*号代表匹配一个单词,生产者中routingKey3将匹配不到
  20. String routingKey = "confirm.*";
  21. //表示声明了一个交换机
  22. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
  23. //表示声明了一个队列
  24. channel.queueDeclare(queueName, true, false, false, null);
  25. //建立一个绑定关系:
  26. channel.queueBind(queueName, exchangeName, routingKey);
  27. //5 创建消费者
  28. Consumer consumer = new DefaultConsumer(channel){
  29. @Override
  30. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
  31. throws IOException {
  32. String msg = new String(body, "UTF-8");
  33. System.out.println("消费端:" + msg);
  34. }
  35. };
  36. //参数:队列名称、是否自动ACK、Consumer
  37. channel.basicConsume(queueName, true, consumer);
  38. }
  39. }

运行效果:
image.png
image.png
从上面代码我们可以看到有重写了ConfirmListener两个方法:handleNack 和 handleAck,分别用来处理RabbitMQ 回传的Basic.Nack和Basic.Ack;
它们都有两个参数:

  1. long deliveryTag : 前面介绍确认消息的ID
  2. boolean multiple : multiple 是否批量 如果是True 则将比该deliveryTag小的所有数据都移除 否则只移除该条;

我们简单的用一个数组来说明 [1,2,3,4]存储着4条消息ID , 此时确认消息返回的是 deliveryTag = 3 ,multiple = true那么RabbitMQ会通知我们小于ID3的消息得到确认了,如果multiple = false, 就通知我们ID3的确认了
我们再用修改一下上面的代码看一下

  1. //声明一个用来记录消息唯一ID的有序集合SortedSet
  2. final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
  3. //开启confirm模式
  4. channel.confirmSelect();
  5. //异步监听方法 处理ack与nack方法
  6. channel.addConfirmListener(new ConfirmListener() {
  7. //处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条
  8. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  9. if (multiple) {
  10. confirmSet.headSet(deliveryTag).clear();
  11. } else {
  12. confirmSet.remove(deliveryTag);
  13. }
  14. }
  15. //处理nack 与ack相同
  16. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  17. System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
  18. if (multiple) {
  19. confirmSet.headSet(deliveryTag).clear();
  20. } else {
  21. confirmSet.remove(deliveryTag);
  22. }
  23. }
  24. });

以上代码按照每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
[

](https://blog.csdn.net/lhmyy521125/article/details/88064322)

消息确认回调

  • 配置yml

    1. spring:
    2. rabbitmq:
    3. username: guest
    4. password: guest
    5. host: localhost
    6. port: 5672
    7. #消息确认配置项
    8. publisher-returns: true #确认消息已发送到队列(Queue)
    9. publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
  • 配置相关的消息确认回调函数 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

@Configuration public class RabbitConfig { private Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

  1. // 创建一个模版,绑定的是connectionFactory这个工厂
  2. @Bean
  3. public RabbitTemplate createRabbitTemplate(CachingConnectionFactory connectionFactory){
  4. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  5. rabbitTemplate.setConnectionFactory(connectionFactory);
  6. //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
  7. rabbitTemplate.setMandatory(true);
  8. // 消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。
  9. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  10. @Override
  11. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  12. logger.info("ConfirmCallback:"+"相关数据:"+correlationData+","+"确认情况:"+ack+","+"原因:"+cause);
  13. }
  14. });
  15. // 消息未能投递到目标 queue 里将触发回调 returnCallback
  16. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  17. @Override
  18. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  19. logger.info("ReturnCallback:"+"消息:"+message+","+"回应码:"+replyCode+","+"回应信息:"+replyText+","+"交换机:"+exchange+","+"路由键:"+routingKey);
  20. }
  21. });
  22. return rabbitTemplate;
  23. }

}

  1. 两个回调函数ConfirmCallback RetrunCallback在什么情况会触发
  2. 1. 消息推送到server,但是在server里找不到交换机
  3. 写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)
  4. ```shell
  5. @GetMapping("/TestMessageAck")
  6. public String TestMessageAck() {
  7. String messageId = String.valueOf(UUID.randomUUID());
  8. String messageData = "message: non-existent-exchange test message ";
  9. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  10. Map<String, Object> map = new HashMap<>();
  11. map.put("messageId", messageId);
  12. map.put("messageData", messageData);
  13. map.put("createTime", createTime);
  14. rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
  15. return "ok";
  16. }

调用接口,查看控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’)
image.png
结论:这种情况触发的是ConfirmCallback 回调函数

  1. 消息推送到server,找到交换机了,但是没找到队列

这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作

  1. @Bean
  2. DirectExchange lonelyDirectExchange() {
  3. return new DirectExchange("lonelyDirectExchange");
  4. }

写个测试接口,把消息推送到名为lonelyDirectExchange的交换机上(这个交换机是没有任何队列配置的)

  1. @GetMapping("/TestMessageAck2")
  2. public String TestMessageAck2() {
  3. String messageId = String.valueOf(UUID.randomUUID());
  4. String messageData = "message: lonelyDirectExchange test message ";
  5. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  6. Map<String, Object> map = new HashMap<>();
  7. map.put("messageId", messageId);
  8. map.put("messageData", messageData);
  9. map.put("createTime", createTime);
  10. rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
  11. return "ok";
  12. }

调用接口,查看项目的控制台输出情况
image.png
可以看到这种情况,两个函数都被调用了;
这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

  1. 消息推送到sever,交换机和队列啥都没找到

这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的,所以不做结果说明了。
结论: 这种情况触发的是 ConfirmCallback 回调函数。

  1. 消息推送成功

那么测试下,按照正常调用之前消息推送的接口就行,可以看到控制台输出:
image.png
结论: 这种情况触发的是 ConfirmCallback 回调函数。

消息确认机制

和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:

  1. 自动确认

这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  1. 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。

消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

  • basic.ack用于肯定确认
  • basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
  • basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息

消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。而basic.nack,basic.reject表示没有被正确处理

reject
着重讲下reject,因为有时候一些场景是需要重新入列的。

channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。

但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

nack
这个也是相当于设置不消费某条消息。

channel.basicNack(deliveryTag, false, true);

  • 第一个参数依然是当前消息到的数据的唯一id;
  • 第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
  • 第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。

同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

实践
在消费者项目里,改为手动确认模式

  1. 修改yml文件,添加一下配置

    1. spring:
    2. rabbitmq:
    3. listener:
    4. direct:
    5. acknowledge-mode: manual
    6. simple:
    7. acknowledge-mode: manual

    2.添加配置文件

    1. @Configuration
    2. public class RabbitConfig {
    3. private Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    4. // 手动确认信息
    5. // 方法一
    6. @Bean
    7. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
    8. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    9. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    10. factory.setConnectionFactory(connectionFactory);
    11. return factory;
    12. }
    13. // 方法二
    14. @Bean
    15. public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory){
    16. DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
    17. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    18. factory.setConnectionFactory(connectionFactory);
    19. return factory;
    20. }
    21. }

    在消费者项目里,新建MessageListenerConfig.java上添加代码相关的配置代码 ```java package com.example.demo.rabbit.config;

import com.example.demo.rabbit.consumer.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

@Configuration public class MessageListenerConfig {

  1. @Autowired
  2. private CachingConnectionFactory connectionFactory;
  3. @Bean
  4. public SimpleMessageListenerContainer simpleMessageListenerContainer() {
  5. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  6. container.setConcurrentConsumers(1);
  7. container.setMaxConcurrentConsumers(1);
  8. // RabbitMQ默认是自动确认,这里改为手动确认消息
  9. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  10. //设置一个队列
  11. container.setQueueNames("TestDirectQueue");
  12. //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
  13. // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
  14. //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
  15. //container.setQueues(new Queue("TestDirectQueue",true));
  16. //container.addQueues(new Queue("TestDirectQueue2",true));
  17. //container.addQueues(new Queue("TestDirectQueue3",true));
  18. return container;
  19. }

}

  1. - 手动确认消息监听类,需要手动确认
  2. ```java
  3. import com.rabbitmq.client.Channel;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.core.MessageProperties;
  8. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import java.io.IOException;
  14. @Component
  15. public class Consumer {
  16. private Logger logger = LoggerFactory.getLogger(Consumer.class);
  17. @RabbitHandler
  18. @RabbitListener(queues = "queue-direct")
  19. public void direct1(String msg, Channel channel, Message message) throws IOException {
  20. logger.info("consume direct 1:"+msg);
  21. MessageProperties messageProperties= message.getMessageProperties();
  22. long deliveryTag = messageProperties.getDeliveryTag();
  23. try {
  24. logger.info("message from exchange:"+messageProperties.getReceivedExchange()+",queue:"+messageProperties.getConsumerQueue()
  25. +",routingKey:"+messageProperties.getReceivedRoutingKey());
  26. //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
  27. channel.basicAck(deliveryTag,false);
  28. }catch (Exception e){
  29. // 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
  30. channel.basicReject(deliveryTag, false);
  31. logger.error(e.getMessage());
  32. }
  33. }
  34. }