RabbitMQ几种问题
- 如果消息已经到达了RabbitMQ,但是RabbitMQ宕机了,消息是不是就丢了?
没丢,RabbitMQ得到Queue有持久化机制。
- 消费者在消费消息时,如果消费者宕机了怎么办?
手动ACK,实现手动ACK:告诉RabbitMQ是否消费成功。
1,添加配置文件
spring:
rabbitmq:
host: *.*.*.*
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual//手动
2,在消费消息位置,修改方法,再手动ack
package com.example.listen;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author xuebin
*/
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMassage(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息:"+msg);
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
- 生产者发送消息时,由于网络问题,导致消息没发送到RabbitMQ?
RabbitMQ提供了事务操作和confirm来保证消息额可靠性。
1,confirm方式
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息,
事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多
1,普通Confirm方式
//开启confirm
channel.confirmSelect();
//发送消息
String msg = "Hello-world!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//判断消息是否发送成功
if (channel.waitForConfirms()){
System.out.println("生产者发布消息成功!");
}else {
System.out.println("消息发送失败");
}
2,批量Confirm方式
//开启confirm
channel.confirmSelect();
//批量发送消息
for (int i = 0; i < 10; i++) {
String msg = "Hello-world"+i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//确认消息是否发送成功
//当发送的全部消息有一个失败时,就直接全部失败并抛出异常(IOException)
channel.waitForConfirmsOrDie();
3,异步Confirm方式(最常用)
//开启confirm
channel.confirmSelect();
//批量发送消息
for (int i = 0; i < 10; i++) {
String msg = "Hello-world"+i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//确认消息是否发送成功
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);
}
});
2,开启Return机制
注意:Confirm只能保证将消息发送到exchange中,无法保证消息可以被exchange分发到指定Queue。
而且exchange是不能持久化消息的,queue是可以持久化消息的。
采用Return机制来监听消息是否从exchange送到了指定的queue中。
//开启return机制
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
//当消息没有送达queue是才会执行
System.out.println(new String(body,"UTF-8")+"没有送达Queue中");
}
});
//3. 发布消息到exchange通道入口
//开启confirm
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String msg = "Hello-world"+i;
//消息的构造方法不一样,多一个参数
channel.basicPublish("","bingo",true,null,msg.getBytes());
}
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);
}
});
System.in.read();