• 生产者确认
    • #事物机制">#事物机制
    • #发送方确认机制">#发送方确认机制
      • #同步方式">#同步方式
      • #异步方式">#异步方式

    生产者确认

    小生产者将消息发送出去之后,消息到底有没有正确到达服务器?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的。也就是说默认情况下,生产者不知道消息是否正确到达服务器。
    RabbitMQ 针对这个问题,提供了两种解决方式:

    • 事物机制
    • 发送方确认(publisher confirm)机制

      #事物机制

      RabbitMQ 客户端中与事物机制相关的方法有三个:

    • channel.txSelect:将当前的信道设置为 事物模式

    • channel.txCommit:提交事物
    • channel.txRollback:回滚事物

    开启事物(设置为事物模式)后,可以发送消息,然后提交事物,如果在提交事物前 RabbitMQ 异常崩溃或则其他原因抛出异常,可以将其捕获,然后再执行回滚事物。
    这里事物与数据库中的事物概念并不同。

    1. // 开启事物
    2. channel.txSelect();
    3. // 发送消息
    4. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    5. // 提交事物
    6. channel.txCommit();
    7. Copied!

    1
    2
    3
    4
    5
    6

    AMQP 协议流转过程如下图所示
    image.png

    1. 客户端发送 Tx.Select 开启事物
    2. Broker 回复 Tx.Select-Ok,确认事物开启
    3. 发送消息后,客户端 Tx.Commit 提交事物
    4. Broker 回复 Tx.Commit-Ok ,确认事物提交

    事物回滚的代码则如下所示

    1. try {
    2. channel.txSelect();
    3. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    4. // 故意制造异常
    5. int result = 1 / 0;
    6. channel.txCommit();
    7. } catch (Exception e) {
    8. // 回滚事物1
    9. channel.txRollback();
    10. }
    11. }
    12. Copied!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    交互流程如下
    image.png
    如果要发送多条消息,则如下做

    1. channel.txSelect();
    2. for (int i = 0; i < 10; i++) {
    3. try {
    4. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    5. channel.txCommit();
    6. } catch (Exception e) {
    7. channel.txRollback();
    8. }
    9. }
    10. Copied!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    对于这里笔者其实有点懵,事物模式什么时候退出?这里的 select 并不是和 commit 是一一对应的。
    事物机制解决了 消息发送方 和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事物才能提交成功。但是事物机制对 RabbitMQ 的性能影响很大。所以提供了一个改进机制:发送方确认机制

    #发送方确认机制

    在 AMQP 协议层面提供了事物机制解决消息是否有正确到达 RabbitMQ 这个问题,但是事物机制会严重降低 RabbitMQ 的消息吞吐量。
    发送方确认机制(publisher confirm)是一种轻量级的方式。
    生产者将信道设置为 confirm(确认)模式,所有再该信道上 发布的消息 都会被 指派一个唯一的 ID(从 1 开始),当消息被匹配到队列后,RabbitMQ 会 发送一个确认(Basic.ack)给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达了目的地了。如果消息和队列是 可持久化的,那么确认消息会在 消息写入磁盘后 发出。
    RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外还可以设置 channel.basicAck 方法中的 multple 参数,表示在这个序号之前的所有消息都已经得到了处理。
    image.png
    如上图所示:生产者发送了 3 条消息,开启了 mutilple,那么 RabbitMQ 回调确认时,deliveryTag = 3,表示前面的 2 条消息,包含自己都已经正确到达 RabbitMQ 了。

    • 事物机制:发送方是同步的发送一条消息后,发送端阻塞,等待 RabbitMQ 回应后,才能继续下一条消息的发送
    • 发送方确认机制:可以是异步的

    发送方确认有两种方式:

    • 同步方式:发送消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回
    • 异步方式:提供一个回调方法,服务端确认了一条或多条消息后客户端会收到回调事件

      #同步方式

      与事物机制使用类似,不过可以发布多条消息,再等待确认
    1. // 将信道设置为生产者确认模式
    2. channel.confirmSelect();
    3. for (int i = 0; i < 10; i++) {
    4. // 发布消息
    5. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    6. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    7. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    8. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    9. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    10. try {
    11. if (channel.waitForConfirms()) {
    12. // 发送的 5 条消息成功
    13. } else {
    14. // 发送失败,这一批消息都需要重发?
    15. // 不清楚他的机制,全部失败?
    16. }
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. // 发送失败,这一批消息都需要重发?
    20. }
    21. }
    22. Copied!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    可以看到如上的代码,批量发送后再等待并确认发送成功。但是有一个问题,当失败时(Basic.Nack 或 超时)时,就需要将这一批消息重新发送;这种情况过多的时候,性能不升返降

    #异步方式

    通过添加 ConfirmListener 的方式,来知晓是否成功还是失败

    1. final Channel channel = connection.createChannel();
    2. // 将信道设置为生产者确认模式
    3. channel.confirmSelect();
    4. channel.addConfirmListener(new ConfirmListener() {
    5. /**
    6. * 处理 RabbitMQ 的 Basic.Ack 指令
    7. * @param deliveryTag 那一条消息
    8. * @param multiple
    9. * @throws IOException
    10. */
    11. @Override
    12. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    13. System.out.println("发送成功");
    14. }
    15. /**
    16. * 处理 RabbitMQ 的 Basic.Nack 指令
    17. * @param deliveryTag
    18. * @param multiple
    19. * @throws IOException
    20. */
    21. @Override
    22. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    23. System.out.println("发送失败");
    24. // 可进行消息的重发处理
    25. }
    26. });
    27. for (int i = 0; i < 10; i++) {
    28. // 发布消息
    29. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    30. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    31. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    32. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    33. channel.basicPublish("", reqestQueue, properties, "message".getBytes());
    34. }
    35. Copied!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37

    在性能方面,QPS 对比如下
    image.png
    普通 confirm 就是发送一条就 waitForConfirms 一次。可见异步方式效率是最高的