RabbitMQ几种问题
- 如果消息已经到达了RabbitMQ,但是RabbitMQ宕机了,消息是不是就丢了?
没丢,RabbitMQ得到Queue有持久化机制。
- 消费者在消费消息时,如果消费者宕机了怎么办?
手动ACK,实现手动ACK:告诉RabbitMQ是否消费成功。
1,添加配置文件spring:rabbitmq:host: *.*.*.*port: 5672username: testpassword: testvirtual-host: /testlistener:simple:acknowledge-mode: manual//手动2,在消费消息位置,修改方法,再手动ackpackage com.example.listen;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/*** @author xuebin*/@Componentpublic class Consumer {@RabbitListener(queues = "boot-queue")public void getMassage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:"+msg);//手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
- 生产者发送消息时,由于网络问题,导致消息没发送到RabbitMQ?
RabbitMQ提供了事务操作和confirm来保证消息额可靠性。
1,confirm方式
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息,事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多1,普通Confirm方式//开启confirmchannel.confirmSelect();//发送消息String msg = "Hello-world!";channel.basicPublish("","HelloWorld",null,msg.getBytes());//判断消息是否发送成功if (channel.waitForConfirms()){System.out.println("生产者发布消息成功!");}else {System.out.println("消息发送失败");}2,批量Confirm方式//开启confirmchannel.confirmSelect();//批量发送消息for (int i = 0; i < 10; i++) {String msg = "Hello-world"+i;channel.basicPublish("","HelloWorld",null,msg.getBytes());}//确认消息是否发送成功//当发送的全部消息有一个失败时,就直接全部失败并抛出异常(IOException)channel.waitForConfirmsOrDie();3,异步Confirm方式(最常用)//开启confirmchannel.confirmSelect();//批量发送消息for (int i = 0; i < 10; i++) {String msg = "Hello-world"+i;channel.basicPublish("","HelloWorld",null,msg.getBytes());}//确认消息是否发送成功channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);}public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);}});
2,开启Return机制
注意:Confirm只能保证将消息发送到exchange中,无法保证消息可以被exchange分发到指定Queue。而且exchange是不能持久化消息的,queue是可以持久化消息的。采用Return机制来监听消息是否从exchange送到了指定的queue中。//开启return机制channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {//当消息没有送达queue是才会执行System.out.println(new String(body,"UTF-8")+"没有送达Queue中");}});//3. 发布消息到exchange通道入口//开启confirmchannel.confirmSelect();for (int i = 0; i < 10; i++) {String msg = "Hello-world"+i;//消息的构造方法不一样,多一个参数channel.basicPublish("","bingo",true,null,msg.getBytes());}channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);}public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);}});System.in.read();
