一、概述
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法,是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。
分布式系统有两种通信方式,直接远程调用和借助第三方完成间接通信
发生方称为生产者,接收方称为消费者
1、为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种一处处理的方式大大的节省了服务起的请求响应时间,从而提高了系统的吞吐量
2、MQ的优势和劣势
优势:
- 应用解耦:提升了系统容错性和可维护性
- 异步提速:提升用户体验和系统吞吐量
- 削峰填谷:提高系统稳定性
劣势:
- 系统可用性降低
- 系统复杂度提高
- 一致性问题
总结:
既然mq有优势和劣势,那使用mq需要满足什么条件呢?
生产者不需要从消费者出获得反馈,引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作完成了继续往后走,即所谓异成为了可能。
容许短暂的不一致性
确实用了有效果,即解耦,提速,削峰这些方面的收益,超过加入MQ,管理MQ的这些成本
3、常见的MQ产品
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了http协议支持 |
客户端支持语言 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
4、RabbitMQ简介
4.3 AMQP 概念
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
4.2 rabbitMQ相关概念
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
4.3 RabbitMQ 的六种工作模式
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
4.4 JMS
- JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
- JMS 是 JavaEE 规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有
4.5 小结
- RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
- RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
- AMQP 是协议,类比HTTP。
- JMS 是 API 规范接口,类比 JDBC。
二、RabbitMQ的安装和配置
1、安装/启动
rabbitmq 官网:https://www.rabbitmq.com/
安装博客:https://blog.csdn.net/qq_38667881/article/details/110135368
包含图形化界面 开启图形化界面功能后:可以通过 192.168.163.10:15672 进入图形化界面 图形化界面 账号密码: admin admin123
service rabbitmq-server start #运行
service rabbitmq-server status #查看运行状态
service rabbitmq-server stop #停止
2、Rabbit 默认端口号
4369 (epmd), 25672 (Erlang distribution)
Epmd 是 Erlang Port Mapper Daemon 的缩写,在 Erlang 集群中相当于 dns 的作用,绑定在4369端口上。
5672, 5671 (AMQP 0-9-1 without and with TLS)
AMQP 是 Advanced Message Queuing Protocol 的缩写,一个提供统一消息服务的应用层标准高级消息队列
协议,是应用层协议的一个开放标准,专为面向消息的中间件设计。基于此协议的客户端与消息中间件之间可以
传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。
15672 (if management plugin is enabled)
通过 http://serverip:15672 访问 RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。
(注意:RabbitMQ 3.0之前的版本默认端口是55672,下同)
61613, 61614 (if STOMP is enabled)
Stomp 是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档,实践一下 Stomp 协议需要:
一个支持 stomp 消息协议的 messaging server (譬如activemq,rabbitmq);
一个终端(譬如linux shell);
一些基本命令与操作(譬如nc,telnet)
1883, 8883 (if MQTT is enabled)
MQTT 只是 IBM 推出的一个消息协议,基于 TCP/IP 的。两个 App 端发送和接收消息需要中间人,
这个中间人就是消息服务器(比如ActiveMQ/RabbitMQ),三者通信协议就是 MQTT
三、RabbitMQ 快速入门
1、使用简单的模式完成消息传递
步骤:
- 创建连接工厂
- 设置参数
- 创建连接Connection
- 创建Channel
- 创建队列Queue
-
1.1 创建provider工程和consumer工程
1.2 添加依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
1.3 生产者代码
public class ProviderHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// 设置地址,默认localhost
connectionFactory.setHost("192.168.163.10");
// 设置端口 默认5672
connectionFactory.setPort(5672);
// 设置用户名 默认guest
connectionFactory.setUsername("admin");
// 设置密码 默认 guest
connectionFactory.setPassword("admin123");
// 3.创建连接
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数说明:
1. queue: 队列名称
2. durable:是否持久化,当mq重启之后,他还在
3. exclusive: 通常设置为false
- 是否独占,只能有一个消费者来监听队列
- 当connection关闭时 是否删除队列
4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
5. arguments:参数信息
*/
// 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
channel.queueDeclare("hello_world", false, false, false, null);
// 6.发送消息
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
基础出版() 参数说明:
exchange:交换机的名称,简单模式下交换机会使用默认的,使用""设置为默认
routingKey:路由名称
props:参数信息
body:发送的消息信息
*/
String body = "Hello World ...";
channel.basicPublish("","hello_world",null,body.getBytes());
// 7.释放资源
channel.close();
connection.close();
}
}
1.4 消费者代码
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setPassword("admin123");
connectionFactory.setUsername("admin");
connectionFactory.setHost("192.168.163.10");
connectionFactory.setPort(5672);
// 3.获取连接
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数说明:
1. queue: 队列名称
2. durable:是否持久化,当mq重启之后,他还在
3. exclusive: 通常设置为false
- 是否独占,只能有一个消费者来监听队列
- 当connection关闭时 是否删除队列
4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
5. arguments:参数信息
*/
// 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
channel.queueDeclare("hello_world", false, false, false, null);
// 6.接收消息
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
方法参数说明:
1. queue:队列名称
2. autoAck: 是否自动确认
3. callback: 回调对象
*/
// 创建回调对象,参数是 channel
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法 当收到消息后 会自动执行该方法
*
* @param consumerTag 消息标识 标签
* @param envelope 获取一些信息,交换机 路由key ...
* @param properties 配置属性
* @param body 真实数据
* @throws IOException ioexception
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("consumerTag: " +consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("hello_world",true,defaultConsumer);
//消费者不要关闭资源,要保持一直监听
}
}
1.5 总结
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接收者,会一直等待消息到来