一、概述

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法,是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。
分布式系统有两种通信方式,直接远程调用和借助第三方完成间接通信
发生方称为生产者,接收方称为消费者

1、为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种一处处理的方式大大的节省了服务起的请求响应时间,从而提高系统吞吐量

2、MQ的优势和劣势

优势:

  • 应用解耦:提升了系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

劣势:

  • 系统可用性降低
  • 系统复杂度提高
  • 一致性问题

总结:

既然mq有优势和劣势,那使用mq需要满足什么条件呢?**

生产者不需要从消费者出获得反馈,引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作完成了继续往后走,即所谓异成为了可能。

容许短暂的不一致性

确实用了有效果,即解耦,提速,削峰这些方面的收益,超过加入MQ,管理MQ的这些成本

3、常见的MQ产品

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义 自定义协议,社区封装了http协议支持
客户端支持语言 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 Java,C,C++,Python,PHP,Perl,.net等 Java,C++(不成熟) 官方支持Java,社区产出多种API,如PHP,Python等
单机吞吐量 万级(其次) 万级(最差) 十万级(最好) 十万级(次之)
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
功能特性 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 老牌产品,成熟度高,文档较多 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,毕竟是为大数据领域准备的。

4、RabbitMQ简介

4.3 AMQP 概念

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

4.2 rabbitMQ相关概念

image.png
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

4.3 RabbitMQ 的六种工作模式

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

4.4 JMS

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

    4.5 小结

  1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  2. RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
  3. AMQP 是协议,类比HTTP。
  4. JMS 是 API 规范接口,类比 JDBC。

二、RabbitMQ的安装和配置

1、安装/启动

rabbitmq 官网:https://www.rabbitmq.com/
安装博客:https://blog.csdn.net/qq_38667881/article/details/110135368

包含图形化界面 开启图形化界面功能后:可以通过 192.168.163.10:15672 进入图形化界面 图形化界面 账号密码: admin admin123

  1. service rabbitmq-server start #运行
  2. service rabbitmq-server status #查看运行状态
  3. service rabbitmq-server stop #停止

image.png

2、Rabbit 默认端口号

4369 (epmd), 25672 (Erlang distribution)
Epmd 是 Erlang Port Mapper Daemon 的缩写,在 Erlang 集群中相当于 dns 的作用,绑定在4369端口上。

5672, 5671 (AMQP 0-9-1 without and with TLS)
AMQP 是 Advanced Message Queuing Protocol 的缩写,一个提供统一消息服务的应用层标准高级消息队列
协议,是应用层协议的一个开放标准,专为面向消息的中间件设计。基于此协议的客户端与消息中间件之间可以
传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。

15672 (if management plugin is enabled)
通过 http://serverip:15672 访问 RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。
(注意:RabbitMQ 3.0之前的版本默认端口是55672,下同)

61613, 61614 (if STOMP is enabled)
Stomp 是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档,实践一下 Stomp 协议需要:

一个支持 stomp 消息协议的 messaging server (譬如activemq,rabbitmq);
一个终端(譬如linux shell);
一些基本命令与操作(譬如nc,telnet)

1883, 8883 (if MQTT is enabled)
MQTT 只是 IBM 推出的一个消息协议,基于 TCP/IP 的。两个 App 端发送和接收消息需要中间人,
这个中间人就是消息服务器(比如ActiveMQ/RabbitMQ),三者通信协议就是 MQTT

三、RabbitMQ 快速入门

1、使用简单的模式完成消息传递

步骤:

  1. 创建连接工厂
  2. 设置参数
  3. 创建连接Connection
  4. 创建Channel
  5. 创建队列Queue
  6. 发送消息

    1.1 创建provider工程和consumer工程

    1.2 添加依赖

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.6.0</version>
    </dependency>
    

    1.3 生产者代码

    public class ProviderHelloWorld {
     public static void main(String[] args) throws IOException, TimeoutException {
         // 1.获取连接工厂
         ConnectionFactory connectionFactory = new ConnectionFactory();
    
         // 2.设置参数
         // 设置地址,默认localhost
         connectionFactory.setHost("192.168.163.10");
         // 设置端口 默认5672
         connectionFactory.setPort(5672);
    
         // 设置用户名  默认guest
         connectionFactory.setUsername("admin");
         // 设置密码 默认 guest
         connectionFactory.setPassword("admin123");
    
         // 3.创建连接
         Connection connection = connectionFactory.newConnection();
    
         // 4.创建channel
         Channel channel = connection.createChannel();
    
         // 5.创建队列 Queue
         /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数说明:
             1. queue: 队列名称
             2. durable:是否持久化,当mq重启之后,他还在
             3. exclusive: 通常设置为false
                     - 是否独占,只能有一个消费者来监听队列
                     - 当connection关闭时 是否删除队列
             4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
             5. arguments:参数信息
          */
         // 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
         channel.queueDeclare("hello_world", false, false, false, null);
    
         // 6.发送消息
         /*
         basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         基础出版() 参数说明:
             exchange:交换机的名称,简单模式下交换机会使用默认的,使用""设置为默认
             routingKey:路由名称
             props:参数信息
             body:发送的消息信息
          */
         String body = "Hello World ...";
         channel.basicPublish("","hello_world",null,body.getBytes());
    
         // 7.释放资源
         channel.close();
         connection.close();
     }
    }
    

    1.4 消费者代码

    public class ConsumerHelloWorld {
     public static void main(String[] args) throws IOException, TimeoutException {
         // 1.创建连接工厂
         ConnectionFactory connectionFactory = new ConnectionFactory();
         // 2.设置参数
         connectionFactory.setPassword("admin123");
         connectionFactory.setUsername("admin");
         connectionFactory.setHost("192.168.163.10");
         connectionFactory.setPort(5672);
    
         // 3.获取连接
         Connection connection = connectionFactory.newConnection();
    
         // 4.创建channel
         Channel channel = connection.createChannel();
    
         // 5.创建队列 Queue
         /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数说明:
             1. queue: 队列名称
             2. durable:是否持久化,当mq重启之后,他还在
             3. exclusive: 通常设置为false
                     - 是否独占,只能有一个消费者来监听队列
                     - 当connection关闭时 是否删除队列
             4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
             5. arguments:参数信息
          */
         // 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
         channel.queueDeclare("hello_world", false, false, false, null);
    
         // 6.接收消息
         /*
         basicConsume(String queue, boolean autoAck, Consumer callback)
         方法参数说明:
             1. queue:队列名称
             2. autoAck: 是否自动确认
             3. callback: 回调对象
          */
         // 创建回调对象,参数是 channel
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
             /**
              *  回调方法 当收到消息后 会自动执行该方法
              *
              * @param consumerTag 消息标识 标签
              * @param envelope    获取一些信息,交换机 路由key ...
              * @param properties  配置属性
              * @param body        真实数据
              * @throws IOException ioexception
              */
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope,
                                        AMQP.BasicProperties properties, byte[] body)
                     throws IOException {
    
                 super.handleDelivery(consumerTag, envelope, properties, body);
                 System.out.println("consumerTag: " +consumerTag);
                 System.out.println("Exchange: " + envelope.getExchange());
                 System.out.println("RoutingKey: " + envelope.getRoutingKey());
                 System.out.println("properties: " + properties);
                 System.out.println("body: " + new String(body));
             }
         };
    
         channel.basicConsume("hello_world",true,defaultConsumer);
         //消费者不要关闭资源,要保持一直监听
     }
    }
    

    image.png

    1.5 总结

    image.png
    在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息