• 消息何去何从
    • #mandatory 参数">#mandatory 参数
    • #immediate 参数">#immediate 参数
    • #备份交换器">#备份交换器

    消息何去何从

    1. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
    2. throws IOException;
    3. Copied!

    1
    2

    mandatory 和 immediate 他们都有当消息传递过程中不可达目的地时将消息 返回给生产者 的功能。
    RabbitMQ 提供的 备份交换器(Alternate Exchange) 可以将 未能被交换器路由(没有绑定队列或没有匹配的队列) 的消息存储起来,而不用返回给客户端
    那么 mandatory 、immediate 、备份交换器 他们到底是什么?有啥区别?本章就来聊一聊

    #mandatory 参数

    结论:

    • true:找不到匹配的队列,会将消息返回给生产者
    • false:找不到匹配的队列,直接丢弃

    交换器无法根据自身的类型和路由键找到一个符合条件的队列,将消息返回给生产者(Basic.Return 命令),那么生产者可以通过添加 ReturnListenner 监听器来获取被退回的消息

    1. @Test
    2. public void returntest() throws IOException, TimeoutException, InterruptedException {
    3. final String EXCHANGE_NAME = "exchange_demo";
    4. final String ROUTING_KEY = "routingky_demo";
    5. final String QUEUE_NAME = "queue_demo";
    6. final Channel channel = connection.createChannel();
    7. // 创建一个 type=direct 持久化、非自动删除的交换器
    8. channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
    9. // 创建一个:持久化、非排他的、非自动删除的队列
    10. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    11. // 将交换器与队列通过 路由键 绑定
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    13. channel.addReturnListener(new ReturnListener() {
    14. @Override
    15. public void handleReturn(int replyCode,
    16. String replyText,
    17. String exchange,
    18. String routingKey,
    19. AMQP.BasicProperties properties,
    20. byte[] body) throws IOException {
    21. System.out.println("replyCode=" + replyCode +
    22. " ; replyText=" + replyText +
    23. " ; exchange=" + exchange +
    24. " ; routingKey=" + routingKey
    25. );
    26. System.out.println("Basic.Return 返回未路由的消息:" + new String(body));
    27. }
    28. });
    29. // 发送的时候,没有指定 ROUTING_KEY
    30. channel.basicPublish(EXCHANGE_NAME,
    31. "",
    32. true,
    33. MessageProperties.PERSISTENT_TEXT_PLAIN,
    34. "mandatory test".getBytes()
    35. );
    36. TimeUnit.SECONDS.sleep(10);
    37. }
    38. Copied!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40

    输出的信息为

    1. replyCode=312 ; replyText=NO_ROUTE ; exchange=exchange_demo ; routingKey=xx
    2. Basic.Return 返回未路由的消息:mandatory test
    3. Copied!

    1
    2

    在 AMQP 协议层面来说,过程如下
    image.png

    #immediate 参数

    结论:为 true 时:如果交换器在将消息路由到队列时,发现 队列上并不存在任何消费者,那么该 消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,会通过 Basic.Return 返回给生产者。
    mandatory 与 immediate 在功能上区别:

    • mandatory :要求消息至少被路由到一个队列中,要么丢弃、要么返回给生产者
    • immediate:要求至少有一个订阅者,否则就返回给生产者。

    注意:RabbitMQ 3.+ 版本不再对此标记支持,官方解释:该参数会影响镜像队列的性能,增加代码复杂性。建议采用 TTL 和 DLX 的方法替代(TTL 和 DLX 后续讲解)

    #备份交换器

    Alternate Exchange 简称 AE,如果设置 mandatory = false ,未被路由的消息会被丢弃,设置为 mandatory = true 则需要生产者自己接收未被路由的消息。
    如果不想丢失消息,又不想自己立即接收处理这些消息,则可以使用这个备份交换器,在需要的时候再去处理 这些消息
    可以通过两种方式实现:

    1. 可以在声明交换器时,增加 alternate-exchange 参数实现
    2. 可以通过策略(Policy,后续章节讲解)

    如果两种方式同时使用,前者的优先级更高,会覆盖掉 Policy 的设置。

    1. // 定义备份交换器
    2. channel.exchangeDeclare("myAe", "fanout", true, false, null);
    3. channel.queueDeclare("unroutedQueue", true, false, false, null);
    4. channel.queueBind("unroutedQueue", "myAe", "");
    5. final HashMap<String, Object> arguments = new HashMap<>();
    6. arguments.put("alternate-exhcange", "myAe");
    7. // 使用备用交换器 myAe
    8. channel.exchangeDeclare("normalExchange", "direct", true, false, arguments);
    9. channel.queueDeclare("normalQueue", true, false, false, null);
    10. channel.queueBind"normalQueue", "normalExchange", ROUTING_KEY);
    11. Copied!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    上面定义了两个交换器和队列,通过 alternate-exhcange 参数来指定哪一个是备用交换器;过程如下图所示
    image.png
    其实可以看到备用交换器和普通的交换器没有太大的区别。建议设置为 fanout 类型(广播到订阅到交换器的队列中),因为:消息被重新发送到备用交换器,路由键还存在,而 fanout 类型的交换器则会忽略路由键,能确保消息被进入到备用交换器的队列中。
    对于备份交换器,总结了以下几种特殊情况:

    • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
    • 如果备份交换器没有绑定任何队列,不会有异常,此时消息会丢失
    • 如果备份交换器没有任何匹配的队列,不会有异常,此时消息会丢失
    • 如果备份交换器与 mandatory 参数一起使用,那么该参数无效