MQ基本概念
MQ的劣势
MQ的使用场景
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
- 容许短暂的不一致性。
- 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
常见的MQ产品
RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议
的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中
间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。
2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0发布。RabbitMQ采用Erlang语言开发。
Erlang 语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ中的相关概念:
- Broker:接收和分发消息的应用,RabbitMQServer就是Message Broker
- Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQserver提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
- Connection:publisher/consumer和broker之间的TCP连接
- Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCPConnection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQPmethod包含了channelid帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCPconnection的开销。
- Exchange: message 到达broker 的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue 中。常用的类型: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待consumer取走
- Binding: exchange 和 queue 2l的接, binding 中以包含 routing key, Binding 信息被保存
到exchange中的查询表中,用于message的分发依据。
RabbitMQ提供了6种工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics 主题模式、RPC逃程调用模式(远程调用,不太算MQ;暂不作介绍)。
JMS
- JMS即 Java 消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
- JMS是JavaEE规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供JMS的实现包,但是开源社区有
小结
- RabbitMQ 是基于AMQP 协议使用Erlang 语言开发的一款消息队列产品。
- RabbitMQ提供了6种工作模式。
- AMQP 是协议,类比HTTP。
- JMS 是API规范接口,类比 JDBC。
RabbitMQ工作模式
生产者
消费者
Work queues 工作队列模式
模式说明
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
Publish/Subscribe 发布订阅模式
模式说明
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout: 广播,将消息交给所有绑定到交换机的队列
- Direct: 定向,把消息交给符合指定routingkey的队列
- Topic: 通配符,把消息交给符合routingpattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Fanout 广播模式
-
Routing 路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的
Routingkey 与消息的Routing key 完全一致,才会接收到消息
Topic 通配符模式
必须是 单词.单词 的形式
表示 多个单词
SpringBoot提供了快速整合RabbitMQ的方式
- 基本信息再yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
- 生产端直接注入RabbitTemplate完成消息发送
- 消费端直接使用@RabbitListener完成消息接收
pom 坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: ubuntu.wsl
username: admin
password: 123456
virtual-host: /
port: 5672
配置类config
```java package com.dove.config;
import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope;
/**
description
*- @author Guo S.Y.
- @version V1.0
@since 2022/5/28-14:25 */ @SpringBootConfiguration public class RabbitMQConfig{
@Value(“${spring.rabbitmq.host}”) private String host;
@Value(“${spring.rabbitmq.port}”) private int port;
@Value(“${spring.rabbitmq.username}”) private String username;
@Value(“${spring.rabbitmq.password}”) private String password;
@Value(“${spring.rabbitmq.virtual-host}”) private String virtualhost;
@Bean public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
return connectionFactory;
}
@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
//1.交换机
@Bean("simpleExchange")
public Exchange simpleExchange(){
return ExchangeBuilder.fanoutExchange("simple.exchange").build();
}
//2.Queue 队列
@Bean("simpleQueue")
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
//3. 队列和交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("simpleQueue") Queue queue, @Qualifier("simpleExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("simple").noargs();
}
}
<a name="tyEeW"></a>
## Producer生产者
```java
package com.dove;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* <p> description </p>
*
* @author Guo S.Y.
* @version V1.0
* @since 2022/5/28-15:25
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
private final Logger myLog = LoggerFactory.getLogger(ProducerTest.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend("simple.exchange"
, "simple"
, "test");
}
}
Consumer消费者
package com.dove.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* <p> description </p>
*
* @author Guo S.Y.
* @version V1.0
* @since 2022/5/29-10:54
*/
@Component
public class MQListener {
private final Logger myLog = LoggerFactory.getLogger(MQListener.class);
@RabbitListener(queues = "simple.queue")
public void simpleQueue(String msg) {
myLog.info("收到信息:{}", msg);
}
}
RabbitMQ高级特性
消息的可靠投递
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提
供了两种方式用来控制消息的投递可靠性模式。
- confirm确认模式
- return退回模式
rabbitmg整个消息投递的路径为:
producer—->rabbitmq broker—->exchange—->queue—->consumer
- 消息从producer 到exchange则返回一个confirmCallback
- 消息从exchange—gueue投递失败则会返回一个returnCallback。
我们将利用这两个callback控制消息的可靠性投递
TTL
- 设置队列过期时间使用参数 x-message-ttl, 单位 ms 毫秒,会对整个队列消息统一过期
- 设置消息过期时间使用参数 expiration , 单位 ms 毫秒,当该消息在队列头部时(消费时),会单独判断这以消息是否过期
- 如果两者都进行了额设置,以时间短的为准
死信队列
死信队列,英文缩写:DLX。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以
被重新发送到另一个交换机,这个交换机就是DLX。
死信:消息未被消费,且过期了
消息成为死信的三种情况
- 队列消息长度到达限制;
- 消费者拒接消费消息, basic Nack/basic Reject, 并且不把消息重新放入原目标队列, requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
死信队列:未被消费的过期信息的再被消费
小结:
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队
列
消息成为死信的三种情况:
下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
实现方式:
- 定时器
- 延迟队列
但是,在RabbitMQ中并未提供延迟队列功能。
但是,可以使用 TTL+死信队列组合组合实现延迟队列的效果。
小结:
- 延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
- RabbitMQ没有提供延迟队列功能,但是可以使用:TTL+DLX来实现延迟队列效果。
日志与监控
RabbitMQ日志
Rabbit MQ默认日志存放路径:
:/var/log/rabbit mq/rabbit@xxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、
RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。web管控台
rabbitmqctl 管理和监控
查看队列 $ rabbitmqctl listqueues
查看环境变量 $ rabbitmqctl environment
查看exchanges $ rabbitmqctl list_exchanges
查看未被确认的队列 $ rabbitmqctl list_queues name messages_unacknowledged
查看用户 $ rabbitmqctl list_users
查看单个队列的内存使用 $ rabbitmqctl list queues name memory
查看连接 $ rabbitmqctl list_connections
查看准备就绪的队列 $ rabbitmqctl list_queues name messages_ready
查看消费者信息 $ rabbitmqctl list_consumers消息追踪-firehose
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能
是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也
有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者
又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时
候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
注意:打开trace会影响消息写入功能, 适当打开后请关闭。
rabbit mq ctl trace on:开启Firehose命令
rabbit mq ctl trace off:关闭Firehose命令
RabbitMQ应用问题
- 消息可靠性保障
消息补偿机制
- 消息幕等性保障
乐观锁解决方案
消息可靠性保障—消息补偿机制
消息幂等性保障-乐观锁机制
幕等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任
意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
集群搭建
摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理
一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例
部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上
一般都会考虑使用RabbitMQ的集群方案。
集群方案的原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集
群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像
ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
单机多实例部署
由于某些因素的限制,有时候你不得不在一台机器上去搭建一个rabbitmg集群,这个有点类似zookeeper的单机
版。真实生成环境还是要配成多机集群的。有关怎么配置多机集群的可以参考其他的资料,这里主要论述如何在单
机中配置多个rabbitmg实例。
主要参考官方文档: https://www.rabbitmq.com/clustering.html
首先确保RabbitMQ运待没有问题