4.1原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息
真正的消息不丢失:
1.持久化队列(生产者端设置) 2.持久化消息(生产者端设置) 3.发布确认(生产者端设置)

4.2发布策略

4.2.1开启方法

  1. Channel channel = connection.createChannel();
  2. channel.confirmSelect();

4.2.2单个确认发布

发布一条,确认一条,同步确认发布的方式
缺点是发布速度慢,会有阻塞

  1. /**
  2. * 单个确认
  3. * @throws Exception
  4. */
  5. public static void publishMessageIndividual() throws Exception {
  6. Channel channel = RabbitMqUtils.getChannel();
  7. //队列声明
  8. String queueName = UUID.randomUUID().toString();
  9. channel.queueDeclare(queueName,false,false,false,null);
  10. //开启发布确认
  11. channel.confirmSelect();
  12. //开启时间
  13. long begin = System.currentTimeMillis();
  14. for (int i = 0; i < MESSAGE_COUNT; i++)
  15. {String message = i + "";
  16. channel.basicPublish("", queueName, null, message.getBytes());
  17. //服务端返回 false 或超时时间内未返回,生产者可以消息重发
  18. boolean flag = channel.waitForConfirms();
  19. if(flag){
  20. System.out.println("消息发送成功");
  21. }
  22. }
  23. long end = System.currentTimeMillis();
  24. System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
  25. }

4.2.3批量发布确认

与单个发布确认相比,效率提高了
缺点是当发生问题时,不能精准定位问题

  1. /**
  2. * 批量
  3. * @throws Exception
  4. */
  5. public static void publishMessageBatch() throws Exception {
  6. try (Channel channel = RabbitMqUtils.getChannel())
  7. {
  8. String queueName = UUID.randomUUID().toString();
  9. channel.queueDeclare(queueName, false, false, false, null);
  10. //开启发布确认
  11. channel.confirmSelect();
  12. //批量确认消息大小
  13. int batchSize = 100;
  14. //未确认消息个数
  15. int outstandingMessageCount = 0;
  16. long begin = System.currentTimeMillis();
  17. for (int i = 0; i < MESSAGE_COUNT; i++)
  18. {String message = i + "";
  19. channel.basicPublish("", queueName, null, message.getBytes());
  20. outstandingMessageCount++;
  21. if (outstandingMessageCount == batchSize)
  22. {channel.waitForConfirms();
  23. outstandingMessageCount = 0;
  24. }
  25. }
  26. //为了确保还有剩余没有确认消息 再次确认
  27. if (outstandingMessageCount > 0)
  28. {channel.waitForConfirms();
  29. }
  30. long end = System.currentTimeMillis();
  31. System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
  32. "ms");
  33. }
  34. }

4.2.4异步发布确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
image.png

4.2.5 如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

4.2.6 3 种发布确认速度对比

单独发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
异步处理
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些