- 1 安装
- 1 MQ引言
- 2 RabbitMQ入门
- 2.1 RabbitMQ简介">2.1 RabbitMQ简介
- ">
- 2.2 RabbitMQWeb管理界面">2.2 RabbitMQWeb管理界面
- 2.3 RabbitMQ支持的消息模型">2.3 RabbitMQ支持的消息模型
- 3 RabbitMQ进阶
- 发送回调
- 发送确认
- 3.3 死信队列
- 3.5 集群模式
1 安装
1 下载rabbitmq镜像
docker pull rabbitmq
2 创建容器启动
docker run rabbitmq
-p 15672:15672 -p 5672:5672 --将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
--name rabbitmq
--h myRabbit
-d
3 启动管理面板
rabbitmq-plugins enable rabbitmq_management
1 MQ引言
1.1 什么是MQ
MQ
(Message Quene) : 翻译为 消息队列
,通过典型的 生产者
和消费者
模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2 MQ的作用
常用的MQ有,ActiveMQ、RabbitMQ,Kafka等;
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
1.2 MQ的使用的注意点
生产端的可靠性投递
消费端的消费幂等处理
消息队列的消息堆积能力,可靠性,高可用,低延迟
1.3 幂等性
唯一ID+指纹码(业务规则)机制,利用数据库主键去重
利用redis的原子性
2 RabbitMQ入门
2.1 RabbitMQ简介
基于AMQP协议,erlang语言开发,官方教程:https://www.rabbitmq.com/#getstarted
2.2 RabbitMQWeb管理界面
安装RabbitMQ,RabbitMQ默认占用的是15672端口,所以在浏览器访问localhost:15672
- Connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
- Channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道
- Exchanges:交换机,用来实现消息的路由
- Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列
- Bingdings:绑定,交换机和队列的绑定关系
2.3 RabbitMQ支持的消息模型
2.3.1 直连
该模型中只有一个生产者和一个消费者,生产者将消息发送到消息队列,生产者对消息队列进行监听,从消息队列取出消息进行消费。
2.3.2 任务
多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
默认RabbitMQ将按顺序将每个消息发送给下一个使用者,无论两个消费者对消息的处理速度是否一致,其能消耗的消息数都是平均分配的,这种分发消息的方式称为循环。
可以通过关闭自动确认并设置每次能消费的消息个数来实现能者多劳。
// 关闭自动确认,需要手动确认
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2:" + new String(body));
// 手动确认
// 参数:确认队列中哪个具体消息、是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
2.3.3 广播
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
2.3.4 路由
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 生产者在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
2.3.5 主题
在路由的基础上结合通配符使用
统配符:
*
:匹配恰好1个单词#
:匹配一个或多个单词
如:
audit.#
:匹配audit.irs.corporate
或者audit.irs
等audit.*
:只能匹配audit.irs
*.audit.#
:中间必须是audit,audit前有一个或多个单词,后有一个单词3 RabbitMQ进阶
3.1 消息高可靠
3.1.1 避免消息丢失
生产者确认发送到MQ服务器(事务,确认机制)
- MQ服务器不丢数据(消息持久化)
- 消费者确认消费掉消息(事务,确认机制)
开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此生产环境一般都采取消息确认模式。
3.1.2 生产者确认
生产者确认机制有很严重的性能问题,如果每秒钟只有几百的消息量,可以使用。
// 开启消息确认机制
channel.confirmSelect();
// 消息是否正常发送到交换机
channel.addConfirmListener((long deliveryTag, boolean multiple) -> {
System.out.println("消息发送成功!");
}, (long deliveryTag, boolean multiple) -> {
// 此种情况无法演示
System.out.println("消息发送失败!");
});
3.1.3 消费者确认
RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。
- 自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手动进行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列,第二个参数false,手动进行ACK channel.basicConsume(QUEUE_NAME, false, consumer); }
3.1.4 消息持久化
要将消息持久化,前提是:队列、Exchange都持久化
(1)交换机持久化
(2)队列持久化
(3)消息持久化3.2 SpringBoot结合RabbitMQ
3.2.1 依赖和并配置
(1)添加POM依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
(2)配置application.yml ```java spring: rabbitmq: host: 192.168.56.101 username: fengge password: fengge virtual-host: /fengge
发送回调
publisher-returns: true
发送确认
publisher-confirm-type: correlated listener:
simple: #手动确认 acknowledge-mode: manual
<a name="VjJ36"></a>
#### 3.2.2 生产者
**(1)生产者发送消息**
```java
// 注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// topic 动态路由 订阅模式
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save","user.save 路由信息");
}
(2)生产者确认
/**
* @Description 消息发送确认
* <p>
* ConfirmCallback 只确认消息是否正确到达 Exchange 中
* ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行
* <p>
* 1. 如果消息没有到exchange,则confirm回调,ack=false
* 2. 如果消息到达exchange,则confirm回调,ack=true
* 3. exchange到queue成功,则不回调return
* 4. exchange到queue失败,则回调return
* @Author qy
*/
@Configuration
@Slf4j
public class ProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
}
/**
* 确认消息是否正确到达 Exchange 中
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功:" + JSON.toJSONString(correlationData));
} else {
log.info("消息发送失败:{} 数据:{}", cause, JSON.toJSONString(correlationData));
}
}
/**
* 消息没有正确到达队列时触发回调,如果正确到达队列不执行
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 反序列化对象输出
System.out.println("消息主体: " + new String(message.getBody()));
System.out.println("应答码: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("消息使用的交换器 exchange : " + exchange);
System.out.println("消息使用的路由键 routing : " + routingKey);
}
}
3.2.3 消费者
(1)消费者消费消息
使用 @RabbitListener 注解标记方法,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
value:这个消费者关联的队列。值是@Queue,代表一个队列
exchange:队列所绑定的交换机,值是@Exchange类型
key:队列和交换机绑定的RoutingKey
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"a.#"}))
public void listen(String msg){
System.out.println("接收到消息:" + msg);
}
}
(2)消费者确认
springboot-rabbit提供了三种消息确认模式(配置yml文件 ):
- NONE:不确认模式(不管程序是否异常只要执行了监听方法,消息即被消费。相当于rabbitmq中的自动确认,这种方式不推荐使用)
- AUTO:自动确认模式(默认,消费者没有异常会自动确认,有异常则不确认,无限重试,导致程序死循环。不要和rabbit中的自动确认混淆)
MANUAL:手动确认模式(需要手动调用channel.basicAck确认,可以捕获异常控制重试次数,甚至可以控制失败消息的处理方式)
@Component public class Listener { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"a.*"})) public void listen(String msg, Channel channel, Message message) throws IOException { try { System.out.println("接收到消息:" + msg); int i = 1 / 0; // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.out.println("消息重试后依然失败,拒绝再次接收"); // 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定死信队列则消息被丢弃,也可以把失败消息记录到redis或者mysql中),也可以设置为true再重试。 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息消费时出现异常,即将再次返回队列处理"); // Nack消息,重新入队(重试一次) channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } e.printStackTrace(); } } }
3.3 死信队列
3.3.1 什么是死信
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度
3.3.2 死信队列的使用
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机(DLX)和路由key
- 为死信交换机配置死信队列(DLQ)
```java
/**
- 声明业务交换机
*
- @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(“spring.test.exchange”, true, false); }
- 声明业务交换机
*
/**
* 声明业务队列
* 并把死信交换机绑定到业务队列
* @return
*/
@Bean
public Queue queue() {
Map
/**
* 业务队列绑定到业务交换机
*
* @return
*/
@Bean public Binding binding() { return new Binding(“spring.test.queue”, Binding.DestinationType.QUEUE, “spring.test.exchange”, “a.b”, null); }
/**
* 声明死信交换机
* @return
*/
@Bean public TopicExchange deadExchange(){ return new TopicExchange(“dead-exchange”, true, false); }
/**
* 声明死信队列
* @return
*/
@Bean public Queue deadQueue(){ return new Queue(“dead-queue”, true, false, false); }
/**
* 把死信队列绑定到死信交换机
* @return
*/
@Bean public Binding deadBinding() { return new Binding(“dead-queue”, Binding.DestinationType.QUEUE, “dead-exchange”, “msg.dead”, null); }
<a name="EropN"></a>
### 3.4 延时队列
延时队列就是用来存放需要在指定时间被处理的元素的队列
<a name="ZZ9ei"></a>
#### 3.4.1 过期时间TTL
消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。我们可以对队列或者消息设置TTL,消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。
<a name="SJuga"></a>
#### 3.4.2 声明延时队列
队列设置TTL:
```java
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
消息设置TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
设置队列的TTL属性,那么一旦消息过期,就会被队列丢弃,
而设置消息TTL属性,即使过期,消息也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
3.4.3 延时队列的使用
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
3.4.4 使用延时队列插件
下载rabbitmq_delayed_message_exchange插件(https://www.rabbitmq.com/community-plugins.html ),然后解压放置到RabbitMQ的插件目录。
接下来,进入RabbitMQ的安装目录下的sbin目录,执行命令(rabbitmq-plugins enable rabbitmq_delayed_message_exchange)让该插件生效,然后重启RabbitMQ。
3.5 集群模式
3.5.1 主备模式
一般在并发和数据量不高的情况下,这种模型非常好用且简单。<br /> 主节点可以提供读写服务,备份节点不提供读写服务。只有当主节点产生故障或出现宕机,会自动切换到备用节点,备用节点继续提供读写服务。当原来的主节点恢复后,会自动加入变成备用节点。<br />主从模式下,主节点会提供读写服务,从节点只提供读的服务。<br />主备模式下,主节点会提供读写服务,主节点正常情况下备份节点不提供任何读写。
3.5.2 远程模式
3.5.3 多活模式
多活模式要依赖rabbitmq的federation插件,能够实现持续可靠的AMQP通信,并且,配置和Shovel比较起来会更加的简单