RabbitMQ 基础

初步了解 MQ

  1. 队列能干啥?
    削峰、系统解耦、消息分发、异步、日志收集
  2. 为什么要用 RabbitMQ?

    市面上很多 MQ。为什么要选择 RabbitMQ ?又或者说什么情况下适合选择 RabbitMQ ?

    • 支持持久化(内存、文件方式)
    • 消息延迟性低
    • 支持 amqp 协议 (高级消息队列协议)
    • 多语言支持
    • 消费消息支持 pull 和 push
    • 具有高可用性,分布式系统
    • 单机吞吐量:万级 相较 kafka(十万级)较低
    • 丢失率低、可以控制重复度 (kafka不能保证不重复)
    • 灵活的路由:在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  3. AMQP 协议
    AMQP 中增加了 Exchange 和 Binging 的角色。生产者把消息发布到Exchange 上,消息最终到达队列并被消费者接收,而 Bingding 决定交换器的消息应该发送到哪个队列。
    RabbitMQ 基础 - 图1

RabbitMQ

RabbitMQ的整体架构:

RabbitMQ 基础 - 图2

RabbitMQ的消息流转

RabbitMQ 基础 - 图3

RabbitMQ各组件功能


RabbitMQ 基础 - 图4

RabbitMQ 多种Exchange 类型

  • direct:直接连接交换机。消息中的路由键(routing key )如果和Binding中的binding key 一致,交换机就将信息从exchange发送到对应的队列中。路由键和队列名完全匹配。
    4.png
    RabbitMQ 基础 - 图6
  • fanout:无路由交换机。每个发到fanout类型交换器的消息都会分发到所有的与它绑定的队列中去。fanout不处理路由键,只是简单的将队列绑定到交换机上。fanout转发消息是最快的。
    1594609137295.png
    RabbitMQ 基础 - 图8
  • topic:主题路由匹配交换机。topic 通过模式匹配分配消息的路由键属性。将队列绑定到某个模式上,然后路由键和模式进行匹配。
    • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
    • binding key与routing key一样也是句点号“. ”分隔的字符
    • binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

1594609173882.png
RabbitMQ 基础 - 图10

TTL

TTL:time to live. 生存时间。可以在消息发送的时候,指定过期时间。也可以在创建Exchange时指定。从消息进入队列开始算起,超过指定的时间,则将过期的队列数据清除。

DLX

  1. DLX: dead letter exchange . 死信队列。当一个队列中的信息变成死信,则会自动发送到另一个Exchange.这个Exchange就是死信队列。
    还可以设置一个死信路由键x-dead-letter-routing-key.设置queue的arguments,spring amqp和客户端的设置基本类似,无非是在生成队列时,一个是调用Queue的构造器传入arguments,一个是channel.declareQueue传入arguments。
  2. 消息变成死信的几种情况:
  • 消息被拒绝
  • 消息TTL过期
  • 队列达到最大长度
  1. 设置TTL(TTL存在两种)
  • 每个队列中消息的TTL在queue的arguments中设置arguments.put(“x-message-ttl”, 6000);
  • 每个消息的TTL,在消息头中设置
  1. //spring amqp
  2. Message expirationMsg=new Message(message.getBytes(),MessagePropertiesBuilder.newInstance().setExpiration(originalExpiration).build());
  3. rabbitTemplate.send(exchange, key, expirationMsg, correlationId);
  4. //客户端:AMQP.BasicProperties properties = new AMQP.BasicProperties();
  5. properties.setExpiration("60000");
  6. channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());

两种TTL的区别在于,判断是否过期的时候,在queue上设置的ttl只需要判断queue头信息即可,在message上设置的ttl需要判断每个消息头。

生产者端数据丢失

生产者Confirm机制

生产者如何判断消息投递成功?amqp本身有事务,可以控制数据回滚。其次,还可以使用confirm机制进行应答。

  • 在channel上开启channel.confirmSelect();
  • 在channel上开启监听:addConfirmListener
  1. /**
  2. * @version V1.0
  3. * @Description: https://blog.csdn.net/qq_35387940/article/details/100514134
  4. * @Auther: ly
  5. * @Date: 2020/7/15 16:03
  6. */
  7. @Configuration
  8. public class RabbitConfig {
  9. /**
  10. * 推送消息存在四种情况:
  11. ①消息推送到server,但是在server里找不到交换机
  12. 结论: ①这种情况触发的是 ConfirmCallback 回调函数。
  13. exchange 错误,queue 正确,confirm被回调, ack=false
  14. ②消息推送到server,找到交换机了,但是没找到队列
  15. 结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
  16. exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
  17. ③消息推送到sever,交换机和队列啥都没找到
  18. 结论: ③这种情况触发的是 ConfirmCallback 回调函数。
  19. exchange 错误,queue 错误,confirm被回调, ack=false
  20. ④消息推送成功
  21. 结论: ④这种情况触发的是 ConfirmCallback 回调函数。ack=true
  22. */
  23. @Bean
  24. public RabbitTemplate createRabbitTemolate(ConnectionFactory connectionFactory){
  25. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  26. rabbitTemplate.setConnectionFactory(connectionFactory);
  27. //设置开启mandatory,才能触发回调函数,无论消息推送结果怎么样,都强制调用回调函数
  28. rabbitTemplate.setMandatory(true);
  29. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  30. @Override
  31. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  32. System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
  33. System.out.println("ConfirmCallback: "+"确认情况:"+ack);
  34. System.out.println("ConfirmCallback: "+"原因:"+cause);
  35. }
  36. });
  37. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  38. @Override
  39. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  40. System.out.println("ReturnCallback: "+"消息:"+message);
  41. System.out.println("ReturnCallback: "+"回应码:"+replyCode);
  42. System.out.println("ReturnCallback: "+"回应信息:"+replyText);
  43. System.out.println("ReturnCallback: "+"交换机:"+exchange);
  44. System.out.println("ReturnCallback: "+"路由键:"+routingKey);
  45. }
  46. });
  47. return rabbitTemplate;
  48. }
  49. }

Return 消息机制

Return listener 用于处理不可被路由的消息。既不能从Exchange到Queue的消息。基础API中有个关键的配置项Mandatory:如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

消息队列端数据丢失

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

  1. /**
  2. * https://www.cnblogs.com/chongaizhen/p/11093489.html
  3. * 第二个参数:queue的持久化是通过durable=true来实现的。
  4. * 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:  
  5. 1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;   2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;   3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  6. * 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
  7. * @param
  8. * @return
  9. * @Author
  10. */
  11. @Bean
  12. public Queue queue() {
  13. Map<String, Object> arguments = new HashMap<>();
  14. arguments.put("x-message-ttl", 25000);//25秒自动删除
  15. Queue queue = new Queue("topic.messages", true, false, true, arguments);
  16. return queue;
  17. }
  1. MessageProperties properties=new MessageProperties();
  2. properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);
  3. properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化设置
  4. properties.setExpiration("2018-12-15 23:23:23");//设置到期时间
  5. Message message=new Message("hello".getBytes(),properties);
  6. this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

消费者端数据丢失

消费端确认消息的方式

当消费者消费数据的时候,没有办法保证一定会消费成功。AcknowledgeMode 有 NONE (不确认模式)、MANUAL (手动确认模式)和 AUTO(自动确认模式)。

①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

②手动确认模式,RabbitMQ手动应答方式有ACK(已经正确投递,且已经被正确处理)和NACK(已经正确投递,但没有被正确处理,批量处理)模式、Reject(已经正确投递,但没有被正确处理,单个处理)模式。

③不确认模式,acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

  1. /** 设置ack的两种方式 */
  2. //1. spring amqp :spring.rabbitmq.listener.simple.acknowledge-mode=auto
  3. simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
  4. //2. 客户端:channel.basicConsumer中的参数
  5. //第二个参数设为true为自动应答,false为手动ack
  6. channel.basicConsume("队列名", true, new DefaultConsumer(channel);

AcknowledgeMode MANUAL

确认消息 ACK

已经正确投递,且已经被正确处理。如果对数据完整性要求比较高,这个时候我们可以设置ACK为手动应答模式,既只有当消费端成功消费消息后,才将消息清除。

  1. /** 消费端成功消费消息后,需要给 rabbitmq 确认消息. 只要开启手动确认模式,代码中就必须要有该语句。*/
  2. //deliveryTag:消息在mq中的唯一标识
  3. //multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
  4. public void basicAck(long deliveryTag, boolean multiple);
  5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6. channel.basicAck(deliveryTag, true);

确认消息 NACK

Nack则可以一次性拒绝多个消息。这是RabbitMQ对AMQP规范的一个扩展。
通过RejectRequeuConsumer可以看到当requeue参数设置为true时(让未消费成功的数据重回队列头部),消息发生了重新投递(一般不建议使用该方式)。可以选择将requeue参数设置为false,让mq丢弃该消息。然后再将该消息重新发送到队尾。

  1. //deliveryTag:消息在mq中的唯一标识
  2. //multiple:是否批量(和qos设置类似的参数)
  3. //requeue:是否需要重回队列。或者丢弃或者重回队首再次消费
  4. public void basicNack(long deliveryTag,boolean multiple,boolean requeue);

确认消息 Reject

Reject在拒绝消息时,可以使用requeue标识,告诉RabbitMQ是否需要重新发送给别的消费者。不重新发送,一般这个消息就会被RabbitMQ丢弃。Reject一次只能拒绝一条消息。

  1. //deliveryTag:消息在mq中的唯一标识
  2. //requeue:是否需要重回队列。或者丢弃或者重回队首再次消费
  3. public void basicReject(long deliveryTag, boolean requeue);
  4. channel.basicReject(envelope.getDeliveryTag(),true);

AcknowledgeMode AUTO

AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认

当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)

当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认

其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置

  1. @Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  2. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  3. container.setConnectionFactory(connectionFactory);
  4. container.setQueueNames("consumer_queue"); // 监听的队列
  5. container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根据情况确认消息
  6. container.setMessageListener((MessageListener) (message) -> {
  7. System.out.println("====接收到消息=====");
  8. System.out.println(new String(message.getBody())); //抛出NullPointerException异常则重新入队列
  9. //throw new NullPointerException("消息消费失败");
  10. //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
  11. //throw new AmqpRejectAndDontRequeueException("消息消费失败");
  12. //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
  13. throw new ImmediateAcknowledgeAmqpException("消息消费失败");
  14. });
  15. return container;
  16. }

消费端自定义监听

消费端消费消息可以分为两种方式,一种是pull(拉)消费端通过while循环调用consumer.nextDelivry()来获取数据。该种情况,死循环会消耗CPU资源。一种是push(推)是由mq将消息推送过来。

  1. Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅为止。
  2. 在接受模式期间,RabbitMQ会不断的推送消息给消费者。
  3. 当然推送消息的个数还是受Basic.Qos的限制。
  4. 如果只想从队列获得单条消息而不是持续订阅,建议还是使用Basic.Get进行消费。
  5. 但是不能将Basic.Get放在一个循环里来代替Basic.Consume,这样会严重影响RabbitMQ的性能。
  6. 如果要实现高吞吐量,消费者理应使用Basic.Consume方法。

消费端限流

RabbitMQ提供了一种qos(服务质量保证)功能。即在非自动确认消息的前提下(非ACK),如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息。

  1. // prefetchSize:消息体大小限制;0为不限制
  2. // prefetchCount:RabbitMQ同时给一个消费者推送的消息个数。即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。默认是1.
  3. // global:限流策略的应用级别。consumer[false]、channel[true]。
  4. void BasicQos(unit prefetchSize, unshort prefetchCount, bool global);
  5. channel.basicQos(...);

参考:RabbitMQ (https://www.jianshu.com/p/78847c203b76)