1. 消息可靠性


  • RabbitMQ 的消息可靠性,一般是业务系统接入消息中间件时首要考虑的问题,一般通过三个方面保障:
    • 发送可靠性:确保消息成功发送到 Broker。
    • 存储可靠性:Broker 对消息持久化,确保消息不会丢失。
    • 消费可靠性:确保消息成功被消费。

1. 发送可靠性

  • 一般消息发送可靠性分为 3 个层级:
    • At most once:最多一次,消息可能会丢失,但绝不会重复传输。
    • At least once:最少一次,消息绝不会丢失,但可能会重复传输。
    • Exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次。
  • RabbitMQ 支持其中的“最多一次”和“最少一次”。
    • 其中“最少一次”投递实现需要考虑以下这几个方面的内容:
      • 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
      • 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不被丢弃。
    • “最多一次”的方式无需考虑以上那些方面,生产者随意发送,不过这样很难确保消息会成功发送。

image.png

2. 消费可靠性

  • 消费者在消费消息的同时,需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以免在消费端引起不必要的消息丢失。

3. 代码示例

  1. // 可靠生产
  2. // https://www.rabbitmq.com/confirms.html
  3. public class Producer {
  4. public static void main(String[] args) throws IOException, TimeoutException {
  5. // 1. 创建连接工厂
  6. ConnectionFactory factory = new ConnectionFactory();
  7. // 2. 设置连接属性
  8. factory.setHost("114.67.85.157");
  9. factory.setUsername("admin");
  10. factory.setPassword("admin");
  11. Connection connection = null;
  12. Channel channel = null;
  13. try {
  14. // 3. 从连接工厂获取连接
  15. connection = factory.newConnection("生产者");
  16. // 4. 从连接中创建通道
  17. channel = connection.createChannel();
  18. // 进入 confirm 模式,每次发送消息,rabbitmq 处理之后会返回一个对应的回执消息
  19. AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
  20. // 增加监听器
  21. ArrayList<String> queues = new ArrayList<>();
  22. channel.addConfirmListener(new ConfirmListener() {
  23. @Override
  24. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  25. // deliveryTag 同一个 channel 中此条消息的编号。
  26. // 业务...
  27. System.out.println("受理成功 " + queues.get((int) deliveryTag) + " " + multiple);
  28. }
  29. @Override
  30. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  31. // 失败重发
  32. // queues.get((int) deliveryTag)
  33. System.out.println("受理失败 " + deliveryTag);
  34. }
  35. });
  36. // 受理 fanout 类型的交换器
  37. channel.exchangeDeclare("ps_test", "fanout");
  38. for (int i = 0; i < 10; i ++) {
  39. // 消息内容
  40. String message = "Hello confirm " + i;
  41. queues.add(message);
  42. // 发送消息到 ps_test 交换器上
  43. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
  44. channel.basicPublish("ps_test", "", basicProperties, message.getBytes());
  45. System.out.println("消息 " + message + "已发送!");
  46. }
  47. // 等待 20 秒
  48. Thread.sleep(20 * 1000);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. } finally {
  52. if (channel != null && channel.isOpen()) {
  53. channel.close();
  54. }
  55. if (connection != null && connection.isOpen()) {
  56. connection.close();
  57. }
  58. }
  59. }
  60. }
  1. /**
  2. * 消息确认机制
  3. */
  4. public class Consumer {
  5. private static Runnable receive = () -> {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("114.67.85.157");
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. Connection connection = null;
  11. Channel channel = null;
  12. final String clientName = Thread.currentThread().getName();
  13. try {
  14. connection = factory.newConnection("消费者-" + clientName);
  15. // 死信队列:专门用来存储出错、出异常的数据
  16. channel = connection.createChannel();
  17. channel.exchangeDeclare("dlq_exchange", "fanout");
  18. channel.queueDeclare("dlq_queue1", false, false, false, null);
  19. channel.queueBind("dlq_queue1", "dlq_exchange", "");
  20. channel = connection.createChannel();
  21. channel.exchangeDeclare("ps_test", "fanout");
  22. String queueName = "queue1";
  23. // 队列中有死信产生时,消息会转发到交换器 dlq_exchange
  24. Map<String, Object> args = new HashMap<>();
  25. args.put("x-dead-letter-exchange", "dlq_exchange");
  26. channel.queueDeclare(queueName, false, false, false, args);
  27. channel.queueBind(queueName, "ps_test", "");
  28. Channel finalChannel = channel;
  29. channel.basicConsume(queueName, false, "消费者-手动回执",
  30. new DefaultConsumer(finalChannel) {
  31. @Override
  32. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  33. try {
  34. System.out.println("收到消息:" + new String(body));
  35. // TODO 业务
  36. long deliveryTag = envelope.getDeliveryTag();
  37. // 模拟业务处理耗时
  38. Thread.sleep(1000);
  39. // 正常消费
  40. finalChannel.basicAck(deliveryTag, false);
  41. // 异常消费
  42. // finalChannel.basicNack(deliveryTag, false, true);
  43. } catch (InterruptedException e) {
  44. // 异常消费, requeue参数 true 重发,false 不重发(丢弃或者移到 DLQ 死信队列)
  45. // finalChannel.basicNack(envelope.getDeliveryTag(), false, false);
  46. e.printStackTrace();
  47. }
  48. }
  49. });
  50. System.out.println(clientName + " 开始接收消息");
  51. System.in.read();
  52. } catch (TimeoutException e) {
  53. e.printStackTrace();
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. } finally {
  57. if (channel != null && channel.isOpen()) {
  58. try {
  59. channel.close();
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. } catch (TimeoutException e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. if (connection != null && connection.isOpen()) {
  67. try {
  68. connection.close();
  69. } catch (IOException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. }
  74. };
  75. public static void main(String[] args) {
  76. new Thread(receive, "c1").start();
  77. }
  78. }

2. RabbitMQ 插件机制


  • RabbitMQ 支持插件,通过插件可以扩展多种核心功能:支持多种协议、系统状态监控、其他 AMQP 0-9-1 交换类型、节点联合等。许多功能都是通过插件实现的。

插件列表

  • RabbitMQ 内置一些插件,通过 rabbitmq-plugins list 命令可以查看插件列表。

启用插件

  • 通过 rabbitmq-plugins 命令可以启用或禁用插件。
    rabbitmq-plugins enable plugin-name
    rabbitmq-plugins disable plugin-name
    

常用插件

  • rabbitmq_auth_mechanism_ssl:身份验证机制插件,允许 RabbitMQ 客户端使用 x509 证书和 TLS(PKI)证书进行身份验证。
  • rabbitmq_event_exchange:事件分发插件,使客户端可以接收到 Broker 的 queue.deleted、exchange.created、binding.created 等事件。
  • rabbitmq_management:基于 Web 界面的管理/监控插件。
  • rabbitmq_management_agent:启用 rabbitmq_management 时,会自动启用此插件,用于在 Web 管理中查看集群节点。
  • rabbitmq_mqtt:MQTT 插件,使 RabbitMQ 支持 MQTT 插件。
  • rabbitmq_web_mqtt:使 RabbitMQ 支持通过 WebSocket 订阅消息,基于 MQTT 协议传输。