生产者确认
事务机制 :发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降
RabbitMQ提供了事务机制保证消息投递,RabbitMQ客户端中与事务机制相关的方法有三个:
- channel.txSelect : 将当前的channel通道设置为事务模式;
- channel.txCommit :用于提交事务;
- channel.txRollback :用于事务回滚;
但是使用事务会大大降低RabbitMQ的性能,在一些较小的吞吐量情况下,也可以采用事务方式,具体情况视各自的系统来决定,这里仅以一段代码来让大家了解事务的机制
try {
channel.txSelect();
channel.basicPublish(exchange , routingKey ,
MessageProperties.PERSISTENT_TEXT_PLAIN , msg.getBytes());
int result = 1 / 0 ;
channel.txCommit();
}catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
生产者确认机制:
- 生产者将Channel设置成Confirm模式,当设置Confirm模式后所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始,ID在同个Channel范围是唯一的),一旦消息被投递到所有匹配的队列之后Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
- 如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出;
- RabbitMQ回调消息的deliveryTag包含了确认消息的ID,此外RabbitMQ也可以设置channel.basicAck 方法中的multiple参数,表示到这个序号之前的所有消息都己经得到了处理;稍后介绍handleNack 和 handleAck两个方法我们再举个说明;
- confirm的机制是异步的,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息,confirm的时间没有明确说明,并且同一个消息只会被confirm一次;
接下来介绍两种confirm方法
- 批量confirm方法 : 每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;
先看代码样例,注意看注释
//开启confirm模式
channel.confirmSelect();
//模拟发送50条消息
for(int i =0;i<1000;i++){
String message = "Hello World RabbitMQ";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//每发送2条判断一次是否回复
if(i%2==0){
//waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
}
}
批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是批量confmn方式的问题在于遇到RabbitMQ服务端返回Basic.Nack 需要重发批量消息而导致的性能降低
- 异步confirm方法(推荐) :提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理;
依旧还是先看代码:生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConfirmProducer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("huang");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
//5 声明交换机 以及 路由KEY
String exchangeName = "test_confirm_exchange";
String queueName = "test_confirm_queue";
//指定类型为topic
String exchangeType = "topic";
String routingKey = "confirm.send";
//表示声明了一个交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示声明了一个队列
channel.queueDeclare(queueName, true, false, false, null);
//建立一个绑定关系:
channel.queueBind(queueName, exchangeName, routingKey);
//6 发送一条消息
String msg = "Test Confirm Message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//7 添加确认监听
channel.addConfirmListener(new ConfirmListener(){
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("收到NACK应答");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("收到ACK应答");
}
});
}
}
消费者:
public class ConfirmConsumer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.1.28");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("toher");
connectionFactory.setPassword("toher888");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_confirm_exchange";
//指定类型为topic
String exchangeType = "topic";
String queueName = "test_confirm_queue";
//因为*号代表匹配一个单词,生产者中routingKey3将匹配不到
String routingKey = "confirm.*";
//表示声明了一个交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示声明了一个队列
channel.queueDeclare(queueName, true, false, false, null);
//建立一个绑定关系:
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费端:" + msg);
}
};
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
}
}
运行效果:
从上面代码我们可以看到有重写了ConfirmListener两个方法:handleNack 和 handleAck,分别用来处理RabbitMQ 回传的Basic.Nack和Basic.Ack;
它们都有两个参数:
- long deliveryTag : 前面介绍确认消息的ID
- boolean multiple : multiple 是否批量 如果是True 则将比该deliveryTag小的所有数据都移除 否则只移除该条;
我们简单的用一个数组来说明 [1,2,3,4]存储着4条消息ID , 此时确认消息返回的是 deliveryTag = 3 ,multiple = true那么RabbitMQ会通知我们小于ID3的消息得到确认了,如果multiple = false, 就通知我们ID3的确认了
我们再用修改一下上面的代码看一下
//声明一个用来记录消息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//开启confirm模式
channel.confirmSelect();
//异步监听方法 处理ack与nack方法
channel.addConfirmListener(new ConfirmListener() {
//处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
//处理nack 与ack相同
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
以上代码按照每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
[
](https://blog.csdn.net/lhmyy521125/article/details/88064322)
消息确认回调
配置yml
spring:
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
#消息确认配置项
publisher-returns: true #确认消息已发送到队列(Queue)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
配置相关的消息确认回调函数 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig { private Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
// 创建一个模版,绑定的是connectionFactory这个工厂
@Bean
public RabbitTemplate createRabbitTemplate(CachingConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
// 消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("ConfirmCallback:"+"相关数据:"+correlationData+","+"确认情况:"+ack+","+"原因:"+cause);
}
});
// 消息未能投递到目标 queue 里将触发回调 returnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("ReturnCallback:"+"消息:"+message+","+"回应码:"+replyCode+","+"回应信息:"+replyText+","+"交换机:"+exchange+","+"路由键:"+routingKey);
}
});
return rabbitTemplate;
}
}
两个回调函数ConfirmCallback 、 RetrunCallback在什么情况会触发
1. 消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)
```shell
@GetMapping("/TestMessageAck")
public String TestMessageAck() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: non-existent-exchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "ok";
}
调用接口,查看控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’)
结论:这种情况触发的是ConfirmCallback 回调函数
- 消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
写个测试接口,把消息推送到名为lonelyDirectExchange的交换机上(这个交换机是没有任何队列配置的)
@GetMapping("/TestMessageAck2")
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "ok";
}
调用接口,查看项目的控制台输出情况
可以看到这种情况,两个函数都被调用了;
这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
- 消息推送到sever,交换机和队列啥都没找到
这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的,所以不做结果说明了。
结论: 这种情况触发的是 ConfirmCallback 回调函数。
- 消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,可以看到控制台输出:
结论: 这种情况触发的是 ConfirmCallback 回调函数。
消息确认机制
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
- 自动确认
这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
- 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
- basic.ack用于肯定确认
- basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
- basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。而basic.nack,basic.reject表示没有被正确处理
reject
着重讲下reject,因为有时候一些场景是需要重新入列的。
channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
nack
这个也是相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
- 第一个参数依然是当前消息到的数据的唯一id;
- 第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
- 第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
实践
在消费者项目里,改为手动确认模式
修改yml文件,添加一下配置
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
2.添加配置文件
@Configuration
public class RabbitConfig {
private Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
// 手动确认信息
// 方法一
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
return factory;
}
// 方法二
@Bean
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory){
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
在消费者项目里,新建MessageListenerConfig.java上添加代码相关的配置代码 ```java package com.example.demo.rabbit.config;
import com.example.demo.rabbit.consumer.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置一个队列
container.setQueueNames("TestDirectQueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
return container;
}
}
- 手动确认消息监听类,需要手动确认
```java
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer {
private Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitHandler
@RabbitListener(queues = "queue-direct")
public void direct1(String msg, Channel channel, Message message) throws IOException {
logger.info("consume direct 1:"+msg);
MessageProperties messageProperties= message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
logger.info("message from exchange:"+messageProperties.getReceivedExchange()+",queue:"+messageProperties.getConsumerQueue()
+",routingKey:"+messageProperties.getReceivedRoutingKey());
//第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(deliveryTag,false);
}catch (Exception e){
// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(deliveryTag, false);
logger.error(e.getMessage());
}
}
}