~RabbitMQ - 图1

RabbitMQ发送端

发送端参数

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

  • mandatory:
    • 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
    • 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
  • immediate:
    • true:当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

RabbitMQ之mandatory和immediate_朱小厮的博客-CSDN博客

RabbitMQ之消息确认机制(事务+Confirm)_朱小厮的博客-CSDN博客_rabbitmq消息确认机制
消息确认两种模式:

  1. 事务机制
  2. channel设置成confirm模式

    事务机制

    1. try {
    2. channel.txSelect();
    3. channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    4. int result = 1 / 0;
    5. channel.txCommit();
    6. } catch (Exception e) {
    7. e.printStackTrace();
    8. channel.txRollback();
    9. }
    只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发

    conform模式

    1. 同步确认

    1. channel.confirmSelect();
    2. channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    3. if(!channel.waitForConfirms()){
    4. System.out.println("send message failed.");
    5. }

    2. 同步批量确认

    1. channel.confirmSelect();
    2. for(int i=0;i<batchCount;i++){
    3. channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    4. }
    5. if(!channel.waitForConfirms()){
    6. System.out.println("send message failed.");
    7. }

    3. 异步确认【ConfirmListener】

    ```java SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    1. if (multiple) {
    2. confirmSet.headSet(deliveryTag + 1).clear();
    3. } else {
    4. confirmSet.remove(deliveryTag);
    5. }
    } public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    1. System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
    2. if (multiple) {
    3. confirmSet.headSet(deliveryTag + 1).clear();
    4. } else {
    5. confirmSet.remove(deliveryTag);
    6. }
    } });

while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }

  1. <a name="E3ypl"></a>
  2. ### exchange路由确认【ReturnCallback】【mandatory】
  3. **【注意】exchange路由不到queue**,ConfirmCallback也会返回**ack=true**。同时也会调用**ReturnCallback(当mandatory设置为true时)**
  4. <a name="Dbk48"></a>
  5. ### RabbitTemplate消息确认
  6. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619344851618-2df5d4f9-e206-4c89-8129-c42b0fa81dec.png#height=164&id=X4led&margin=%5Bobject%20Object%5D&name=image.png&originHeight=327&originWidth=996&originalType=binary&ratio=1&size=168282&status=done&style=none&width=498)
  7. <a name="1QFZb"></a>
  8. #### PublisherConfirmType
  9. @Deprecated<br />public void setSimplePublisherConfirms(boolean simplePublisherConfirms) 简单设置bool的方法已过期。
  10. - NONE:是禁用发布确认模式,是**默认值**
  11. - CORRELATED:是发布消息成功到交换器后会触发回调方法
  12. - SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
  13. [springboot rabbitmq属性配置spring.rabbitmq.publisher-confirm和spring.rabbitmq.publisher-confirm-type详解_明洋的专栏-CSDN博客](https://blog.csdn.net/yaomingyang/article/details/108410286)
  14. <a name="TMKsC"></a>
  15. #### PublisherReturns & mandatory优先级
  16. - spring.rabbitmq.template.**mandatory**属性的**优先级高于**spring.rabbitmq.**publisher-returns**的优先级
  17. - spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true,
  18. - spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值
  19. - spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定
  20. [RabbitMQ学习笔记:mandatory、publisher-confirms、publisher-return属性区别_明洋的专栏-CSDN博客](https://mingyang.blog.csdn.net/article/details/106857104)
  21. <a name="hESp7"></a>
  22. #### 如何确认指定消息【correlation】
  23. 1. 设置PublisherConfirmType & PulisherReturns
  24. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619344851618-2df5d4f9-e206-4c89-8129-c42b0fa81dec.png#height=191&id=W6qA0&margin=%5Bobject%20Object%5D&name=image.png&originHeight=327&originWidth=996&originalType=binary&ratio=1&size=168282&status=done&style=none&width=581)
  25. 2. 发送设置correlationData
  26. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619346121020-d6815ae9-d36c-4551-a6c5-f02756d31d65.png#height=254&id=e4u2z&margin=%5Bobject%20Object%5D&name=image.png&originHeight=302&originWidth=710&originalType=binary&ratio=1&size=116308&status=done&style=none&width=597)
  27. 3. ConfirmCallback收到correlationData
  28. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619344009977-11a058b9-a1d5-4826-bd7f-389b2aef9e04.png#height=225&id=woq4o&margin=%5Bobject%20Object%5D&name=image.png&originHeight=381&originWidth=995&originalType=binary&ratio=1&size=198161&status=done&style=none&width=587.5)
  29. <a name="9VoKq"></a>
  30. ##
  31. <a name="1W7qf"></a>
  32. # Message
  33. <a name="8ZaUa"></a>
  34. ## Message属性
  35. message由payload和properties组成<br />常用属性:
  36. - delivery mode:是否持久化
  37. - headers:自定义
  38. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619451761278-1dfb49e8-99c7-4f09-90df-6c704a592ab2.png#height=194&id=hhiFD&margin=%5Bobject%20Object%5D&name=image.png&originHeight=387&originWidth=985&originalType=binary&ratio=1&size=286377&status=done&style=none&width=492.5)<br />[五、RabbitMQ的消息属性(读书笔记) - XuePeng77的个人空间 - OSCHINA - 中文开源技术交流社区](https://my.oschina.net/u/2450666/blog/3027667)
  39. <a name="6108da56"></a>
  40. ## Message消息状态
  41. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619331239638-91d77a4b-da3c-4910-98eb-379f50845645.png#height=61&id=yjuoJ&margin=%5Bobject%20Object%5D&name=image.png&originHeight=121&originWidth=586&originalType=binary&ratio=1&size=19229&status=done&style=none&width=293)<br />unacked:发送到consumer了。<br />ready:可消费consumer连接断开,**unacked变ready**。
  42. <a name="gPEFw"></a>
  43. # exchange
  44. 如果不指定exchange,使用默认exchange,**默认exchange绑定 RoutingKey 与队列名称相同。**
  45. <a name="rCeRp"></a>
  46. ## 四种Exchange:fanout,direct,topic,header
  47. - direct:routing key 与 bind key 匹配
  48. - fanout:会**忽略rountingkey**
  49. - topic:模式匹配rountingkey
  50. <a name="fGRUZ"></a>
  51. ## routing key & binding key
  52. - routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  53. - binding key与routing key一样也是句点号“. ”分隔的字符串
  54. - binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
  55. <a name="17OCE"></a>
  56. # queue
  57. ```java
  58. Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
  59. Map<String, Object> arguments) throws IOException;
  • durable:当RabbitMQ崩溃重启后 queue 仍然存在;
  • exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
      1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
    • 2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
    • 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  • autoDelete:最后一个监听被移除后queue自动删除。
  • arguments
    • x-message-ttl:消息过期时间
      • 队列中消息的过期时间,可以指定message过期时间也可以指定queue整体过期时间。
      • 索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行
    • x-overflow:溢出策略
      • drop-head:删除头部,删除消息会进入死信队列
      • reject-publish:拒绝新的消息,拒绝新消息不进入死信队列
    • x-dead-letter-exchange:设置死信队列

【好文】一文带你搞定RabbitMQ延迟队列 - 弗兰克的猫 - 博客园

死信队列

image.png

死信消息

  • 消息被拒绝basic.reject、basic.nack并且requeue=false
  • ttl过期
  • 队列达到长度

DLX也是一个Exchange。
设置死信队列

  1. // 这就是一个普通的交换机 和 队列 以及路由
  2. String exchangeName = "test_dlx_exchange";
  3. String routingKey = "dlx.#";
  4. String queueName = "test_dlx_queue";
  5. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  6. Map<String, Object> agruments = new HashMap<String, Object>();
  7. agruments.put("x-dead-letter-exchange", "dlx.exchange");
  8. //这个agruments属性,要设置到声明队列上
  9. channel.queueDeclare(queueName, true, false, false, agruments);
  10. channel.queueBind(queueName, exchangeName, routingKey);
  11. //要进行死信队列的声明:
  12. channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
  13. channel.queueDeclare("dlx.queue", true, false, false, null);
  14. channel.queueBind("dlx.queue", "dlx.exchange", "#");

最终test_dlx_queue中的死信都发送到dlx.queue中了。

消费端

原始api:
image.png

消费端确认&重回队列

//手动发送ack
//deliveryTag是自增的,第二参数是是否批量ack
channel.basicAck(envelope.getDeliveryTag(),false);

//第三个参数是是否重回队列,一般不用。
channel.basicNack(envelope.getDeliveryTag() , false , true);

消费端限流qos

当消费端有一定数量的消息未被ack时,RabbitMQ不会推送新消息。
首先是在非自动确认的前提下。autoAck设置为false。
假设prefetch值设为10,共有两个consumer。意味着每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。

  1. // void basicQos(int prefetchSize, int prefetchCount, boolean global)
  2. //允许通过总数prefetchSize=0 不限制(rabbitmq没实现)
  3. //prefetchCount每次最大发送
  4. //global=false 只设置到当前consumer,否则设置到整个Channel(rabbitmq没实现)
  5. channel.basicQos(0, 1, false);
  6. //autoAck=false
  7. channel.basicConsume(queueName, false, new MyConsumer(channel));

整合spring

rabbitmq/RabbitReceiver.java at master · liyaochu/rabbitmq

rabbitmq basicReject / basicNack / basicRecover区别

  1. //第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
  2. channel.BasicReject(result.DeliveryTag, false);
  3. //【批量】退回或删除,
  4. //参数2: 是否**批量** true是/false否 (也就是只一条)
  5. //参数3:是否requeue
  6. channel.BasicNack(result.DeliveryTag, true, true);
  7. //参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
  8. //【不单单是拒绝的问题,是请求server重新发送,发送给谁由参数决定】
  9. channel.BasicRecover(true);

rabbitmq basicReject / basicNack / basicRecover区别_t0m的专栏-CSDN博客_basicnack

Spring RabbitMQ

SimpleMessageListenerContainer【接收消息】

  1. @Bean
  2. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  4. container.setConnectionFactory(connectionFactory);
  5. container.setQueueNames("zhihao.debug","zhihao.error","zhihao.info");
  6. container.setMessageListener((MessageListener) message -> {
  7. System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
  8. System.out.println(message.getMessageProperties());
  9. System.out.println(new String(message.getBody()));
  10. });
  11. return container;
  12. }

~RabbitMQ - 图4
【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析_god_86的专栏-CSDN博客

MessageListenerAdapter

进一步,将MessageListenerAdapter设置到SimpleMessageListenerContainer

  1. @Bean
  2. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
  3. SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  4. messageListenerContainer.setQueues(topicQueue(), topicQueueAgain());
  5. messageListenerContainer.setConcurrentConsumers(1);
  6. messageListenerContainer.setMaxConcurrentConsumers(5);
  7. messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
  8. messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());
  9. MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageDelegate());
  10. messageListenerContainer.setMessageListener(listenerAdapter);
  11. return messageListenerContainer;
  12. }

MessageConverter 【类型转换】

  • JavaTypeMapper设置到MessageConverter中【JavaTypeMapper不好用】
    • 反序列化时要求发送的类和接受的类完全一样(字段,类名,包路径)。
  • MessageConverter设置到MessageListenerAdapter中
  • MessageListenerAdapter设置到messageListenerContainer中。 ```java MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“consumeMessage”);

Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter);

  1. - **可以直接设置ClassMapper**
  2. - [[RabbitMQ]Jackson2JsonMessageConverter转换实体类常的问题_Lycode-CSDN博客](https://blog.csdn.net/qq_31897023/article/details/103875594)
  3. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619365887447-c2916a82-1954-4688-94da-88881231dfdf.png#height=186&id=OwfJS&margin=%5Bobject%20Object%5D&name=image.png&originHeight=300&originWidth=705&originalType=binary&ratio=1&size=60703&status=done&style=none&width=436)![image.png](https://cdn.nlark.com/yuque/0/2021/png/204517/1619366404678-fda2d4f2-95ae-4124-ba2e-e6ffe597ec58.png#height=445&id=BV9Yj&margin=%5Bobject%20Object%5D&name=image.png&originHeight=889&originWidth=1126&originalType=binary&ratio=1&size=124430&status=done&style=none&width=563)
  4. <a name="M5ruH"></a>
  5. # RabbitMQ集群
  6. 1. 普通模式
  7. 1. 镜像模式
  8. <a name="pOiyX"></a>
  9. ## 普通模式
  10. 默认共享元数据,不共享队列数据。<br />![](https://cdn.nlark.com/yuque/0/2021/jpeg/204517/1619408290842-69a510a2-262e-458b-9847-648fc6e02335.jpeg#height=343&id=DZjdL&originHeight=413&originWidth=539&originalType=binary&ratio=1&size=0&status=done&style=none&width=447)
  11. <a name="89pWR"></a>
  12. ## 镜像队列
  13. 共享数据<br />![](https://cdn.nlark.com/yuque/0/2021/png/204517/1619409278123-b0dfe62c-a159-428f-9840-1012421d8c60.png#height=365&id=H5xIf&originHeight=674&originWidth=758&originalType=binary&ratio=1&size=0&status=done&style=none&width=411)

$ rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority] -p Vhost: 可选参数,针对指定vhost下的queue进行设置 Name: policy的名称 Pattern: queue的匹配模式(正则表达式) Definition: 镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes all: 表示在集群中所有的节点上进行镜像 exactly: 表示在指定个数的节点上进行镜像,节点的个数由ha-params指定 nodes: 表示在指定的节点上进行镜像,节点名称通过ha-params指定 ha-params: ha-mode模式需要用到的参数 ha-sync-mode: 进行队列中消息的同步方式,有效值为automatic和manual priority: 可选参数,policy的优先级 ``` RabbitMQ集群原理与部署

高可用

客户端负载均衡

image.png
官方不建议使用

A client can connect as normal to any node within a cluster. If that node fails and the rest of the cluster survives, then the client should notice the closed connection, and should be able to reconnect to some surviving member of the cluster. Generally, it’s not advisable to bake in node hostnames or IP addresses into client applications: this introduces inflexibility and will require client applications to be edited, recompiled and redeployed should the configuration of the cluster change or the number of nodes in the cluster change. Instead, we recommend a more abstracted approach: this could be a dynamic DNS service which has a very short TTL configuration, or a plain TCP load balancer, or some sort of mobile IP achieved with pacemaker or similar technologies. In general, this aspect of managing the connection to nodes within a cluster is beyond the scope of RabbitMQ itself, and we recommend the use of other technologies designed specifically to solve these problems.

Load-balanced connection between Spring AMQP java client and nodes of RabbitMQ cluster? - Stack Overflow

服务端负载均衡

~RabbitMQ - 图6
Haproxy轮询

RabbitMQ集群间通信

  • Federation

image.png
image.png

  • Shovel:使用简单

image.png
image.png

可靠消息

image.png