感觉自己还是把握不住mq消息模型,先鸽
一: RabbitMq相关概念
问题:
- 复杂接口高峰访问,-秒杀
- 一个功能调用多个微服务接口,接口间没有依赖关系
- 多个数据源,其中一个数据更新
1.1 什么是消息队列(MQ)
队列:先进先出的数据结构, 存放消息的数据结构模型
MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
1.2 常见的mq产品
- ActiveMQ:基于JMS
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
- Kafka:分布式消息系统,高吞吐量
1.2.1 RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统
官方教程:http://www.rabbitmq.com/getstarted.html
docker run -d --name rabbitmq --publish 5671:5671 \
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management
注:
4369 — erlang发现口
5672 —client端通信口
15672 — 管理界面ui端口
25672 — server间内部通信口
1.2.2 RabbitMQ 工作模型
(1)Broker:中介。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。
(2)Exchange:消息交换机。指定消息按照什么规则路由到哪个队列Queue。生产者不能直接和Queue建立连接,而是通过交换机进行消息分发。
(3)Queue:消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
(4)Binding:绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
(5)RoutingKey:路由关键字。消息所携带的标志,Exchange根据RoutingKey进行消息投递。
(6)Vhost:虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。
(7)Producer:消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
(8)Consumer:消息消费者。消息的接收者,一般是独立的程序。
(9)Channel:消息通道,也称信道,是连接消费者和Broker的虚拟连接,如果直接让消费者和Broker建立TCP的连接,会让Broker有性能损耗。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。
1.2.2 路由消息模型
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
消息类型
目前共四种类型:direct、tanout、topic、headers ,header匹配AMQP消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差能多,目前几乎用不到了,
Direct
全匹配式传递。当RoutingKey和消息标志完全一样才会存放到对应的队列
Topic
广播式全部传递。息都会被投递到所有与此Exchange绑定的queue中
Fanout
匹配式传递。 # 表示0个或多个单词, *表示1个
二.springBoot整合RabbitMq
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
2.1基础使用
2.1.1引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2.2配置文件
spring:
rabbitmq:
host: 192.168.1.83
username: guest
password: guest
virtual-host: /
template:
exchange: gmall.item.exchange
publisher-confirms: true
- virtual-host 指定虚拟主机
- template:有关
AmqpTemplate
的配置- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
- publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
2.2.3 相关api
- AmqpAdmin 声明交换机(Exchange) , 队列 ,绑定(Binding)等资源对象; ```java @Autowired private AmqpAdmin amqpAdmin
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:","hello-java-binding");
}
@Test
public void create() {
HashMap<String, Object> arguments = new HashMap<>();
//死信队列
arguments.put("x-dead-letter-exchange", "order-event-exchange")
;
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","order.delay.queue");
}
@Test
public void createExchange() {
Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java-exchange");
}
2. **RabbitTemplate **访问(发送和接收消息)的帮助类
```java
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange","hello2.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送完成:{}",reasonEntity);
}
RabbitListener 与 RabbitHandler 注解接收消息
RabbitListener 可以放在类和方法上
RabbitHandler 只能放在方法上,用来重载不同参数的方法
2.2 数据丢失问题
消息从生产者到中间件,中间件中交换机到消息队列,中间件到消费者过程中数据都有可能丢失,如何避免此种问题呢?
事务机制:
客户端中与事务机制相关的方法有3个channel.txSelect,channel.txCommit,channel.txRollback。
- channel.txSelect 用于开启事务;
- channel.txCommit 用于提交事务;
- channel.txRollback 用于回滚事务。
在通过 channel.txSelect 方法开启事务之后,我们便可以发送消息给 RabbitMQ了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。
- RabbitMQ消息确认机制
但因为事务机制容易堵塞,不推荐使用
回调函数
- publisher confirmCallback 投递到exchange
- publisher returnCallback 未投递到 queue 退回
确认机制
- consumer ack 机制
2.2.1 生产者可靠抵达-confirmCallback
```java spring.rabbitmq.publisher-confirm-type: correlated
• 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。<br /> • CorrelationData:用来表示当前消息唯一性。<br /> • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。 <br />• 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback
<a name="bhSjM"></a>
### 2.2.2 生产者可靠抵达- returnCallbackb
```java
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。
这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据
2.2.3 消费者- Ack 消息确认机制
开启手动确认
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
监听参数内有channel通道参数,通过通道方法确认参数
确认消息:// 参数二:是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
拒绝消息:
// 参数二:是否重新入队,false时消息不再重发,如果配置了死信队列则进入死信队列,没有死信就会被丢弃
//重新入队会重新获取消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
不确认消息
// 参数二:是否批量; 参数三:是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
改造消费者监听器代码如下:
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"a.*"}))
public void listen(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("接收到消息:" + msg);
int i = 1 / 0;
// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息重试后依然失败,拒绝再次接收");
// 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定死信队列则消息被丢弃,也可以把失败消息记录到redis或者mysql中),也可以设置为true再重试。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息消费时出现异常,即将再次返回队列处理");
// Nack消息,重新入队(重试一次)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
}