一、安装
1.1 安装erlang运行环境
otp_win64_23.2.rar
安装otp_win64_20.2.exe
右键以管理员身份运行
接着选定安装路径,一直点击下一步傻瓜式安装
1.2 安装RabbitMQ
1.2.1 安装rabbitmq-server-3.7.4.exe
rabbitmq-server-3.8.14.rar
双击文件rabbitmq-server-3.7.4.exe,傻瓜式安装,(注意不要安装在包含中文和空格的目录下!安装后window服务中就存在rabbitMQ了,并且是启动状态。 )
1.2.2 安装管理界面(插件)
- 进入rabbitMQ安装目录的sbin目录
- 点击上方的路径框输入cmd,按下回车键
- 输入命令点击回车
rabbitmq-plugins enable rabbitmq_management
1.2.3 重启服务
1.双击rabbitmq-server.bat(双击后可能需要等待一会)
2.打开浏览器,地址栏输入http://127.0.0.1:15672 ,即可看到管理界面的登陆页
账号密码都为guest,登录进入主页面最上侧的导航依次是:概览、连接、信道、交换器、队列、用户管理
二、RabbitMQ简介
2.1 引言
以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ就是这样一款我们苦苦追寻的消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。Erlang语言在数据交互方面性能优秀,有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。可谓“人如其名”,RabbitMQ像兔子一样迅速。
2.2 特点
RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式、返回模式
- 与Spring,AMQP完美整合,API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
2.3 典型应用场景
- 异步处理:把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰:例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
- 日志处理
- 应用解耦:假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。
2.4 AMQP协议与RabbitMQ
提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念:
- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Server的网络连接,TCP连接。
- Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息队列,用来保存消息,供消费者消费。
我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢? 试想这样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 TCP 连接复用的方式,不仅可以减少性能开销,同时也便于管理 。
下图是AMQP的协议模型:
正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。
生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。
接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
最后还要关闭信道和连接。
RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议一模一样。
三、RabbitMQ交换机分类
RabbitMQ的Exchange(交换器)分为四类:
- direct(默认)
- fanout
- topic
- headers(不常用)
使用RabbitMQ前先启动成1.2.3中最后的画面并引入依赖**
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
3.1 direct交换器
3.1.1 概念
direct为默认的交换器类型,如果路由键匹配的话,消息就投递到相应的队列,如图:
使用代码:channel.basicPublish(“”, QueueName, null, message)推送direct交换器消息到对于的队列,空字符为默认的direct交换器,用队列名称当做路由键。
3.1.2 代码示例
发送端:
/**
* 路由
*/
public class MQProvider_Publish {
//队列名称
private final static String QUEUE_NAME = "hello_route";
private final static String QUEUE_NAME_2 = "hello_route_2";
private final static String QUEUE_NAME_3 = "hello_route_3";
//交换机名称
private final static String EXCHANGE_NAME = "ex2";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
try {
//新建连接与信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列【参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将交换机与队列绑定,指定路由
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one");
channel.queueDeclare(QUEUE_NAME_2,false,false,false,null);
channel.queueBind(QUEUE_NAME_2,EXCHANGE_NAME,"one");
channel.queueDeclare(QUEUE_NAME_3,false,false,false,null);
channel.queueBind(QUEUE_NAME_3,EXCHANGE_NAME,"two");
//发送消息
String message = "Hello Publish!";
// 推送内容【参数一:交换机名称;参数二:路由键,参数三:消息的其他属性-路由的headers信息;参数四:消息主体】
channel.basicPublish(EXCHANGE_NAME,"one",null,message.getBytes());
System.out.println("send message:" + message);
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
接收端:
public class MQConsumer_Subscribe {
private final static String QUEUE_NAME = "hello_route";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("get message:" + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
3.1.3 测试
启动发送端发送一次数据
可见,在发送端代码中路由绑定的队列中各有一条消息
启动接收端接收数据
可见,当接收端队列名为hello_route时,能够接收到消息(hello_route_2同理),而队列名为hello_route_3时无法接收消息
3.2 fanout交换器——发布/订阅模式
fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
和direct交换器不同,我们在发送消息的时候新增channel.exchangeDeclare(ExchangeName, “fanout”),这行代码声明fanout交换器。
注意:对于fanout交换器来说routingKey(路由键)是无效的,这个参数是被忽略的。
代码部分省略(与direct类似)
**
3.3 topic交换器——匹配订阅模式
topic交换器运行和fanout类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候routingKey路由键就派上用场了,使用路由键进行消息(规则)匹配。
假设我们现在有一个日志系统,会把所有日志级别的日志发送到交换器,warning、log、error、fatal,但我们只想处理error以上的日志,要怎么处理?这就需要使用topic路由器了。
topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”作为分隔符,例如:com.mq.rabbit.error。
消费消息的时候routingKey可以使用下面字符匹配消息:
- “*”匹配一个分段(用“.”分割)的内容;
- “#”匹配0和多个字符;
例如发布了一个“com.mq.rabbit.error”的消息:
能匹配上的路由键:
不能匹配上的路由键:
- cn.mq.*
- *.error
- *
所以如果想要订阅所有消息,可以使用“#”匹配。
注意:fanout、topic交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。
代码部分省略(与direct类似)
**
四、SpringBoot集成RabbitMQ
4.1 生产者微服务producer
4.1.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.1.2 yml配置文件
server:
port: 8021
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
4.1.3 RabbitMQConfig
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//队列
@Bean
public Queue queue1() {
return new Queue("queue1");
}
//Direct交换机,可指定其他类型
@Bean
DirectExchange directExchange() {
return new DirectExchange("directExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置路由键:one
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(queue1()).to(directExchange()).with("one");
}
}
4.1.4 测试类
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@SpringBootTest
@RunWith(SpringRunner.class)
class ProducerApplicationTests {
//使用RabbitTemplate,这提供了接收/发送等等方法
@Resource
RabbitTemplate rabbitTemplate;
@Test
void test() {
//参数1:交换机 参数2:路由键 参数3:需要发送的消息
rabbitTemplate.convertAndSend("directExchange","one","Hello");
}
}
注:如果发送对象,需要该对象的所有信息完全相同(包括全包名),并且实现序列化且id相同
4.1.5 查看结果
访问http://127.0.0.1:15672/并登录,账号密码都为guest
4.2 消费者微服务Consumer
4.2.1 引入依赖
4.2.2 yml配置文件
4.2.3 RabbitMQConfig
4.2.4 Reciver接收类
注解中queues为队列名,可为多个
@Component
@RabbitListener(queues = "queue1")
public class Receiver {
@RabbitHandler
public void process(String msg){
System.out.println(msg);
}
}