1、基本概念
什么是MQ
消息队列,本质是一个队列,FIFO先进先出,是一种跨进程的通信机制,用于上下游传递消息。实现“逻辑解耦+物理解耦”的消息通信服务,使用了MQ之后,消息发送上游只需要依赖MQ,不需要依赖其他服务
为什么要使用MQ
流量消峰
峰值的流量做一个缓缓冲的作用,进行排队,例如一个订单系统一秒能处理1w次,而如果超过该值,可能回崩溃,但是在一些秒杀场景下,并发数肯定不止该值,因此将请求打击在MQ中,在由MQ将请求发送至服务器,慢慢处理,可以导致服务器不发生崩溃,但是由于排队的存在,用户的体验会存在一定的下降【当然,肯定比直接崩溃好】
应用解耦
比如一个订单系统由多个子系统——支付系统、库存系统、物流系统,任何一个子系统的故障,都会导致订单系统整体故障,当使用了消息队列解耦后,将系统 分开,哪个系统故障了,就去修理哪个,修理完后再继续完成后续服务,从而不用使得整个系统发生故障啊
异步处理
有些任务是异步的,比如A调用B,而B结局任务需要很长时间,但是A需要知道B什么时候执行完,一般由两种方式,A过一段时间去调用B的查询API查询,或者A提供一个回调函数,B执行完后调用API通知A服务。这是不太好的,可以使用MQ,A调用B服务后,只需要让B完成服务的时候发生一条消息给MQ,MQ会将该消息转发为A,通知A做新的操作,这样A不会频繁发送查询API,也不需要提供回调函数,只需要让B服务结束后发一条消息到MQ中
MQ分类
RabbitMQ介绍
是一个消息中间件,它接受并转发消息。可以当作是一个快递站点的功能,快递站的功能是接受快递,而MQ的功能是接受、存储和转发消息数据
四大核心概念
生产者
发布消息的一方
交换机
交换机与队列是一对多的关系,存在一个绑定关系
队列
每一个队列对应一个消费者
消费者
消费者即使用、接受消息的一方
六大核心模式
名词介绍
Broker:接受和分发消息的应用,RabbitMQServer 就是Message Broker
Connection:publisher/consumer和broker之间的TCP连接
Channel:如果每一次访问MQ都建立一个Connection,那么消费是巨大的,效率也低。Channel是在connection内部简历的逻辑连接,如果应用程序支持多线程,通常每一个thread创建单独的channel进行通讯,AMQP method 包含了channel id帮助客户端和message broker 识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP连接的开销,每次发消息只占用一个Channel,一个Channel可以发送多次消息。
Exchange:message到达broker的第一站,根据分发规则,匹配查询表中给的routing key,分发消息到queue中去。常用的类型由:direct(point to point),topic(publish -subscribe)和fanout(multicast)
安装
官方安装指南
好像按照官方的东西复制到一个配置文件上,再依次执行指令好像就可以了
一些常用指令
/sbin/service rabbitmq-server stop # 关闭服务
rabbitmq-plugins enable rabbitmq_management # 安装可视化控件,但是得先关闭服务
/sbin/service rabbitmq-server status # 查看服务的状态
systemctl stop firewalld # 关闭防火墙
http://192.168.200.132:15672 # 进入后台管理页面
rabbitmqctl add_user admin 123 # 添加一个mq账户,管理后台就能之际登录了
rabbitmqctl set_user_tags admin administrator # 设置超级管理员
rabbitmqctl list_users # 查看有哪些用户
2、实操
第一个小demo
导入依赖
<!--指定 jdk 编译版本--> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build> <dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
编写代码
package com.atguigu.rabbitmq.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
// 用户名和密码
factory.setHost("192.168.200.132");
factory.setUsername("admin");
factory.setPassword("123");
// 通过链接设置信道啥的
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 生产一个队列
* 1、队列名称
* 2、队列里的消息是否需要持久化,默认消息存储再内存中
* 3、该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费 false只能一个消费者消费
* 4、是否自动删除, 最后一个消费者断开链接以后,是否自动删除,true 是 false部删除
* 5、其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello,world" ; // 消息体
/**
* 发布消息
* 1、发送到那个交换机 ,这里先为null
* 2、路由的key值是哪个,本次是队列的名称
* 3、其他参数信息
* 4、发送消息的消息体 以字节的形式发送
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 编辑好就可以发送了
System.out.println("消息发送完毕");
}
}
SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
消息发送完毕
此后再web面板也可以看到变动
消费者代码处理
package com.atguigu.rabbitmq.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 队列名
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 用户名和密码和IP
factory.setHost("192.168.200.132");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明两个接口 一个接受消息接口,一个取消消息时接口 看下面注释
DeliverCallback deliverCallback = (consumerTag,message)->{
// 随便写一点逻辑先
System.out.println(new String(message.getBody())); // 把消息体展示以下
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消费消息被i终端");
};
/**
* 消费者消费消息
* 1、消费那个队列
* 2、消费之后是否需要自动答应 true为开启 false关闭
* 3、消费者为成功消费时的函数回调
* 4、消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作队列模式
避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装成消息并将其发送到队列。而在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
什么情况呢?就比如下图中显示的
即队列给工作线程轮询发送消息,一个消息只被处理一次,不被处理多次,三个工作线程是竞争的关系
下面这个案例中,将启动两个工作线程,一个消息发送线程,另一个是消息接受线程。
下面编写一个接收者工作线程,其内容如下, 并且可以通过设置其解释器而让其多线程运行,只需要勾选后再点一次开启Main方法即可
多个不同的work01进程被运行,可以认为是多线程状态
生产者Task01.java
package com.atguigu.rabbitmq.two;
import com.atguigu.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 任务发布者,即生产者
*/
public class Task01 {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
// 准备发送大佬消息
Channel channel = RabbitMqUtils.getChannel();
// 队列的声明
/**
* 生产一个队列
* 1、队列名称
* 2、队列里的消息是否需要持久化,默认消息存储再内存中
* 3、该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费 false只能一个消费者消费
* 4、是否自动删除, 最后一个消费者断开链接以后,是否自动删除,true 是 false部删除
* 5、其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message= scanner.next();
/**
* 1、表示发送到哪个交换机
* 2、路由的key值时哪个 本次是队列的名称
* 3、其他参数信息
* 4、发送消息的消息体,需要用字节流传输
*/
System.out.println("发送的消息是:"+message);
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
}
}
消费者Worker01
package com.atguigu.rabbitmq.two;
import com.atguigu.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
* 相当于消费者,由多个消费者
*/
public class Worker01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 直接进行消息的发送和接受,channel则通过工具类获取了
/**
* 消费者消费消息
* 1、消费那个队列
* 2、消费之后是否需要自动答应 true为开启 false关闭
* 3、消费者为成功消费时的函数回调
* 4、消费者取消消费的回调
*/
DeliverCallback deliverCallback =(consumerTag, message)->{
System.out.println("接收到的消息:"+new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消息被取消掉咯,取消");
};
System.out.println("c1等待接受消息....");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
消息应答
当消息被消费者接收到后,消费者需要去处理消息,但是万一这个处理的时间比较久,在运行的时候消息挂了会怎么样呢?这需要引入一个消息应答的机制,消息应答就是:消费者在接受到消息并且处理了消息之后,告诉MQ它已经处理了,rabbitMq就可以将该消息删除了。
自动应答
消息发送后就被认为接受成功,这种模式需要在高吞吐量和数据安全方面做一个权衡,因为这种模式如果消息在接受到之前,消费者那边出现链接或者关闭channel,那么消息就丢失了,当然这一方面这种消费者模式那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样可能使得消费者那边由于接受太多来不及处理的消息,导致消息的积压,最终使得内存耗尽,最终这些消费者线程被i操作系统杀死,所以这种模式使用在消费者可以高校并以某种速率处理这些消息的情况下,也就是说,自动应答,可能并不是很靠谱,需要消费者那边有保障
消息应答的方法
Channel.basicAck #用于肯定确认
Channel.basicNack # 用于否定确认
Channel.basicReject # 用于否定确认,比basicNack少了一个参数
还涉及到批量应答于不批量应答,
批量应答:该信道的所有消息都按照该策略执行,将后续的消息也应答了
不批应答:队列头消息按照该策略执行,只应答一个消息
消息队列重新入队
该策略可以保证消息的不丢失
该图描述了对于未确认的消息会如何处理,
图1,c1消息被队列正常发送,发送给信道1处理
图2,消息1发送后并没有返回ack确认信息
图3,消息队列意识到了消息1没有返回ack,因此会将消息1重新入队,同时c2发送了确认ack,那么消息队列中给的消息2就可以被处理掉。
图4,消息队列重新发送消息1,将其发送给信道2
持久化
队列持久化
持久化可以保证在服务宕机的时候保证数据可以被持久化到硬盘
对于持久化声明,只需要在声明的时候将其声明未true为即可持久化,但是如果该该队列已经开启,那么则必须得先删除该队列,再次开启后才生效,即可使得队列持久化
channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 第二个参数为durable 即可否持久化
消息持久化
上述的是队列信道的持久化,而并不是直接针对消息,针对消息的持久化,需要在消费者端设置,并且消息的持久化并不能完全保证不丢失消息。尽管它告诉RabbitMq将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还缓存的一个间隔点。此时并没有真正地写入磁盘。保证持久性并不强,但是对于大部分地简单队列还是足够应付地。在生产者发布消息的时候修改发送信道,即可使得消息持久化
channel.basicPublish(“”,TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(“UTF-8”));
不公平分发
RabbitMq中,默认使用轮询分发,即公平分发,给每个消费者都分配数量相近的任务,而对于系统吞吐量来说,这样并不一定是好的,因为有可能一个管道被分到时间长的任务,而另一个管道分配到时间端的任务,那么就可能导致再后来的分配任务中,完成快的管道完成了任务,然后时间长的管道一直在执行任务,导致快的管道资源被浪费了,没有被利用到,不公平分发就是为了解决这种窘境。
对于不公平分发的设置,需要在消费者处修改。
int perfetchCount = 1;
channel.basicQos(perfetchCount); //设置为不公平分发
在多个消费者中都需要声明上述代码
预取值
预取值也是一种任务分发的方式
但是区别于公平分发和不公平分发,预取值会决定发给哪些管道多少任务
用处:可以不公平地分发消息,对于那些性能好的机器,可以提升其prefetch值,对于性能差地机器,可以适当降低,达到总和利用率的效果
发布确认原理
发布确认的作用,是保证消息的不丢失。
在消息持久化的过程中,需要保证消息队列达到以下三个要求,才能保证消息的持久化得到确认
1、要求设置队列开启持久化
2、要求设置队列接受消息持久化
3、发布确认
开启发布确认的方法
发布确认默认是灭偶开启的,如果开启需要调用方法confirmSelect,每当想要使用发布确认,都需要在channel上调用该方法
channel.confirmSelect();
单个确认发布
简单的确认方式,是一种同步确认发布的方式,也就是发布一个消息之后只有确认发布,后续的消息才能继续发布。
最大的缺点:发布速度特别慢,这种方式仅仅能够提供每秒吞吐量为百级别,当然,这对一些程序来说已经够了
public static void publishMessageIndividually() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
// 记录开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
// 同步的消息等待确认
boolean flag = channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条数据 单独确认花费时间"+(end-begin)+"ms");
}
批量确认发布
一次性发出多个,但是确认是批量的,也就是说,一批内可能存在某条消息发送失败,但是只能知道是该批的,无法知道是该批中具体的哪一个。
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
// 记录开始时间
long begin = System.currentTimeMillis();
// 批量确认消息的大小
int batchSize = 100;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
// 同步的消息等待确认
// 拿到100条消息的时候,批量确认依次
if (i%batchSize == 0){
channel.waitForConfirms(); // 每100条确认一次
}
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条数据 批量确认花费时间"+(end-begin)+"ms");
}
异步逻辑确认发布
利用回调函数,对于发出的消息,会“稍后告诉你”,代码实现的逻辑会复杂的许多,但是在效率和利用率和持久性上都是具有优良的表现
代码如下
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
// 记录开始时间
long begin = System.currentTimeMillis();
// 核心代码
// 准备监听器,监听哪些消息确认,哪些没确认
/**
* 1、消息的标记
* 2、是否为批量确认
*/
ConfirmCallback ackCallback = (deliveryTag,multiple)->{
// 确认成功时回调函数
};
ConfirmCallback noAckCallback = (deliveryTag,multiple)->{
// 确认失败时回调函数
System.out.println("未确认消息:"+deliveryTag);
};
/**
* 1、监听哪些消息成功了
* 2、监听哪些消息失败了
*/
channel.addConfirmListener(ackCallback,noAckCallback);
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息"+i;
channel.basicPublish("",queueName,false,null,message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条数据 异步批量确认花费时间"+(end-begin)+"ms");
}
从结果可以看到,是消息发完了,而消息处理的监听器还在持续工作,说明他们之间并没有明确的先后关系。而对于这些消息,我们并不能在发送阶段保证其确认,对于那些未被成功确认的消息,我们需要去做一些操作
思路:在消息发生时将所有消息添加到一个map结构的容器中,在消息成功接收时将消息从这个map中移除,剩下的就是未被成功接收到的消息
5、交换机
在之前的内容章节中,一致都是以某一个队列为核心进行消息的发送和交付。假设工作队列背后,每个任务都能切好交付给一个消费者,在这一部分中,我们将做一些完全不同的事情,我们将消息传达给多个消费者。这种模式称为“发布/订阅”
为了说明这种模式,我们将构建一个生产者一对多消费者的模式,即一个消息能被多个消费者消费,这就好比微信公众号平台发布的推文,被订阅者都能看到,就是这种关系。交换机则是该消费模式的关键对象。
核心思想是:生产者生产消息不会直接发送到队列。实际上,通常生产者都不知道自己会发送消息到哪些队列中。
相反,生产者只能将消息发送到交换机,交换机的内容非常简单,一方面他将接受来自生产者的消息,另一方面,将他们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特点的队列还是说丢弃他们。这就由交换机的类型来决定了
交换机类型
- 直接(direct)
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
扇出(fanout)
Fanout类型十分简单,它将所有接收到的消息广播到它知道的所有队列中。系统中默认有些exchange类型
两个消息接收者Receiver01和Receiver02除类名不一样,代码完全一致,如下 ```java package com.atguigu.rabbitmq.five;
import com.atguigu.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs01 {
// 交换机的名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 交换机类型声明
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 队列声明
/**
* 生产一个临时队列,队列的名称时随机的
* 当消费者断开与队列链接的时候,队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机个队列
*/
channel.queueBind(queueName,EXCHANGE_NAME,""); // fanout类型的交换机不需要routingKey
System.out.println("等待接受消息,把接受消息打印在频幕上");
DeliverCallback deliverCallback = (arg1,arg2)->{
System.out.println(new String(arg2.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,arg->{
});
}
}
一个发送者EmitLog代码如下
```java
package com.atguigu.rabbitmq.five;
import com.atguigu.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
import java.util.concurrent.Executors;
/**
* 消息的发布,发给交换机
*/
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 交换机类型声明
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("消息发送:"+message);
}
}
}
直接交换机
直接交换机不同于fanout交换机,直接交换机可以指定那些队列接受而非广播给所有队列,根据routingKey来区别需要发送的队列和不发送消息的队列。就是以一个routingKey唯一绑定一个queue,从而当发送的时候,指定routingkey时,消息就发送到routingKey对应的那个队列之中。代码见
topic主题交换机
在前两种交换机中,direct能有选择性地使得交换机发送内容给队列,但仍存在局限性,比如接受的类型有info.base和info.advantage,但是某个队列指向要info.base的消息,那么这个时候direct就办不到了。这个之后只能使用topic类型
发送到类型死topic交换机的消息的routing_key不能随意编写,必须满足一个条件按,它必须是一个担此列表,以点号分隔开。这些单词可以是任意单词,比如说”stack.usd.nyse”,”myse.vmw”,”quick.orange,rabbit”这种类型的。当然这个单词列表最多不能超过255个租户额
在这个规则列表中,其中有两个替换符是大家需要注意的
星号可以额代替一个单词
#井号可以代替零个或者多个单词
特别的,当一个队列绑定的是#,那么这个队列将接受所有数据【因为其匹配所有规则】。如果该队列绑定没有#和出现,那么该队列绑定类型就是direct了,一个规则匹配一个队列,一个队列可以被多个规则所匹配,有点像direct交换机,几个队列可以被多个routingKey匹配,只不过这里是规则而不是直接匹配
通过后两个规则可以匹配Q2,通过第一个规则可以匹配Q1
代码,接受者1
package com.atguigu.rabbitmq.Seven;
import com.atguigu.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.Executors;
/**
* 消费者从C1
*/
public class ReceiveLogsTopic1 {
// 交换机的名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
// 绑定规则
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接受消息");
DeliverCallback deliverCallback = (arg1, arg2)->{
System.out.println(new String(arg2.getBody()));
System.out.println("接受队列"+queueName+"绑定的键:"+arg2.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,arg->{
});
}
}
接收者2
package com.atguigu.rabbitmq.Seven;
import com.atguigu.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 消费者从C1
*/
public class ReceiveLogsTopic2 {
// 交换机的名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
// 绑定规则
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接受消息");
DeliverCallback deliverCallback = (arg1, arg2)->{
System.out.println(new String(arg2.getBody()));
System.out.println("接受队列"+queueName+"绑定的键:"+arg2.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,arg->{
});
}
}
发送者代码
package com.atguigu.rabbitmq.Seven;
import com.atguigu.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 创建一个map来装消息和结果
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","Q1Q2");
map.put("lazy.orange.elephant","Q1Q2");
map.put("quick.orange.fox","Q1");
for (Map.Entry<String, String> buildingKeyEntry : map.entrySet()) {
String routintKey = buildingKeyEntry.getKey();
String message = buildingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routintKey,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息"+message);
}
}
}
无名交换机
第一个参数时交换机的名称,空字符串表示默认或者无名交换机,消息能路由发送到队列中其实由routingKey(bindingkey)绑定key指定的,如果它存在的话。
我们之前发布消息时,对交换机的设置为空字符串,其实就是一个无名交换机
channel.basicPublish(“”,TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(“UTF-8”));
临时队列
临时队列指那些不带有持久化功能的队列,即一但服务关闭,下次再启动时,该些队列就不存在了
Features那一列为D,即代表为持久化队列,可以看到没有非持久化队列,因为被我重启服务后非持久化队列都没了。。。。
6、死信队列
概念上来说,死信就是无法被消费的消息,一般有producer生产消息到broker或直接到queue里,consumer从queue去除消息进行消费,但是某些时候由于特殊原因导致queue队列中的某些消息无法被消费,这样的消息如果没有进行后续的处理,就变成了死信,有了死信就自然会有死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发送异常时,将消息投入死信队列中,还有比如说,用户在商城下单成功并点击支付后在指定事件为支付时消息自动失效。
死信的来源
1、消息TTL过期
2、队列达到最大长度(队列满了,无法在田家庵到数据mq中)
3、消息被拒绝(basic.reject或basic.nack)并且requeue = true
如果消息在进入队列时发生以上三种情况之一的,就会进入如意死信交换机,死信交换机将消息发送给死信队列
延迟队列
对于c2来说,如果是由于ttl被过期了而进入死信队列中,假设ttl为10s,那其实便是延迟10s后到达c2,对于c1和c2我们采用不同的逻辑,例如可以实现以下场景
1、订单在十分种之内未支付则自动取消
2、新建的店铺,如果在十天之内都没有上传过商品,则自动发送消息提醒
3、用户注册成功后,如果三天内没有登录则进行短信提醒。
4、用户发起退款,如果三天内没有得到处理则通知相关人员
5、预定会议后,需要在预定时间点前10分钟通知各个与会人员参加会议
这些场景都有一个特点,那边使用偶两套处理逻辑,一套是正常的处理逻辑,而另一套是超时处理逻辑。需要在某个时间点发发生前或者发生后进行数据的检查
下面集合Spring Boot进行一套代码的演示
其从声明到绑定,通过该代码的配置关系中得以体现
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
// 普通交换机名称
public static final String X_EXCHANGE = "X";
// 死信交换机名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 普通队列名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 死信队列名称
public static final String DEAD_LETTER_QUEUE = "QD";
// 一个新的普通队列 QC
public static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key","YD");
// arguments.put("x-message-ttl",xx) 由于ttl的时间可以达到随时变动的效果,因此不固定写死
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange
){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
// 声明x交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key","YD");
// 设置ttl 10s
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key","YD");
// 设置ttl 40s
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 死信队列
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
// 绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange
){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange
){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueBBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange
){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者-一个controller接受请求然后发送消息
package com.atguigu.rabbitmq.springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j(topic = "c.SendMsgController")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
// 开始发消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable(name = "message")String message){
log.info("当前时间:{},发送一条信息给两个ttl队列:{}",new Date().toString(),message);
// 使用SpringBoot的template进行发消息
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列"+message);
}
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable(name = "message")String message,@PathVariable(name = "ttlTime")String ttlTime)
{
log.info("当前时间:{},发送一条时长{}毫秒的信息给ttl队列QC:{}",new Date().toString(),ttlTime,message);
// 在msg中定制过期发送时长
rabbitTemplate.convertAndSend("X","XC",message,message1 ->{
message1.getMessageProperties().setExpiration(ttlTime);
return message1;
} );
}
}
发送两个请求
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
结果如下
但是对于这些消息,他们遵循了一个先来后到的原则,即如果一条消息延迟20s,然后第二条消息接着发送,延迟2s,那么总会在20s之后才收到两条消息,而不是说在第二条消息发出后2s,收到第二条消息,然后第一条消息发出20s后收到第一条消息,跟我们所想要的效果大致不一样。到第一个消息结束后才指向第二个,这是不好的。因为队列是先进先出的【好像没毛病】
如果想要是实现我们想要的效果,那么就要使用rabbitMq的插件。
由延迟交换机来保证消息的延迟发送。即声明交换机的时候声明交换机为延迟类型
两端代码如下
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import com.sun.corba.se.spi.ior.ObjectKey;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayQueueConfig {
// 交换机
public static final String DELAYED_QUEUE_NAME = "delay.queue";
// 队列
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// routingKey
public static final String DELAYED_ROUTING_KYE = "delayed.routingKey";
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean // 自定义交换机
public CustomExchange delayedExchange(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
/**
* 参数
* 1、接交换机名称
* 2、交换机的类型
* 3、是否需要持久阿虎 true /false
* 4、是否需要自动删除 true/false
* 5、其他的参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
}
// 交换机和队列进行绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue")Queue queue,@Qualifier("delayedExchange")CustomExchange customExchange){
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KYE).noargs();// 无参?
}
}
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
import com.atguigu.rabbitmq.springbootrabbitmq.config.DelayQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 基于插件的延迟消息
*/
@Slf4j
@Component
public class DelayQueueConsumer {
// 监听该队列的消息
@RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message){
String msg =new String(message.getBody()) ;
log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
}
}
controller中
// 开始发送消息 基于插件的 消息 及 延迟 的时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime ){
log.info("当前时间:{},发送一条时长{}毫秒的信息给ttl队列:{}",new Date().toString(),delayTime,message);
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME,DelayQueueConfig.DELAYED_ROUTING_KYE,message, message1 ->{
message1.getMessageProperties().setDelay(delayTime); // 与上面的发送格式不太一样
return message1;
} );
}
8、发布高级确认
在生产情况中,可能有一些原因导致mq被重启,在rabbitmq重启期间生产者消息投递失败,导致消息丢失,需要收到您处理和恢复。于是如何才能使RabbitMq消息可靠投递呢?特别实在极端的情况下,当RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢】
当中间的交换机、队列有其中一个不存在时,会导致消息的丢失
采用新的架构,一步一步确认,如果实在交换机阶段丢失,该如何处理,如果是在队列阶段丢失,该如何处理
8.2 回退消息
在仅开启了生产者机制的情况下,交换机会接受到消息后,会直接给消息生产者发送确认消息,如果发现消息不可路由,那么消息会直接被丢弃,此时生产者是不知道消息被丢弃这个时间的,那么如何让MQ知道该进行那些操作呢?详见下面这几端代码
Confirm配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置类 目的是为了发布确认(高级)
*/
@Configuration
public class ConfirmConfig {
// 交换机
public static final String EXCHANGE_NAME = "confirm_exchange";
// 队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
// routingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
// 声明
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmBinding(@Qualifier("confirmExchange")DirectExchange directExchange,@Qualifier("confirmQueue")Queue queue)
{
return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
}
}
MyCallBack实现确认接口和回退接口
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注入
*/
@PostConstruct
public void init(){
// 凡是实现地接口 都要在这里注入
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 审核时候会调用
* 1、发消息 交换机收到了 回调
* 1.2 correlationData 保存回调消息的ID及相关
* 1.2 交换机收到消息 true
* 1.3 cause null
* 2、发消息 交换机接受失败了 回调
* 2.2 correlationData保存回调消息的ID及夏哥哥信息
* 2.2 交换机收到消息,ack = true
* 2.3 cause失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String casue) {
String id = correlationData == null?"":correlationData.getId();
if (ack){
log.info("交换机已经收到id为:{}的消息",id);
}else{
log.info("交换机还未收到id为:{}的消息,由于原因:{}",id,casue);
}
}
// 可以在消息传递过程中不可达目的地时将消息返回给生产者
// 只有不可达目的地地时候 才进行回退
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 发生下下哦i回退
log.error("消息{},被交换机{}退回,原因:{},路由:{}",new String(returnedMessage.getMessage().getBody()),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
}
}
消费者,正常接收到消费时地处理
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
System.out.println(new String(message.getBody()));
log.info("接收到队列confirm.queue的消息",new String(message.getBody()));
}
}
消息请求接口
package com.atguigu.rabbitmq.springbootrabbitmq.controller;
import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
// 发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message ){
// 在回调接口处会用到这个东西,这个东西对应回调接口中的那个参数
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送的消息是:{}",message);
}
}
8.3消息备份
消息虽然可以有确认接口和回退接口,但是如果有一个备份接口,那么就可以让多一种处理方式
老样子,依然是添加备份接口
处理级别:备份有优先级要高一些。如果存在备份方案,那么就不会走消息确认和回退方案
// 备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
// 备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
// 警告队列
public static final String WARNING_QUEUE_NAME ="warning_queue" ;
// 备份交换机、两个队列,以及他们之间的绑定
@Bean("backupExchange")
public FanoutExchange bakcupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 同时确认交换机也要发一份确认报文到备份交换机中
@Bean
public Binding backupBinding1(@Qualifier("backupExchange")FanoutExchange fanoutExchange,@Qualifier("backupQueue")Queue queue)
{
return BindingBuilder.bind(queue).to(fanoutExchange);
} @Bean
public Binding backupBinding2(@Qualifier("backupExchange")FanoutExchange fanoutExchange,@Qualifier("warningQueue")Queue queue)
{
return BindingBuilder.bind(queue).to(fanoutExchange);
}
并且确认交换机这里需要有改变,因为不管有没有确认成功,都需要将消息备份到备份队列中【启一个日志的作用?】
// 声明
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
// 这里是添加了转发功能 转发到备份交换机上
return ExchangeBuilder.directExchange(CONFIRM_QUEUE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
// return new DirectExchange(EXCHANGE_NAME);
}
9、RabbitMq其他知识点
9.1 幂等性
用户对于统一操作的一起或多次,如果在服务器中起到的效果一样的,那么称为幂等性,如果不一样,那么称为非幂等性。
如点击多次付款,肯定不能接受。
消息重复读
消费者在消费mq时,mq已经把消息发送给消费者,消费者在给MQ返回ack时网络中断,古MQ未收到确认消息,该条消息会发生给其他消费者,或者在网络在网络重连后再次发生个消费者,但实际上该消费者已经成功消费了该消息了,造成消费者消费了重复的消息。
解决办法
一般解决使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可以按自己的规则生成唯一一个全局唯一id,每次消费消息时用该id先判断该消息是否已经消费过,
唯一ID+指纹码机制
指纹码:一些规则或者时间戳加别的服务给到的唯一信息码,它不一定是我们系统生成的,基本都是有我们的业务规则拼接出来的,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在是巨款中,优势就是实现简单一个拼接,然后查询是否重复即可,劣势是在高并发时,如果单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提示性能,但也不是最好的方式
利用redis原子性
redis执行setnx指令,天然具有幂等性,从而实现不重复消费
9.2 优先级队列
如在一个系统中有订单催付的场景,可以用到优先级队列,因为有大客户和小客户之分,如果是重量级的客户,理应当其优先级会更高,大客户给高级优先级,否则给普通优先级
如果此时有这些消息进来,数字代表客户,其右边表示各自的客户权值(越高的越重量级),在RabbitMq中,优先级队列等级为0-255
创建一个优先级队列
可以直接在web端进行创建
最大值越大的话,对于排序时性能耗费越大,因此如果没必要的话,尽量将值设置为小一点即可
或者在代码中声明队列的时候
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
消息中代码添加优先级
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build(); // 先声明优先级,后进行发生
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());// 是用了优先级声明的消息
channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // m
惰性队列
惰性队列:消息保存在磁盘上还是在内存中
正常情况:消息式保存在内存中
队形队列:消息保存在磁盘中
如果有这么一个场景,大量的消息把服务弄崩溃了,此时大量的消息没有抵达消费者,如果是这个正常情况,消息会随着服务的关闭而丢失在内存中,但是如果使用惰性队列,消息就能在硬盘上得到保存,因此惰性队列适用于宕机、崩溃等情况下
两种模式
MQ有两种模式deafult和lazy,前者就是正常模式,而后者式惰性模式,两种模式都是基于某个队列的,因此可以在channel.queueDeclear方法中进行相关参数的设置,也可以通过policy设置,policy设置的具有更高的优先级。如果一个队列已经有了一种模式了,需要更改模式,那么则需要先删除队列再出创建新的队列
args.put("x-queue-mode","lazy");
channel.queueDeclear(queueName,null,null,null,args)
10、MQ集群
集群能有效地提高流量和可靠性。但是集群的内容是不同步的,也就是说主节点的队列,在从节点中是没有的,包括其中过的消息,也是会消失的,因此需要使用镜像队列
如
主节点:1号节点
从节点:2、3号节点
镜像队列,即备份,在主节点新增内容【队列、消息】的时候,将消息备份一份到别的节点中,也可以备份到所有节点中,当然,着看你是否需要了。这样做的好处就是:就算集群只剩下一台机器了,只要这台机器能用,那么服务依然能够消费队列里面的消息。
除此之外,对于mq服务,我们生产者在声明的时候,需要指定某个服务器的某个端口,那么如果连接的服务宕机了,生产者该如何知道取切换ip到好的服务器那里呢?这就涉及到了负载均衡方面的知识,需要借助外部的负载均衡组件去帮我们做一些抉择。
Haproxy
将请求转发到后面的多台mq服务器
keepalive进行生命探测
Federation Exchange
联合交换机,像一些距离比较远的服务,例如北京和深圳,他们之间的网络延迟是一个需要考虑的问题。有一个北京的业务需要连接北京,向其中的交换机A发送消息,此时的网络延迟很小,可以迅速地将消息发送给交换机A,但是如果有是深圳发来地,那么肯定会有比较多地延迟,造成性能降低,一定程度上造成阻塞。Federation插件就可以比较好的解决这个问题
开启插件指令
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
Shovel
其功能与Federation类似,同样具有数据从上游传到下游的功能,Shovel够可靠,持续地从一个Broker中地队列拉取数据并转发至另一个Broker交换器,作为源端地队列和作为目的端地交换器可以同时位于一个Borker,也可以位于不同地Broker上。Shovel可以翻译为“铲子”,将消息从一方铲到另一方。Shovel行为就像优秀地客户端应用能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
有一个Source和Destination可以理解为发送方和接收方