2022年10月24日11:40:56

前言

关于队列的声明,如果使用同一套参数进行声明了,就不能再使用其他参数来声明此队列了,要么删除该队列,可以使用命令行删除也可以在RabbitMQ Management上删除,要么给队列重新起一个名字。


1、autoDelete自动删除属性

在RabbitMQ中声明交换器、消息队列时,可设置autoDelete属性(表示是否自动删除?)。如下所示:
image.png
如果autoDelete=true,表示自动删除,此处我们要理解,自动删除的条件是什么?
这里的关键是,自动删除的条件是向后的。RabbitMQ的整个消息流是:生产者发布消息和routingKey—> 经由交换器,exchange再依据routingKey —> 到达指定的消息队列 —> 消息队列推送消息到已经订阅此消息的消费者。

再来解释 “ 自动删除的条件是向后的 ” 这句话:
对于exchange交换器,向前是生产者发布的消息和routingKey,这不能作为exchange自动删除的条件。exchange向后是绑定另一个交换器或 绑定消息队列,这就是exchange交换器删除的条件。
总结:exchange自动删除的条件,有队列或者交换器绑定了本交换器,然后当所有队列或交换器都与本交换器解除绑定,且autoDelete=true时,此交换器就会被自动删除。
如何解除本交换机和交换机或队列绑定?
直接调用Channel对象中的queueUnbind方法即可完成解绑操作,代码如下:
image.png

  • queue 表示队列名称;
  • exchange 表示本交换机名称;
  • routingKey 表示队列与交换机绑定时的routingKey值,这个值一定要正确,否则会解绑失败

举个栗子:

  1. channel.queueUnbind("queueName", "exchangeName","routingKey");

———————————-
  对于消息队列,向前是与exchange的绑定关系,这不能作为队列自动删除的条件。队列向后是被消费者订阅。这就是队列删除的条件。
总结:消息队列自动删除的条件,有消息者订阅本队列,然后所有消费者都解除订阅此队列(即消费者断开连接或宕机),且autoDelete=true时,此队列会自动删除,即使此队列中还有消息。
———————————————————————————————————————————————————— :::info 在Queue中: 当监听此消息队列的所有消费者客户端断开连接后(或当没有任何消费者监听此队列时),是否自动删除 此消息队列? true:删除 ,false:不删除。
注意:是所有消费者都断开连接或宕机,只要有一个消费者还正常连接,即使设置为true,也不会删除此队列。
比如:
当Queue中的 autoDelete 属性被设置为true时,当消费者断开连接或宕机后,则会自动删除此消息队列【即使此队列中还有消息,即消息也一并删除了】。而后如果生产者还在一直发送消息,那么当消费者重新启动恢复正常后,会接收到最新的消息,而宕机期间的消息则会丢失
当Quere中的 autoDelete 属性被设置为false时,当消费者断开连接或宕机后,不会删除此消息队列。而后如果生产者还在一直发送消息,那么当消费者重新启动恢复正常后,会接收到包括宕机期间的消息,消息不会丢失。
所以个人建议:Queue中的 autoDelete属性设置为false。
————————————————————————————————————————————
在Exchange中:当所有队列或交换器都与本交换器解除绑定,是否自动 删除本交换机? true:删除, false:不删除。
注意:是所有队列或交换器,只要有一个队列或交换机还正常绑定,即使设置为true,也不会删除此交换机。 :::


2、exclusive排它性属性

[ɪkˈskluːsɪv],单词释义:独占的、排它的。
true时表示:只允许有一个消费者监听此消息 队列 ,且当一个消息队列绑定了一个交换机,那么此队列再绑定其他交换机,也会抛出异常


3、durable交换机、队列持久化

[ˈdjʊərəbl] ,单词释义:耐用的、持久的。交换机、消息队列都有该属性
durable: 交换机、消息队列是否持久化?

3.1、队列持久化

  1. 消息队列默认是存放到内存中的,如果[rabbitmq](https://so.csdn.net/so/search?q=rabbitmq&spm=1001.2101.3001.7020) broker服务器重启,消息队列会消失(可以通过rabbitmqctl stop_app关闭服务器,然后rabbitmqctl start_app重启服务器)。<br />如果想重启之后,消息队列还存在,就要使此消息队列持久化(即设置durable=true),即队列会保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会自动读取该数据库。可以登录RabbitMQ Management—> Queues中可以看到之前声明的队列还存在。<br />原生API实现方式:
  1. boolean durable = true;
  2. channel.queueDeclare(QUEUE_NAME, durable, false, false, arguments);

3.2、exchange的持久化

在申明exchange的时候,有个参数:durable。当该参数为true,则对该exchange做持久化,重启rabbitmq服务器后,该exchange不会消失。durable的默认值为true

小结:建议交换机和队列的durable,都设置为true。


4、消息持久化(区别于durable)

在生产环境中偶尔会由于一些原因导致 RabbitMQ 重启,在 RabbitMQ 重启期间,生产者消息投递失败,会 导致消息的丢失,需要手动处理和恢复。队列和交互机可以进行持久化操作,即rabbitmq重启不会导致队列和交换机的丢失。那么同样,我们也需要对消息进行持久化操作,保证消息的可靠性。
如果要设置消息持久化必须先设置队列持久化不然队列不持久化,而消息设置了持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到消息持久化的目的。

4.1、如何实现消息持久化?

方式一:设置deliveryMode=2(1: 非持久化 2:持久化)

  1. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
  2. channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
  3. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  4. String message = "Hello RabbitMQ: ";
  5. // 设置消息持久化
  6. AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
  7. properties.deliveryMode(2);
  8. channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));

方式二:设置BasicProperties为MessageProperties.PERSISTENT_TEXT_PLAIN

  1. channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  3. String message = "Hello RabbitMQ: ";
  4. channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))

5、arguments参数设置

arguments [ˈɑːgjʊmənts] 争论、参数,主要用来设置消息队列中的消息的一些属性。
可参考博客:https://blog.csdn.net/fsgsggd/article/details/81349553

5.1、Message TTL(x-message-ttl)

设置整个队列的所有消息的生命周期;也可以在生产者发布消息时,单独为某个消息指定剩余生存时间(单位毫秒);类似于redis中的ttl,生存时间到了,消息会被从队列中删除,注意是消息被删除,而不是队列被删除。
特性Features=TTL。
示列1:原生API
单独为某条消息设置过期时间:6秒,如下所示:

  1. AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
  2. channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

示列2:springboot整合rabbitmq,队列中的所有消息都有同样的过期时间。
声明队列时需要设置x-message-ttl属性,并设置过期时间,推送到该队列中的所有消息,都会有一个10秒后过期的属性。
设置前,先去RabbitMQ UI管理面板中删除该队列,否则启动项目时报错。
示列如下所示:

  1. @Configuration
  2. public class RabbitConfig {
  3. //Direct交换机
  4. @Bean
  5. DirectExchange DirectExchange() {
  6. return new DirectExchange("DirectExchange", true, false);
  7. }
  8. //队列DirectQueue
  9. @Bean
  10. public Queue DirectQueue() {
  11. Map<String, Object> arguments = new HashMap<>();
  12. // 队列中的消息未被消费20秒过期
  13. arguments .put("x-message-ttl", 10000);
  14. return new Queue("DirectQueue", true, false, false, arguments );
  15. }
  16. //将队列和交换机绑定
  17. @Bean
  18. Binding bindingDirect() {
  19. return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
  20. }
  21. }

上面参数的布尔值参考下图:
image.png
可以看到设置成功后:该对列出现了TTL属性。
image.png
点击该队列:看到时间为10s了,如下所示:
image.png
示列3:springboot整合rabbitmq,指定某条消息的过期时间:

  1. @RestController
  2. public class SendMessageController {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @GetMapping("/sendMessage")
  6. public void sendMessage() {
  7. MessageProperties messageProperties = new MessageProperties();
  8. //设置过期时间
  9. messageProperties.setExpiration("10000");
  10. //这个参数是用来做消息的唯一标识
  11. //发布消息时使用,存储在消息的headers中(即消息头中)
  12. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  13. User user = new User(1, "张三" + 1);
  14. Message message = new Message(JSON.toJSONString(user).getBytes(StandardCharsets.UTF_8), messageProperties);
  15. rabbitTemplate.convertAndSend("DirectExchange", "DirectRoutingKey", message, correlationData);
  16. }
  17. }

:::info 注意🎃:
RabbitMQ只会过期淘汰队列头部的消息。如果单独给一条消息设置ttl,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短,那么消息就不会及时的被淘汰,会导致消息的堆积问题。 :::

5.2、Auto Expire(x-expires)

队列生存周期。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…),队列就会被删除。
Features=Exp。

5.3、Max Length(x-max-length)

限定队列可以容纳的消息的最大条数,超过指定条数将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息,。
Feature=Lim。

5.4、Max Length Bytes(x-max-length-bytes)

限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小,。
Features=Lim B。

5.5、Dead letter exchange(x-dead-letter-exchange)

当队列消息长度大于最大长度、或者消息过期的等,将从队列中删除的消息推送到指定的交换机中去。而不是直接丢弃掉消息。
Features=DLX。

5.6、Dead letter routing key(x-dead-letter-routing-key)

将删除的消息推送到指定交换机的指定路由键的消息队列中去。
Feature=DLK。

5.7、Maximum priority(x-max-priority)

优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
支持消息优先级的MQ有Beanstalkd、RabbitMQ(版本3.5.0)。支持0-255个优先级,但建议0-10个优先级。优先级对队列使用时,优先级越多,对cpu和内存使用会增加。
示列1:

  1. @Configuration
  2. public class DirectRabbitConfig {
  3. //队列
  4. @Bean
  5. public Queue DirectQueue() {
  6. Map<String, Object> map = new HashMap<>();
  7. //一般设置0到10;数字越大,优先级越高
  8. map.put("x-max-priority", 10);
  9. return new Queue("DirectQueue", true, false, false, map);
  10. }
  11. //Direct交换机
  12. @Bean
  13. DirectExchange DirectExchange() {
  14. return new DirectExchange("DirectExchange", true, false);
  15. }
  16. //将队列和交换机绑定
  17. @Bean
  18. Binding bindingDirect() {
  19. return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
  20. }
  21. }

5.8、Lazy mode(x-queue-mode=lazy)

Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候,才加载到内存中。
如果未设置,则队列将保留消息在内存缓存,以尽可能快地传递消息。

5.9、Master locator(x-queue-master-locator)

集群相关设置,将队列设置为主位置模式,确定在节点集群上声明时,队列主位置所依据的规则。


6、Mandatory

mandatory是在发送一条Basic.Publish RPC命令时一起传递的参数,告诉RabbitMq如果消息不可路由,应该通过Basic.Return RPC命令将消息返回给发布者。
具体执行流程如下图所示:
image.png
设置mandatory开启了消息故障检测模式后,它只会让RabbitMq向你通知失败,而不会通知成功。
进一步,当交换器根据自身和路由键无法找到合适的队列时,当mandatory设置为true,会通过Basic.Return RPC命令将消息返回给发布者;当mandatory设置为false时,该消息被直接丢弃。
那么,发布者如何得到消息发布失败后的Basic.Return的内容呢,通常会调用channel.addReturnListener()回调方法。
方式1:原生API
image.png
方式2:springboot+整个RabbitMQ中,用ConfirmBack+ReturnBack就行。


7、RabbitMQ发送消息,附带BasicProperties属性介绍

7.1、BasicPropertie属性字段详解:

  • contentType:消息的内容类型,如:text/plain
  • contentEncoding:消息内容编码
  • headers:设置消息的header,类型为Map
  • deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
  • priority:消息的优先级
  • correlationId:关联ID
  • replyTo:用于指定回复的队列的名称
  • expiration:消息的失效时间
  • messageId:消息ID
  • timestamp:消息的时间戳
  • type:类型
  • userId:用户ID
  • appId:应用程序ID
  • custerId:集群ID

7.2、BasicProperties使用详解
方式1:原生API,MessageProperties类默认提供了6种不同默认值的BasicProperties属性值对象。
生产者发送消息:

  1. AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder();
  2. properties.messageId("消息ID");
  3. properties.deliveryMode(2);
  4. /**
  5. * 发布消息
  6. * 发布到不存在的交换器将导致信道级协议异常,该协议关闭信道,
  7. * exchange: 要将消息发送到的交换器
  8. * routingKey: 路由KEY
  9. * props: 消息的其它属性,如:路由头等
  10. * body: 消息体
  11. */
  12. channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties.build(), message.getBytes());

消费者接收消息:

  1. DeliverCallback deliverCallback = (consumerTag, delivery)->{
  2. String message = new String(delivery.getBody(), "UTF-8");
  3. System.out.println(" [x] Received '" + message);
  4. BasicProperties properties = delivery.getProperties();
  5. System.out.println("deliveryMode:"+properties.getDeliveryMode()
  6. +"-消息ID"+properties.getMessageId());
  7. };
  8. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  9. System.out.println("调用"+consumerTag);
  10. });

方式2:springboot整个rabbitmq:这个我个人觉得是在sendandcovent生产者发布消息的业务代码中去写。

  1. Map<String, Object> headers = new HashMap<String, Object>();
  2. headers.put("arg1", "aaa");
  3. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
  4. .deliveryMode(2) // //持久化存在
  5. .contentEncoding("UTF-8") // 编码方式
  6. .expiration("10000") //过期时间10秒,10秒后如果没有被消费,消息会自动清除
  7. .headers(headers) //自定义属性,方便消费者取用
  8. .build();

END