感觉自己还是把握不住mq消息模型,先鸽

一: RabbitMq相关概念

问题:

  1. 复杂接口高峰访问,-秒杀
  2. 一个功能调用多个微服务接口,接口间没有依赖关系
  3. 多个数据源,其中一个数据更新

解决方案: 消息队列

1.1 什么是消息队列(MQ)

队列:先进先出的数据结构, 存放消息的数据结构模型

MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS

RabbitMq - 图1

RabbitMq - 图2
两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

1.2 常见的mq产品

RabbitMq - 图3

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量

1.2.1 RabbitMQ

RabbitMQ是基于AMQP的一款消息管理系统

官网: http://www.rabbitmq.com/

官方教程:http://www.rabbitmq.com/getstarted.html

  1. docker run -d --name rabbitmq --publish 5671:5671 \
  2. --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
  3. rabbitmq:management

注:
4369 — erlang发现口
5672 —client端通信口

15672 — 管理界面ui端口
25672 — server间内部通信口

1.2.2 RabbitMQ 工作模型

image.png
(1)Broker:中介。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。

(2)Exchange:消息交换机。指定消息按照什么规则路由到哪个队列Queue。生产者不能直接和Queue建立连接,而是通过交换机进行消息分发。

(3)Queue:消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。

(4)Binding:绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。

(5)RoutingKey:路由关键字。消息所携带的标志,Exchange根据RoutingKey进行消息投递。

(6)Vhost:虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。

(7)Producer:消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。

(8)Consumer:消息消费者。消息的接收者,一般是独立的程序。

(9)Channel:消息通道,也称信道,是连接消费者和Broker的虚拟连接,如果直接让消费者和Broker建立TCP的连接,会让Broker有性能损耗。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。

1.2.2 路由消息模型

RabbitMq - 图5
P(producer/ publisher):生产者,一个发送消息的用户应用程序。

C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

消息类型

目前共四种类型:direct、tanout、topic、headers ,header匹配AMQP消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差能多,目前几乎用不到了,

  1. Direct

    全匹配式传递。当RoutingKey和消息标志完全一样才会存放到对应的队列
    image.png

  2. Topic

    广播式全部传递。息都会被投递到所有与此Exchange绑定的queue中
    image.png

  3. Fanout

    匹配式传递。 # 表示0个或多个单词, *表示1个

image.png

二.springBoot整合RabbitMq

Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

2.1基础使用

2.1.1引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.2.2配置文件

  1. spring:
  2. rabbitmq:
  3. host: 192.168.1.83
  4. username: guest
  5. password: guest
  6. virtual-host: /
  7. template:
  8. exchange: gmall.item.exchange
  9. publisher-confirms: true
  • virtual-host 指定虚拟主机
  • template:有关AmqpTemplate的配置
    • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
  • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

    2.2.3 相关api

  1. AmqpAdmin 声明交换机(Exchange) , 队列 ,绑定(Binding)等资源对象; ```java @Autowired private AmqpAdmin amqpAdmin
  1. @Test
  2. public void createBinding() {
  3. Binding binding = new Binding("hello-java-queue",
  4. Binding.DestinationType.QUEUE,
  5. "hello-java-exchange",
  6. "hello.java",
  7. null);
  8. amqpAdmin.declareBinding(binding);
  9. log.info("Binding[{}]创建成功:","hello-java-binding");
  10. }
  11. @Test
  12. public void create() {
  13. HashMap<String, Object> arguments = new HashMap<>();
  14. //死信队列
  15. arguments.put("x-dead-letter-exchange", "order-event-exchange")
  16. ;
  17. arguments.put("x-dead-letter-routing-key", "order.release.order");
  18. arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
  19. Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
  20. amqpAdmin.declareQueue(queue);
  21. log.info("Queue[{}]创建成功:","order.delay.queue");
  22. }
  23. @Test
  24. public void createExchange() {
  25. Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
  26. amqpAdmin.declareExchange(directExchange);
  27. log.info("Exchange[{}]创建成功:","hello-java-exchange");
  28. }
  1. 2. **RabbitTemplate **访问(发送和接收消息)的帮助类
  2. ```java
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void sendMessageTest() {
  7. OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
  8. reasonEntity.setId(1L);
  9. reasonEntity.setCreateTime(new Date());
  10. reasonEntity.setName("reason");
  11. reasonEntity.setStatus(1);
  12. reasonEntity.setSort(2);
  13. String msg = "Hello World";
  14. //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
  15. //2、发送的对象类型的消息,可以是一个json
  16. rabbitTemplate.convertAndSend("hello-java-exchange","hello2.java",
  17. reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
  18. log.info("消息发送完成:{}",reasonEntity);
  19. }
  1. RabbitListener 与 RabbitHandler 注解接收消息

    RabbitListener 可以放在类和方法上
    RabbitHandler 只能放在方法上,用来重载不同参数的方法
    image.png

    2.2 数据丢失问题

    消息从生产者到中间件,中间件中交换机到消息队列,中间件消费者过程中数据都有可能丢失,如何避免此种问题呢?

  • 事务机制:

    客户端中与事务机制相关的方法有3个channel.txSelect,channel.txCommit,channel.txRollback

    • channel.txSelect 用于开启事务;
    • channel.txCommit 用于提交事务;
    • channel.txRollback 用于回滚事务。

在通过 channel.txSelect 方法开启事务之后,我们便可以发送消息给 RabbitMQ了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。

  • RabbitMQ消息确认机制

但因为事务机制容易堵塞,不推荐使用
image.png
回调函数

  • publisher confirmCallback 投递到exchange
  • publisher returnCallback 未投递到 queue 退回

确认机制

  • consumer ack 机制

    2.2.1 生产者可靠抵达-confirmCallback

    ```java spring.rabbitmq.publisher-confirm-type: correlated
  1. 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。<br /> CorrelationData:用来表示当前消息唯一性。<br /> 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback <br />• broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback
  2. <a name="bhSjM"></a>
  3. ### 2.2.2 生产者可靠抵达- returnCallbackb
  4. ```java
  5. # 开启发送端消息抵达Queue确认
  6. spring.rabbitmq.publisher-returns=true
  7. # 只要消息抵达Queue,就会异步发送优先回调returnfirm
  8. spring.rabbitmq.template.mandatory=true
  • confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。
  • 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据

    2.2.3 消费者- Ack 消息确认机制

    开启手动确认

    1. # 手动ack消息,不使用默认的消费端确认
    2. spring.rabbitmq.listener.simple.acknowledge-mode=manual

    监听参数内有channel通道参数,通过通道方法确认参数
    确认消息:

    1. // 参数二:是否批量确认
    2. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    拒绝消息:

    1. // 参数二:是否重新入队,false时消息不再重发,如果配置了死信队列则进入死信队列,没有死信就会被丢弃
    2. //重新入队会重新获取消息
    3. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

    不确认消息

    1. // 参数二:是否批量; 参数三:是否重新回到队列,true重新入队
    2. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

    改造消费者监听器代码如下:

    1. @Component
    2. public class Listener {
    3. @RabbitListener(bindings = @QueueBinding(
    4. value = @Queue(value = "spring.test.queue", durable = "true"),
    5. exchange = @Exchange(
    6. value = "spring.test.exchange",
    7. ignoreDeclarationExceptions = "true",
    8. type = ExchangeTypes.TOPIC
    9. ),
    10. key = {"a.*"}))
    11. public void listen(String msg, Channel channel, Message message) throws IOException {
    12. try {
    13. System.out.println("接收到消息:" + msg);
    14. int i = 1 / 0;
    15. // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
    16. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    17. } catch (Exception e) {
    18. if (message.getMessageProperties().getRedelivered()) {
    19. System.out.println("消息重试后依然失败,拒绝再次接收");
    20. // 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定死信队列则消息被丢弃,也可以把失败消息记录到redis或者mysql中),也可以设置为true再重试。
    21. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    22. } else {
    23. System.out.println("消息消费时出现异常,即将再次返回队列处理");
    24. // Nack消息,重新入队(重试一次)
    25. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    26. }
    27. e.printStackTrace();
    28. }
    29. }
    30. }