RabbitMQ - 图1

  • rabbitmq是放入queue之前逻辑复杂(所有连接到queue的都消费)
  • kafka是从queue(partition)消费逻辑复杂(消费者组来决定哪些节点消费哪)

    Exchange

    如果不指定exchange,使用默认exchange,默认exchange绑定 RoutingKey 与队列名称相同。

    四种Exchange:fanout,direct,topic,header

  • direct:routing key 与 bind key 匹配

  • fanout:会忽略rountingkey
  • topic:模式匹配rountingkey

    routing key & binding key

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”

  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

    RabbitMQ 6种模式

    RabbitMQ Tutorials — RabbitMQ

    工作队列模式(默认exchange)

    image.png
    image.png

    发布订阅模式(fanout exchange分发)

    image.png

    路由模式(direct exchange路由)

    image.png

    主题模式(topic exchange模糊路由)

    image.png

    rpc模式

    image.png

    发送端

    同步确认模式

    ```java //将channel设置为confirm模式 channel.confirmSelect();

//发送数据 for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); }

//同步等待 if(!channel.waitForConfirms()){ System.out.println(“send message failed.”); }

  1. <a name="Ami5Z"></a>
  2. ## 异步确认模式【ConfirmListener】
  3. 不推荐?推荐使用单条的消息确认模式。
  4. ```java
  5. SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
  6. channel.confirmSelect();
  7. //异步回调
  8. channel.addConfirmListener(new ConfirmListener() {
  9. //deliveryTag:消息编号,多个发送者会重复有并发问题
  10. //multiple:是否连带前面的确认
  11. //回调和主方法不在一个线程执行
  12. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  13. if (multiple) {
  14. confirmSet.headSet(deliveryTag + 1).clear();
  15. } else {
  16. confirmSet.remove(deliveryTag);
  17. }
  18. }
  19. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  20. System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
  21. if (multiple) {
  22. confirmSet.headSet(deliveryTag + 1).clear();
  23. } else {
  24. confirmSet.remove(deliveryTag);
  25. }
  26. }
  27. });
  28. while (true) {
  29. long nextSeqNo = channel.getNextPublishSeqNo();
  30. channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
  31. confirmSet.add(nextSeqNo);
  32. }

exchange路由确认【ReturnCallback】【mandatory】

【注意】exchange路由不到queue(并不是网络问题,重试没有意义,需要人工介入)ConfirmCallback也会返回ack=true。同时也会调用ReturnCallback(当mandatory设置为true时)

  • ConfirmCallback 只确认是否正确到达 Exchange 中


void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

  • mandatory【没有queue】
    • 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
    • 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
  • immediate【没有消费者】
    • true:当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

RabbitMQ之mandatory和immediate_朱小厮的博客-CSDN博客

RabbitTemplate发送消息确认:correlation

  1. 设置PublisherConfirmType & PulisherReturns

image.png

  1. 发送设置correlationData(设置id)

image.png

  1. ConfirmCallback收到correlationData

image.png

消费端

消费端确认 & 重回队列

//手动发送ack
//deliveryTag是自增的,第二参数是是否批量ack
channel.basicAck(envelope.getDeliveryTag(),false);

//第三个参数是是否重回队列,一般不用。
channel.basicNack(envelope.getDeliveryTag() , false , true);

消费端限流qos

当消费端有一定数量的消息未被ack时,RabbitMQ不会推送新消息。
例如,有两个消费者,MQ一股脑将消息推送给一个消费者。另一个没事做。
image.png
首先是在非自动确认的前提下。autoAck设置为false。

  1. // void basicQos(int prefetchSize, int prefetchCount, boolean global)
  2. //允许通过总数prefetchSize=0 不限制(rabbitmq没实现)
  3. //prefetchCount每次最大发送
  4. //global=false 只设置到当前consumer,否则设置到整个Channel(rabbitmq没实现)
  5. channel.basicQos(0, 1, false);
  6. //autoAck=false
  7. channel.basicConsume(queueName, false, new MyConsumer(channel));

basicReject / basicNack / basicRecover区别

//第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。 channel.BasicReject(result.DeliveryTag, false);

//【批量】退回或删除, //参数2: 是否批量确认 true是/false否 (也就是只一条) //参数3:是否requeue channel.BasicNack(result.DeliveryTag, true, true);

//参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。 //【不单单是拒绝的问题,是请求MQ重新发送,发送给谁由参数决定】 channel.BasicRecover(true);

rabbitmq basicReject / basicNack / basicRecover区别_t0m的专栏-CSDN博客_basicnack

Queue

死信队列

image.png
设置死信队列

  1. // 这就是一个普通的交换机 和 队列 以及路由
  2. String exchangeName = "test_dlx_exchange";
  3. String routingKey = "dlx.#";
  4. String queueName = "test_dlx_queue";
  5. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  6. Map<String, Object> agruments = new HashMap<String, Object>();
  7. agruments.put("x-dead-letter-exchange", "dlx.exchange");
  8. //这个agruments属性,要设置到声明队列上
  9. channel.queueDeclare(queueName, true, false, false, agruments);
  10. channel.queueBind(queueName, exchangeName, routingKey);
  11. //要进行死信队列的声明:
  12. channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
  13. channel.queueDeclare("dlx.queue", true, false, false, null);
  14. channel.queueBind("dlx.queue", "dlx.exchange", "#");

最终test_dlx_queue中的死信都发送到dlx.queue中了。

死信消息

  • 消息被拒绝basic.reject、basic.nack并且requeue=false
  • ttl过期
  • 队列达到长度

    Spring RabbitMQ

    消费端

    SimpleMessageListenerContainer

    手动确认

    image.png

  • 这种方式不方便,不建议使用

    消费端限流

    image.png

    MessaListenerAdapter【自定义 队列-执行方法】

    image.png

    MessageConverter

    image.png

    @RabbitListener

    配合配置
    image.png

image.png
或直接放在方法上
image.png

@RabbitListener绑定exchange&queue

image.png