一、概述

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法,是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。
分布式系统有两种通信方式,直接远程调用和借助第三方完成间接通信
发生方称为生产者,接收方称为消费者

1、为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种一处处理的方式大大的节省了服务起的请求响应时间,从而提高系统吞吐量

2、MQ的优势和劣势

优势:

  • 应用解耦:提升了系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

劣势:

  • 系统可用性降低
  • 系统复杂度提高
  • 一致性问题

总结:

既然mq有优势和劣势,那使用mq需要满足什么条件呢?

生产者不需要从消费者出获得反馈,引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作完成了继续往后走,即所谓异成为了可能。

容许短暂的不一致性

确实用了有效果,即解耦,提速,削峰这些方面的收益,超过加入MQ,管理MQ的这些成本

3、常见的MQ产品

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义 自定义协议,社区封装了http协议支持
客户端支持语言 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 Java,C,C++,Python,PHP,Perl,.net等 Java,C++(不成熟) 官方支持Java,社区产出多种API,如PHP,Python等
单机吞吐量 万级(其次) 万级(最差) 十万级(最好) 十万级(次之)
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
功能特性 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 老牌产品,成熟度高,文档较多 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,毕竟是为大数据领域准备的。

4、RabbitMQ简介

4.3 AMQP 概念

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

4.2 rabbitMQ相关概念

image.png
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

4.3 RabbitMQ 的六种工作模式

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

4.4 JMS

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

    4.5 小结

  1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  2. RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
  3. AMQP 是协议,类比HTTP。
  4. JMS 是 API 规范接口,类比 JDBC。

二、RabbitMQ的安装和配置

1、安装/启动

rabbitmq 官网:https://www.rabbitmq.com/
安装博客:https://blog.csdn.net/qq_38667881/article/details/110135368

包含图形化界面 开启图形化界面功能后:可以通过 192.168.163.10:15672 进入图形化界面 图形化界面 账号密码: admin admin123

  1. service rabbitmq-server start #运行
  2. service rabbitmq-server status #查看运行状态
  3. service rabbitmq-server stop #停止

image.png

2、Rabbit 默认端口号

4369 (epmd), 25672 (Erlang distribution)
Epmd 是 Erlang Port Mapper Daemon 的缩写,在 Erlang 集群中相当于 dns 的作用,绑定在4369端口上。

5672, 5671 (AMQP 0-9-1 without and with TLS)
AMQP 是 Advanced Message Queuing Protocol 的缩写,一个提供统一消息服务的应用层标准高级消息队列
协议,是应用层协议的一个开放标准,专为面向消息的中间件设计。基于此协议的客户端与消息中间件之间可以
传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。

15672 (if management plugin is enabled)
通过 http://serverip:15672 访问 RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。
(注意:RabbitMQ 3.0之前的版本默认端口是55672,下同)

61613, 61614 (if STOMP is enabled)
Stomp 是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档,实践一下 Stomp 协议需要:

一个支持 stomp 消息协议的 messaging server (譬如activemq,rabbitmq);
一个终端(譬如linux shell);
一些基本命令与操作(譬如nc,telnet)

1883, 8883 (if MQTT is enabled)
MQTT 只是 IBM 推出的一个消息协议,基于 TCP/IP 的。两个 App 端发送和接收消息需要中间人,
这个中间人就是消息服务器(比如ActiveMQ/RabbitMQ),三者通信协议就是 MQTT

三、RabbitMQ 快速入门

1、使用简单的模式完成消息传递

步骤:

  1. 创建连接工厂
  2. 设置参数
  3. 创建连接Connection
  4. 创建Channel
  5. 创建队列Queue
  6. 发送消息

    1.1 创建provider工程和consumer工程

    1.2 添加依赖

    1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>5.6.0</version>
    6. </dependency>

    1.3 生产者代码

    1. public class ProviderHelloWorld {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. // 1.获取连接工厂
    4. ConnectionFactory connectionFactory = new ConnectionFactory();
    5. // 2.设置参数
    6. // 设置地址,默认localhost
    7. connectionFactory.setHost("192.168.163.10");
    8. // 设置端口 默认5672
    9. connectionFactory.setPort(5672);
    10. // 设置用户名 默认guest
    11. connectionFactory.setUsername("admin");
    12. // 设置密码 默认 guest
    13. connectionFactory.setPassword("admin123");
    14. // 3.创建连接
    15. Connection connection = connectionFactory.newConnection();
    16. // 4.创建channel
    17. Channel channel = connection.createChannel();
    18. // 5.创建队列 Queue
    19. /*
    20. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    21. 参数说明:
    22. 1. queue: 队列名称
    23. 2. durable:是否持久化,当mq重启之后,他还在
    24. 3. exclusive: 通常设置为false
    25. - 是否独占,只能有一个消费者来监听队列
    26. - 当connection关闭时 是否删除队列
    27. 4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
    28. 5. arguments:参数信息
    29. */
    30. // 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
    31. channel.queueDeclare("hello_world", false, false, false, null);
    32. // 6.发送消息
    33. /*
    34. basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    35. 基础出版() 参数说明:
    36. exchange:交换机的名称,简单模式下交换机会使用默认的,使用""设置为默认
    37. routingKey:路由名称
    38. props:参数信息
    39. body:发送的消息信息
    40. */
    41. String body = "Hello World ...";
    42. channel.basicPublish("","hello_world",null,body.getBytes());
    43. // 7.释放资源
    44. channel.close();
    45. connection.close();
    46. }
    47. }

    1.4 消费者代码

    1. public class ConsumerHelloWorld {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. // 1.创建连接工厂
    4. ConnectionFactory connectionFactory = new ConnectionFactory();
    5. // 2.设置参数
    6. connectionFactory.setPassword("admin123");
    7. connectionFactory.setUsername("admin");
    8. connectionFactory.setHost("192.168.163.10");
    9. connectionFactory.setPort(5672);
    10. // 3.获取连接
    11. Connection connection = connectionFactory.newConnection();
    12. // 4.创建channel
    13. Channel channel = connection.createChannel();
    14. // 5.创建队列 Queue
    15. /*
    16. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    17. 参数说明:
    18. 1. queue: 队列名称
    19. 2. durable:是否持久化,当mq重启之后,他还在
    20. 3. exclusive: 通常设置为false
    21. - 是否独占,只能有一个消费者来监听队列
    22. - 当connection关闭时 是否删除队列
    23. 4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
    24. 5. arguments:参数信息
    25. */
    26. // 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
    27. channel.queueDeclare("hello_world", false, false, false, null);
    28. // 6.接收消息
    29. /*
    30. basicConsume(String queue, boolean autoAck, Consumer callback)
    31. 方法参数说明:
    32. 1. queue:队列名称
    33. 2. autoAck: 是否自动确认
    34. 3. callback: 回调对象
    35. */
    36. // 创建回调对象,参数是 channel
    37. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    38. /**
    39. * 回调方法 当收到消息后 会自动执行该方法
    40. *
    41. * @param consumerTag 消息标识 标签
    42. * @param envelope 获取一些信息,交换机 路由key ...
    43. * @param properties 配置属性
    44. * @param body 真实数据
    45. * @throws IOException ioexception
    46. */
    47. @Override
    48. public void handleDelivery(String consumerTag, Envelope envelope,
    49. AMQP.BasicProperties properties, byte[] body)
    50. throws IOException {
    51. super.handleDelivery(consumerTag, envelope, properties, body);
    52. System.out.println("consumerTag: " +consumerTag);
    53. System.out.println("Exchange: " + envelope.getExchange());
    54. System.out.println("RoutingKey: " + envelope.getRoutingKey());
    55. System.out.println("properties: " + properties);
    56. System.out.println("body: " + new String(body));
    57. }
    58. };
    59. channel.basicConsume("hello_world",true,defaultConsumer);
    60. //消费者不要关闭资源,要保持一直监听
    61. }
    62. }

    image.png

    1.5 总结

    image.png
    在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来