RabbitMQ几种问题

  1. 如果消息已经到达了RabbitMQ,但是RabbitMQ宕机了,消息是不是就丢了?

没丢,RabbitMQ得到Queue有持久化机制。

  1. 消费者在消费消息时,如果消费者宕机了怎么办?

手动ACK,实现手动ACK:告诉RabbitMQ是否消费成功。

  1. 1,添加配置文件
  2. spring:
  3. rabbitmq:
  4. host: *.*.*.*
  5. port: 5672
  6. username: test
  7. password: test
  8. virtual-host: /test
  9. listener:
  10. simple:
  11. acknowledge-mode: manual//手动
  12. 2,在消费消息位置,修改方法,再手动ack
  13. package com.example.listen;
  14. import com.rabbitmq.client.Channel;
  15. import org.springframework.amqp.core.Message;
  16. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  17. import org.springframework.stereotype.Component;
  18. import java.io.IOException;
  19. /**
  20. * @author xuebin
  21. */
  22. @Component
  23. public class Consumer {
  24. @RabbitListener(queues = "boot-queue")
  25. public void getMassage(String msg, Channel channel, Message message) throws IOException {
  26. System.out.println("接收到消息:"+msg);
  27. //手动ack
  28. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  29. }
  30. }
  1. 生产者发送消息时,由于网络问题,导致消息没发送到RabbitMQ?

RabbitMQ提供了事务操作和confirm来保证消息额可靠性。
1,confirm方式

  1. RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息,
  2. 事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
  3. RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多
  4. 1,普通Confirm方式
  5. //开启confirm
  6. channel.confirmSelect();
  7. //发送消息
  8. String msg = "Hello-world!";
  9. channel.basicPublish("","HelloWorld",null,msg.getBytes());
  10. //判断消息是否发送成功
  11. if (channel.waitForConfirms()){
  12. System.out.println("生产者发布消息成功!");
  13. }else {
  14. System.out.println("消息发送失败");
  15. }
  16. 2,批量Confirm方式
  17. //开启confirm
  18. channel.confirmSelect();
  19. //批量发送消息
  20. for (int i = 0; i < 10; i++) {
  21. String msg = "Hello-world"+i;
  22. channel.basicPublish("","HelloWorld",null,msg.getBytes());
  23. }
  24. //确认消息是否发送成功
  25. //当发送的全部消息有一个失败时,就直接全部失败并抛出异常(IOException)
  26. channel.waitForConfirmsOrDie();
  27. 3,异步Confirm方式(最常用)
  28. //开启confirm
  29. channel.confirmSelect();
  30. //批量发送消息
  31. for (int i = 0; i < 10; i++) {
  32. String msg = "Hello-world"+i;
  33. channel.basicPublish("","HelloWorld",null,msg.getBytes());
  34. }
  35. //确认消息是否发送成功
  36. channel.addConfirmListener(new ConfirmListener() {
  37. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  38. System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);
  39. }
  40. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  41. System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);
  42. }
  43. });

2,开启Return机制

  1. 注意:Confirm只能保证将消息发送到exchange中,无法保证消息可以被exchange分发到指定Queue
  2. 而且exchange是不能持久化消息的,queue是可以持久化消息的。
  3. 采用Return机制来监听消息是否从exchange送到了指定的queue中。
  4. //开启return机制
  5. channel.addReturnListener(new ReturnListener() {
  6. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  7. //当消息没有送达queue是才会执行
  8. System.out.println(new String(body,"UTF-8")+"没有送达Queue中");
  9. }
  10. });
  11. //3. 发布消息到exchange通道入口
  12. //开启confirm
  13. channel.confirmSelect();
  14. for (int i = 0; i < 10; i++) {
  15. String msg = "Hello-world"+i;
  16. //消息的构造方法不一样,多一个参数
  17. channel.basicPublish("","bingo",true,null,msg.getBytes());
  18. }
  19. channel.addConfirmListener(new ConfirmListener() {
  20. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  21. System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);
  22. }
  23. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  24. System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);
  25. }
  26. });
  27. System.in.read();