- 前言
- 1、autoDelete自动删除属性
- 2、exclusive排它性属性
- 3、durable交换机、队列持久化
- 4、消息持久化(区别于durable)
- 5、arguments参数设置
- 5.1、Message TTL(x-message-ttl)
- 5.2、Auto Expire(x-expires)
- 5.3、Max Length(x-max-length)
- 5.4、Max Length Bytes(x-max-length-bytes)
- 5.5、Dead letter exchange(x-dead-letter-exchange)
- 5.6、Dead letter routing key(x-dead-letter-routing-key)
- 5.7、Maximum priority(x-max-priority)
- 5.8、Lazy mode(x-queue-mode=lazy)
- 5.9、Master locator(x-queue-master-locator)
- 6、Mandatory
- 7、RabbitMQ发送消息,附带BasicProperties属性介绍
前言
关于队列的声明,如果使用同一套参数进行声明了,就不能再使用其他参数来声明此队列了,要么删除该队列,可以使用命令行删除也可以在RabbitMQ Management上删除,要么给队列重新起一个名字。
1、autoDelete自动删除属性
在RabbitMQ中声明交换器、消息队列时,可设置autoDelete属性(表示是否自动删除?)。如下所示:
如果autoDelete=true,表示自动删除,此处我们要理解,自动删除的条件是什么?
这里的关键是,自动删除的条件是向后的。RabbitMQ的整个消息流是:生产者发布消息和routingKey—> 经由交换器,exchange再依据routingKey —> 到达指定的消息队列 —> 消息队列推送消息到已经订阅此消息的消费者。
再来解释 “ 自动删除的条件是向后的 ” 这句话:
对于exchange交换器,向前是生产者发布的消息和routingKey,这不能作为exchange自动删除的条件。exchange向后是绑定另一个交换器或 绑定消息队列,这就是exchange交换器删除的条件。
总结:exchange自动删除的条件,有队列或者交换器绑定了本交换器,然后当所有队列或交换器都与本交换器解除绑定,且autoDelete=true时,此交换器就会被自动删除。
如何解除本交换机和交换机或队列绑定?
直接调用Channel对象中的queueUnbind方法即可完成解绑操作,代码如下:
- queue 表示队列名称;
- exchange 表示本交换机名称;
- routingKey 表示队列与交换机绑定时的routingKey值,这个值一定要正确,否则会解绑失败。
举个栗子:
channel.queueUnbind("queueName", "exchangeName","routingKey");
———————————-
对于消息队列,向前是与exchange的绑定关系,这不能作为队列自动删除的条件。队列向后是被消费者订阅。这就是队列删除的条件。
总结:消息队列自动删除的条件,有消息者订阅本队列,然后所有消费者都解除订阅此队列(即消费者断开连接或宕机),且autoDelete=true时,此队列会自动删除,即使此队列中还有消息。
————————————————————————————————————————————————————
:::info
在Queue中: 当监听此消息队列的所有消费者客户端断开连接后(或当没有任何消费者监听此队列时),是否自动删除 此消息队列? true:删除 ,false:不删除。
注意:是所有消费者都断开连接或宕机,只要有一个消费者还正常连接,即使设置为true,也不会删除此队列。
比如:
当Queue中的 autoDelete 属性被设置为true时,当消费者断开连接或宕机后,则会自动删除此消息队列【即使此队列中还有消息,即消息也一并删除了】。而后如果生产者还在一直发送消息,那么当消费者重新启动恢复正常后,会接收到最新的消息,而宕机期间的消息则会丢失。
当Quere中的 autoDelete 属性被设置为false时,当消费者断开连接或宕机后,不会删除此消息队列。而后如果生产者还在一直发送消息,那么当消费者重新启动恢复正常后,会接收到包括宕机期间的消息,消息不会丢失。
所以个人建议:Queue中的 autoDelete属性设置为false。
————————————————————————————————————————————
在Exchange中:当所有队列或交换器都与本交换器解除绑定,是否自动 删除本交换机? true:删除, false:不删除。
注意:是所有队列或交换器,只要有一个队列或交换机还正常绑定,即使设置为true,也不会删除此交换机。
:::
2、exclusive排它性属性
[ɪkˈskluːsɪv],单词释义:独占的、排它的。
true时表示:只允许有一个消费者监听此消息 队列 ,且当一个消息队列绑定了一个交换机,那么此队列再绑定其他交换机,也会抛出异常。
3、durable交换机、队列持久化
[ˈdjʊərəbl] ,单词释义:耐用的、持久的。交换机、消息队列都有该属性。
durable: 交换机、消息队列是否持久化?
3.1、队列持久化
消息队列默认是存放到内存中的,如果[rabbitmq](https://so.csdn.net/so/search?q=rabbitmq&spm=1001.2101.3001.7020) broker服务器重启,消息队列会消失(可以通过rabbitmqctl stop_app关闭服务器,然后rabbitmqctl start_app重启服务器)。<br />如果想重启之后,消息队列还存在,就要使此消息队列持久化(即设置durable=true),即队列会保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会自动读取该数据库。可以登录RabbitMQ Management—> Queues中可以看到之前声明的队列还存在。<br />原生API实现方式:
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, arguments);
3.2、exchange的持久化
在申明exchange的时候,有个参数:durable。当该参数为true,则对该exchange做持久化,重启rabbitmq服务器后,该exchange不会消失。durable的默认值为true
小结:建议交换机和队列的durable,都设置为true。
4、消息持久化(区别于durable)
在生产环境中偶尔会由于一些原因导致 RabbitMQ 重启,在 RabbitMQ 重启期间,生产者消息投递失败,会 导致消息的丢失,需要手动处理和恢复。队列和交互机可以进行持久化操作,即rabbitmq重启不会导致队列和交换机的丢失。那么同样,我们也需要对消息进行持久化操作,保证消息的可靠性。
如果要设置消息持久化必须先设置队列持久化,不然队列不持久化,而消息设置了持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到消息持久化的目的。
4.1、如何实现消息持久化?
方式一:设置deliveryMode=2(1: 非持久化 2:持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
// 设置消息持久化
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
方式二:设置BasicProperties为MessageProperties.PERSISTENT_TEXT_PLAIN
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
5、arguments参数设置
arguments [ˈɑːgjʊmənts] 争论、参数,主要用来设置消息队列中的消息的一些属性。
可参考博客:https://blog.csdn.net/fsgsggd/article/details/81349553
5.1、Message TTL(x-message-ttl)
设置整个队列的所有消息的生命周期;也可以在生产者发布消息时,单独为某个消息指定剩余生存时间(单位毫秒);类似于redis中的ttl,生存时间到了,消息会被从队列中删除,注意是消息被删除,而不是队列被删除。
特性Features=TTL。
示列1:原生API
单独为某条消息设置过期时间:6秒,如下所示:
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
示列2:springboot整合rabbitmq,队列中的所有消息都有同样的过期时间。
声明队列时需要设置x-message-ttl属性,并设置过期时间,推送到该队列中的所有消息,都会有一个10秒后过期的属性。
设置前,先去RabbitMQ UI管理面板中删除该队列,否则启动项目时报错。
示列如下所示:
@Configuration
public class RabbitConfig {
//Direct交换机
@Bean
DirectExchange DirectExchange() {
return new DirectExchange("DirectExchange", true, false);
}
//队列DirectQueue
@Bean
public Queue DirectQueue() {
Map<String, Object> arguments = new HashMap<>();
// 队列中的消息未被消费20秒过期
arguments .put("x-message-ttl", 10000);
return new Queue("DirectQueue", true, false, false, arguments );
}
//将队列和交换机绑定
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
}
}
上面参数的布尔值参考下图:
可以看到设置成功后:该对列出现了TTL属性。
点击该队列:看到时间为10s了,如下所示:
示列3:springboot整合rabbitmq,指定某条消息的过期时间:
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public void sendMessage() {
MessageProperties messageProperties = new MessageProperties();
//设置过期时间
messageProperties.setExpiration("10000");
//这个参数是用来做消息的唯一标识
//发布消息时使用,存储在消息的headers中(即消息头中)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
User user = new User(1, "张三" + 1);
Message message = new Message(JSON.toJSONString(user).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("DirectExchange", "DirectRoutingKey", message, correlationData);
}
}
:::info
注意🎃:
RabbitMQ只会过期淘汰队列头部的消息。如果单独给一条消息设置ttl,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短,那么消息就不会及时的被淘汰,会导致消息的堆积问题。
:::
5.2、Auto Expire(x-expires)
队列生存周期。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…),队列就会被删除。
Features=Exp。
5.3、Max Length(x-max-length)
限定队列可以容纳的消息的最大条数,超过指定条数将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息,。
Feature=Lim。
5.4、Max Length Bytes(x-max-length-bytes)
限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小,。
Features=Lim B。
5.5、Dead letter exchange(x-dead-letter-exchange)
当队列消息长度大于最大长度、或者消息过期的等,将从队列中删除的消息推送到指定的交换机中去。而不是直接丢弃掉消息。
Features=DLX。
5.6、Dead letter routing key(x-dead-letter-routing-key)
将删除的消息推送到指定交换机的指定路由键的消息队列中去。
Feature=DLK。
5.7、Maximum priority(x-max-priority)
优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
支持消息优先级的MQ有Beanstalkd、RabbitMQ(版本3.5.0)。支持0-255个优先级,但建议0-10个优先级。优先级对队列使用时,优先级越多,对cpu和内存使用会增加。
示列1:
@Configuration
public class DirectRabbitConfig {
//队列
@Bean
public Queue DirectQueue() {
Map<String, Object> map = new HashMap<>();
//一般设置0到10;数字越大,优先级越高
map.put("x-max-priority", 10);
return new Queue("DirectQueue", true, false, false, map);
}
//Direct交换机
@Bean
DirectExchange DirectExchange() {
return new DirectExchange("DirectExchange", true, false);
}
//将队列和交换机绑定
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
}
}
5.8、Lazy mode(x-queue-mode=lazy)
Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候,才加载到内存中。
如果未设置,则队列将保留消息在内存缓存,以尽可能快地传递消息。
5.9、Master locator(x-queue-master-locator)
集群相关设置,将队列设置为主位置模式,确定在节点集群上声明时,队列主位置所依据的规则。
6、Mandatory
mandatory是在发送一条Basic.Publish RPC命令时一起传递的参数,告诉RabbitMq如果消息不可路由,应该通过Basic.Return RPC命令将消息返回给发布者。
具体执行流程如下图所示:
设置mandatory开启了消息故障检测模式后,它只会让RabbitMq向你通知失败,而不会通知成功。
进一步,当交换器根据自身和路由键无法找到合适的队列时,当mandatory设置为true,会通过Basic.Return RPC命令将消息返回给发布者;当mandatory设置为false时,该消息被直接丢弃。
那么,发布者如何得到消息发布失败后的Basic.Return的内容呢,通常会调用channel.addReturnListener()回调方法。
方式1:原生API
方式2:springboot+整个RabbitMQ中,用ConfirmBack+ReturnBack就行。
7、RabbitMQ发送消息,附带BasicProperties属性介绍
7.1、BasicPropertie属性字段详解:
- contentType:消息的内容类型,如:text/plain
- contentEncoding:消息内容编码
- headers:设置消息的header,类型为Map
- deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
- priority:消息的优先级
- correlationId:关联ID
- replyTo:用于指定回复的队列的名称
- expiration:消息的失效时间
- messageId:消息ID
- timestamp:消息的时间戳
- type:类型
- userId:用户ID
- appId:应用程序ID
- custerId:集群ID
7.2、BasicProperties使用详解
方式1:原生API,MessageProperties类默认提供了6种不同默认值的BasicProperties属性值对象。
生产者发送消息:
AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder();
properties.messageId("消息ID");
properties.deliveryMode(2);
/**
* 发布消息
* 发布到不存在的交换器将导致信道级协议异常,该协议关闭信道,
* exchange: 要将消息发送到的交换器
* routingKey: 路由KEY
* props: 消息的其它属性,如:路由头等
* body: 消息体
*/
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties.build(), message.getBytes());
消费者接收消息:
DeliverCallback deliverCallback = (consumerTag, delivery)->{
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message);
BasicProperties properties = delivery.getProperties();
System.out.println("deliveryMode:"+properties.getDeliveryMode()
+"-消息ID"+properties.getMessageId());
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
System.out.println("调用"+consumerTag);
});
方式2:springboot整个rabbitmq:这个我个人觉得是在sendandcovent生产者发布消息的业务代码中去写。
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("arg1", "aaa");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2) // //持久化存在
.contentEncoding("UTF-8") // 编码方式
.expiration("10000") //过期时间10秒,10秒后如果没有被消费,消息会自动清除
.headers(headers) //自定义属性,方便消费者取用
.build();
END