什么是RabbitMQ

MQ(Message Queue) 消息队列

  • 消息队列中间件,是分布式系统中的重要组件
  • 主要解决:异步处理、应用解耦、流量消峰等问题
  • 从而实现高性能、高可用、可伸缩和最终一致性的架构
  • 使用较多的消息队列产品:RabbitMQ、RocketMQ、Kafka、ActiveMQ

异步处理

例如:用户注册后,需要发送验证邮箱和手机验证码;将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部全部完成后,返回给客户端。

image.png

应用解耦

场景:订单系统需要通知库存系统。如果库存系统出现异常,则订单调用库存失败,导致下单失败,由于:订单系统和库存系统耦合度太高

image.png

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功;
  • 库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,在进行库存操作;
  • 假如:下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队列就不再关心其他后续操作了,实现了订单系统和库存系统的应用解耦
  • 消息队列是典型的:生产者消费者模型
  • 消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的入侵,这样就实现了生产者和消费者的解耦

流量削峰

场景:秒杀、抢购等业务,针对高并发的场景。
因为流浪过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列
image.png
用户的请求,在服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束的页面。
也就是说秒杀成功的就是进入队列的用户。

背景知识

  • AMQP

(Advanced Message Queuing Protocol)一个提供统一消息服务的应用层标准高级消息队列协议.
协议:数据在传输的过程中必须要遵守的规则
基于此协议的客户端可以与消息中间件传递消息
并不受产品、开发语言等条件的限制

  • JMS

Java Message Server,Java消息服务应用程序接口,一种规范和JDBC担任的角色类似。
是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间;或分布式系统中发送消息,进行异步通信

  • 二者的区别:

JMS是定义了统一接口,统一消息操作AMQP通过协议统一数据交互格式
JMS必须是Java语言;AMQP知识协议和语言无关

  • Erlang语言

Erlang 是一种通用的面向并发的编程语言,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。专门为通信应用设计的。RabbitMQ就是由Erlang编写的。

为什么选择RabbitMQ

RabbtiMQ:

  • Erlang开发,AMQP的最佳搭档,安装部署简单,上手门槛低。
  • 它是企业级消息队列,经过大量实践考验的高可靠,一些一线大厂也都在使用。
  • 有强大的WEB管理页面
  • 强大的社区支持,为技术进步提供动力
  • 支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能丰富
  • 集群扩展很容易,并且可以通过增加节点实现成倍的性能提升

总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择RabbitMQ,如果你想用一个性能高,但偶尔丢点数据可以使用kafka和zeroMQ,kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!.

RabbitMQ各组件功能

image.png

  • Broker:消息队列服务器实体
  • Virtual Host:虚拟主机
    • 表示一批交换机、消息队列和相关对象,形成的整体
    • 虚拟主机是共享相同的身份认证和加密环境的独立服务器域
    • 每个vhost本质就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制
    • vhost是AQMP概念的基础,RabbitMQ默认的vhost是/,必须在链接时指定
  • Exchange:交换器(路由)
    • 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
  • Queue:消息队列
    • 用来保存消息直到发送给消费者
    • 它是消息的容器,也是消息的终点
    • 一个消息可以投入一个或多个队列
    • 消息一直在队列里面,等待消费者连接到这个队列将其取走
  • Banding:绑定,用于消息队列和交换机之间的关联
  • Channel:通道(信道)
    • 多路复用连接中的一条独立的双向数据流通道
    • 信道是建立在真实的TCP连接内的虚拟链接
    • AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
    • 因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,用来复用TCP连接
  • Connection:网络连接,比如一个TCP连接
  • Publisher:消息的生产者,也是一个向交换机发布消息的客户端应用程序
  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
  • Message:消息
    • 消息是不具名的,它是由消息头和消息体组成
    • 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久型存储)[消息的路由模式]等

RabbitMQ的安装和使用

要安装RabbitMQ必须要先安装erlang语言环境。要注意匹配的版本:https://www.rabbitmq.com/which-erlang.html 本篇文件使用的是RabbitMQ的3.8.6的版本,需要对应erlang的语言环境匹配

image.png

erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads

RabbitMQ安装启动

将软件包上传到Linux虚拟机的服务器。

image.png

1. 安装

  1. rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
  2. rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
  3. rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

2. 启动后台管理插件

 rabbitmq-plugins enable rabbitmq_management

image.png

3. 启动RabbitMQ

systemctl start rabbitmq-server.service  # 启动rabbitMQ
systemctl status rabbitmq-server.service # 查看rabbitMQ服务的状态
systemctl restart rabbitmq-server.service # 重启rabbitMQ服务
systemctl stop rabbitmq-server.service # 停止rabbitMQ服务的状态

image.png
查看RabbitMQ的启动端口信息:ps -ef | grep rabbitmq
image.png

4. 访问管理端的地址

http://172.16.150.130:15672/

如果启动正常会显示如下界面:默认账号和密码:guest
image.png
但是输入账号和密码会显示,不允许远程连接,那么就需要添加远程账户
image.png

添加账户:

[root@localhost opt]# rabbitmqctl add_user prim 123456  # 添加账户信息
Adding user "prim" ...
[root@localhost opt]# rabbitmqctl set_user_tags prim administrator  # 设置账户标签为超级管理员
Setting tags for user "prim" to [administrator] ...
[root@localhost opt]# rabbitmqctl set_permissions -p "/" prim ".*" ".*" ".*"  # 设置权限信息
Setting permissions for user "prim" in vhost "/" ...

查询用户列表:

[root@localhost opt]# rabbitmqctl list_users
Listing users ...
user    tags
prim    [administrator]  # 这个就是我们添加的用户
guest   [administrator]

用新创建的账户,进行登录,界面显示如下:
image.png

:::tips 注意:
5672:RabbitMQ的提供给编程语言客户端链接的端口;
15672:RabbitMQ管理界面的端口;
25672:RabbitMQ集群的端口 :::

RabbitMQ 快速入门

  1. 创建虚拟主机

image.png

  1. 创建maven工程,引入依赖

     <dependencies>
         <dependency>
             <groupId>com.rabbitmq</groupId>
             <artifactId>amqp-client</artifactId>
             <version>5.7.3</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>1.7.25</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>3.9</version>
         </dependency>
     </dependencies>
    
  2. 连接MQ ```java public class ConnectionUtils { public static Connection getConnection() throws Exception {

     //1. 创建连接工厂
     ConnectionFactory factory = new ConnectionFactory();
     //2. 在工厂对象中设置MQ的连接信息 - ip post vhost username password
     factory.setHost("172.16.150.130");
     factory.setPort(5672);
     factory.setVirtualHost("/edu");//在第一步创建的虚拟主机
     factory.setUsername("prim"); //在安装时创建的账号和密码
     factory.setPassword("123456");
     //3. 通过工厂获得与MQ的连接
     Connection connection = factory.newConnection();
     return connection;
    

    }

    public static void main(String[] args) throws Exception {

     Connection connection = getConnection();
     System.out.println("connection:" + connection);
     //关闭链接
     connection.close();
    

    } }

输出如下信息,表明连接成功:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612247070041-34c5dd19-ede6-40de-9597-08b227b5d729.png#align=left&display=inline&height=90&margin=%5Bobject%20Object%5D&name=image.png&originHeight=180&originWidth=1082&size=21403&status=done&style=none&width=541)
<a name="4Jv5R"></a>
## RabbitMQ模式
RabbitMQ提供了6种消息模型,第6种RPC并不是MQ,只学习前5种<br />[https://www.rabbitmq.com/getstarted.html](https://www.rabbitmq.com/getstarted.html)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612247670132-ccefb687-24ed-42cb-a495-2f06f257d860.png#align=left&display=inline&height=558&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1116&originWidth=1768&size=201150&status=done&style=none&width=884)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612247695309-2d77925b-f825-4e2a-ab48-e29543cd11a9.png#align=left&display=inline&height=552&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1104&originWidth=1168&size=139816&status=done&style=none&width=584)<br />5种消息模型,大体分为两类:<br />1和2属于点对点<br />3.4.5属于发布订阅模式(一对多)

- **点对点模式**:P2P 模式包含三个角色:
   - 消息队列queue,发送者sender,接受者receiver
   - 每个消息发送到一个特定的队列中,接收者从中获得消息
   - 队列中保留这些消息,直到他们被消费或超时
   - 特点:
      - 每个消息只有一个消费者,一旦消费,消息就不在队列中了
      - 发送者和接收者之间没有依赖性,发送者发送完成,不管接受者是否运行,都不会影响消息发送到队列中(例如,QQ给你发送消息,不管你看不看手机,反正我发了)
      - 接收者成功接收消息之后需向对象应答成功(确认)
   - 如果希望发送的每个消息都会被成功处理,那需要P2P

这种模式就像,送快递,给你放到了快递柜中,不管你收件人在哪,只需要把快递放到快递柜中,会发短信通知你你的快递在快递柜中了。<br />这种模式的性能不太好,队列会被占用,就像快递柜放不了多少快递。

- **发布订阅模式**:publish/subscribe
   - pub/sub模式包含三个角色:交换机(exchange)、发布者(publisher)、订阅者(subcriber)
   - 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
   - 特点:
      - 每个消息可以有多个订阅者
      - 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
      - 为了消费消息,订阅者必须保持运行状态:类似于,看电视、直播
   - 如果希望发送的消息被多个消费者处理,可采用本模式

这种模式类似,关注了某个主播,当这个主播开播时,它就会通知订阅的所有人来观看直播

<a name="mRkWD"></a>
### 简单模式
> [https://www.rabbitmq.com/tutorials/tutorial-one-java.html](https://www.rabbitmq.com/tutorials/tutorial-one-java.html)
> RabbitMQ是一个消息代理,你可以把它想象成一个邮局,当你把你想要寄的邮件放到一个邮箱里,你可以确定邮递员先生最终会把邮件发送到你的收件人那里,在这个类比中,RabbitMQ是一个邮箱,一个邮局和一个邮递员

RabbitMQ本省只是接收,存储和转发消息,并不会对消息进行处理。处理信件的应该是收件人而不是邮局。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612249454486-faedf188-1fee-420f-83b5-1c6abb571675.png#align=left&display=inline&height=61&margin=%5Bobject%20Object%5D&name=image.png&originHeight=122&originWidth=858&size=26430&status=done&style=none&width=429)

直接看代码如何实现:
<a name="qmQ2Z"></a>
#### 消息生成者
```java
package simplest;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtils;

/**
 * @program: rabbitmq_quickstart
 * @Description: 消息生产者
 * @author: sufulu
 * @version: 1.0.0
 * @create: 2021-02-02 15:04
 * @PackageName: simplest
 * @ClassName: Sender.java
 **/
public class Sender {
    public static void main(String[] args) throws Exception {
        String msg = "A:Hello Rabbit MQ";

        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 在连接中创建通道
        Channel channel = connection.createChannel();
        //3. 创建消息队列
        /**
         * 参数1:队列名称
         * 参数2:队列中的数据是否持久化
         * 参数3:是否排外 是否支持扩展,当前队列只能自己用,不能给别人用
         * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列中是否还保存数据)
         * 参数5:队列参数,没有参数传null
         */
        channel.queueDeclare("queue1", false, false, false, null);
        //4. 向指定的队列发送消息
        /**
         * 参数1:交换机名称,当前是简单模式-点对点模式 没有交换机,所以名称为""
         * 参数2:目标队列的名称:queue1
         * 参数3:设置消息的属性,没有属性则为null
         * 参数4:消息内容 直接接收byte[]
         */
        channel.basicPublish("", "queue1", null, msg.getBytes());
        System.out.println("已发送:" + msg);
        //5. 释放资源
        channel.close();
        connection.close();
    }
}

消息接收者

Recer 不会关闭,监听消息发送

package simplest;

import com.rabbitmq.client.*;
import utils.ConnectionUtils;

import java.io.IOException;

/**
 * @program: rabbitmq_quickstart
 * @Description: 消息接受者
 * @author: sufulu
 * @version: 1.0.0
 * @create: 2021-02-02 15:16
 * @PackageName: simplest
 * @ClassName: Recer.java
 **/
public class Recer {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String s = new java.lang.String(body);
                System.out.println("接收=" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认
        channel.basicConsume("queue1",true, consumer);
    }
}

启动消息生成者:
image.png
此时进入管理端查询消息队列信息
image.png
启动消息接收者,接收消息:可以接受到发送过来的消息
image.png

消息确认机制ACK

在上述的案例中,消息一旦被消费,消息就会立刻从队列中移除。
RabbitMQ如何得到消息被消费者接收?

  • 如果消费者接收消息后,还没执行操作就抛出异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
  • 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
  • ACK:Acknowledge character 即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符,表示发来的数据已确认接收无误我们在使用HTTP请求时,HTTP的状态码200就是告诉我们服务器执行成功。
  • 整个过程就像快递员将包裹送到你手里,并且需要你的签字并拍照回执
  • 不过这种回执ACK分为两种情况:
    • 自动ACK : 消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
    • 手动ACK :消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
  • 这两种ACK的情况,需要根据消息的重要性选择
    • 如果消息不太重要,自动ACK比较方便
    • 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛出异常宕机,那么消息就永久丢失了

我们将Recer中的代码:将第二个参数改为false,消息需要手动确认

 channel.basicConsume("queue1",false, consumer);

发送消息,然后在启动Recer接收者接收消息:
可以看到消息队列中显示Unacked = 1 表示有一条消息没有确认
image.png
如何确认消息呢?

        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("接收=" + s);
                //表示:手动确认
                //第二参数:是否同时确认多个消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

我们在重新发送消息,然后启动接收者,查看管理端的变化:可以看到消息队列中的消息都进行消费和ACK确认
image.png

工作队列模式(Works Queue)

在上述的简单模式中,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)

工作队列模式:消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取

image.png

消息消费者

假设两个消费者,来到了买肉串的地方,然后催促老板赶紧烤肉串(先启动两个消费者)

/**
 * @program: rabbitmq_quickstart
 * @Description: 消息接收者1,通过ACK确认机制
 * @author: sufulu
 * @version: 1.0.0
 * @create: 2021-02-02 15:16
 * @PackageName: simplest
 * @ClassName: Recer.java
 **/
public class Recer1 {
    static int i = 1;//统计吃掉羊肉串的数量

    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        channel.queueDeclare("test_work_queue", false, false, false, null);
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【顾客1】吃掉:" + s + " !总共吃【" + i++ + "】串!");
                //模拟网络延迟 吃掉1串花费0.2s
                try {
                    Thread.sleep(200);
                } catch (Exception e) {

                }
                //表示:手动确认
                //第二参数:是否同时确认多个消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_work_queue", false, consumer);
    }
}
/**
 * @program: rabbitmq_quickstart
 * @Description: 消息接收者1,通过ACK确认机制
 * @author: sufulu
 * @version: 1.0.0
 * @create: 2021-02-02 15:16
 * @PackageName: simplest
 * @ClassName: Recer.java
 **/
public class Recer2 {
    static int i = 1;//统计吃掉羊肉串的数量

    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        channel.queueDeclare("test_work_queue", false, false, false, null);
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【顾客2】吃掉:" + s + " !总共吃【" + i++ + "】串!");
                //模拟网络延迟 牙口不太好吃掉一串要花费 0.9s
                try {
                    Thread.sleep(900);
                } catch (Exception e) {

                }
                //表示:手动确认
                //第二参数:是否同时确认多个消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_work_queue", false, consumer);
    }
}

消息生产者

假设消息生产者,就是买肉串的老板,老板看到顾客来了,立马进行烤肉串,一次烤了100个肉串,给两个消费者

package work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtils;

/**
 * @program: rabbitmq_quickstart
 * @Description: 消息生产者
 * @author: sufulu
 * @version: 1.0.0
 * @create: 2021-02-02 16:05
 * @PackageName: work
 * @ClassName: Sender.java
 **/
public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("test_work_queue", false, false, false, null);
        //生产100个肉串
        for (int i = 1; i <= 100; i++) {
            String msg = "羊肉串 --> " + i;
            channel.basicPublish("", "test_work_queue", null, msg.getBytes());
            System.out.println("新鲜出炉:" + msg);
        }
        //5. 释放资源
        channel.close();
        connection.close();

    }
}

我们看一下结果:
顾客1(Recer1): 牙口好0.2s吃一个肉串。
顾客2(Recer2):上年纪了,牙口不好,需要0.9s吃一个肉串。
但是我们发现了一个问题,顾客1和顾客2 都吃了50串,顾客1先吃完了50串,然后顾客2慢慢吃完剩下的50串,顾客1就在那等着。
虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息。
image.pngimage.png
例如:在工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A同学10天完成,B同学30天完成,A完成自己的编码部分,就无所事事了,等着B完成,这样不行的,遵循“能者多劳” :::tips 效率高的多干点,效率低的少干点。 :::

如下图是由官网提供的解决思路:
image.png

可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的

worker。

即加上channel.basicQos(1) 快递一个一个送,送一个再送下一个,速度快的送件数多
image.pngimage.png

能者多劳,必须要配合手动的ACK机制才可以生效。

面试题:避免消息堆积?

  1. workqueue,多个消费者监听同一个队列
  2. 接收消息后,通过线程池,异步消费

发布订阅模式

将一个消息传递给多个消费者。例如抖音的视频主,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以收到视频通知

image.png
X就是视频主,红色的队列就是粉丝,binding是绑定的意思(关注)
P生产者发送信息给X路由,X将信息转发给绑定X的队列。
image.png
X队列将信息通过信道发送给消费者,从而进行消费。
整个过程,必须先创建路由:

  • 路由在生产者程序中创建
  • 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁

生产者

public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
//        channel.queueDeclare("test_work_queue", false, false, false, null);
        //声明路由,创建网红主播
        //第一个参数:路由名称
        //第二个参数:路由类型 一共有四种。
        //fanout类型:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息就都会被转发到与该路由绑定的所有队列上)
        channel.exchangeDeclare("text_exchange_fanout", "fanout");
        String msg = "hello everyone";
        //向绑定路由、网红主播的人发送消息
        channel.basicPublish("text_exchange_fanout", "", null, msg.getBytes());
        System.out.println("生产者:" + msg);
        //5. 释放资源
        channel.close();
        connection.close();

    }
}

消费者

public class Recer1 {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        //声明队列
        channel.queueDeclare("test_exchange_fanout_queue_1", false, false, false, null);
        //关注网红,绑定路由
        /**
         * 参数1:队列名
         * 参数2:路由名
         */
        channel.queueBind("test_exchange_fanout_queue_1","text_exchange_fanout","");
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者1】:" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_exchange_fanout_queue_1", true, consumer);
    }
}

消费者2和消费者1是一样的代码,此处省略。

运行生产者,先生产网红主播。
之后运行消费者,关注网红主播。
然后生产者,将消息给网红主播,网红主播进行消息上传,通知给关注主播的粉丝
image.png
image.png

路由模式

路由会根据类型进行定向分发消息给不同的队列。 可以理解为快递公司的分拣中心。整个小区,东面的楼小张送货,西面的楼小王送货。

image.png
运行顺序:

  • 先运行一次生产者,创建路由
  • 在运行消费者,绑定路由
  • 生产者运行,发出消息

    生产者

    ```java public class Sender { public static void main(String[] args) throws Exception {

      Connection connection = ConnectionUtils.getConnection();
      Channel channel = connection.createChannel();
    

    // channel.queueDeclare(“test_work_queue”, false, false, false, null);

      //声明路由,创建网红主播
      //第一个参数:路由名称
      //第二个参数:路由类型 一共有四种。
      //direct:根据路由键进行定向分发消息
      channel.exchangeDeclare("text_exchange_direct", "direct");
      String msg = "用户注册,【userid=s101】";
      //推消息到路由器
      //第二个参数必填,路由键
      channel.basicPublish("text_exchange_direct", "select", null, msg.getBytes());
      System.out.println("[用户系统]:" + msg);
      //5. 释放资源
      channel.close();
      connection.close();
    

    } }

<a name="nUuyv"></a>
#### 消费者
```java
public class Recer1 {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        //声明队列
        channel.queueDeclare("test_exchange_direct_queue_1", false, false, false, null);
        //绑定路由 路由键的类型是:insert、update、delete就用queue1绑定
        /**
         * 参数1:队列名
         * 参数2:路由名
         * 参数3:路由键
         */
        channel.queueBind("test_exchange_direct_queue_1","text_exchange_direct","insert");
        channel.queueBind("test_exchange_direct_queue_1","text_exchange_direct","update");
        channel.queueBind("test_exchange_direct_queue_1","text_exchange_direct","delete");
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者1】:" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_exchange_direct_queue_1", true, consumer);
    }
}

消费者2:

public class Recer2 {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        //声明队列
        channel.queueDeclare("test_exchange_direct_queue_2", false, false, false, null);
        //绑定路由
        /**
         * 参数1:队列名
         * 参数2:路由名
         * 参数3:路由键
         */
        channel.queueBind("test_exchange_direct_queue_2", "text_exchange_direct", "select");
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者2】:" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_exchange_direct_queue_2", true, consumer);
    }
}

生产者,发送路由键为:insert、update、delete,则消费者1 接收消息
image.png
生产者,发送路由键为:select,则消费者2接收消息
image.png

通配符模式

通配符模式和路由模式90%是一样的,唯独的区别就是路由键支持模糊匹配。 匹配符号: *: 只能匹配一个词,正好一个词,多一个不行,少一个也不行 # :能匹配0个或更多个次

image.png

生产者

public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
//        channel.queueDeclare("test_work_queue", false, false, false, null);
        //声明路由,创建网红主播
        //第一个参数:路由名称
        //第二个参数:路由类型 一共有四种。
        //direct:根据路由键进行定向分发消息
        //topic:模糊匹配的定向分发
        channel.exchangeDeclare("text_exchange_topic", "topic");
        String msg = "订单下单";
        //推消息到路由器
        //第二个参数必填,路由键
        channel.basicPublish("text_exchange_topic", "order.down", null, msg.getBytes());
        System.out.println("[用户系统]:" + msg);
        //5. 释放资源
        channel.close();
        connection.close();

    }
}

消费者

消费者1:接收user.#用户相关的消息

public class Recer1 {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        //声明队列
        channel.queueDeclare("test_exchange_topic_queue_1", false, false, false, null);
        /**
         * 绑定用户相关的消息:user.#
         * 参数1:队列名
         * 参数2:路由名
         * 参数3:路由键
         */
        channel.queueBind("test_exchange_topic_queue_1","text_exchange_topic","user.#");
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者1】:" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_exchange_topic_queue_1", true, consumer);
    }
}

消费者2,接收product.# 和 order.# 消息

public class Recer2 {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        //声明队列
        channel.queueDeclare("test_exchange_topic_queue_2", false, false, false, null);
        /**
         * 绑定商品和订单相关的消息
         * 参数1:队列名
         * 参数2:路由名
         * 参数3:路由键
         */
        channel.queueBind("test_exchange_topic_queue_2", "text_exchange_topic", "product.#");
        channel.queueBind("test_exchange_topic_queue_2", "text_exchange_topic", "order.#");
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者2】:" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_exchange_topic_queue_2", true, consumer);
    }
}

运行测试: :::tips 注意运行顺序:首先运行生产者,创建路由,然后在运行消费者,需要发送消息则继续运行生产者。 ::: 分别发送:商品信息product.price和订单信息order.down
image.png
再发送用户信息user.register
image.png

持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失呢?

  • 消费者的ACK确认机制,可以放置消费者丢失消息
  • 万一在消费者消费之前,RabbitMQ服务器宕机,那消息也会丢失

将消息持久化,那么路由和队列都要持久化才可以。

在管理端,路由的Features代D的就是持久化的路由,队列也是一样的Fetures为D就表示持久化
image.png
为了演示测试效果,首先将RabbitMQ服务重启:systemctl restart rabbitmq-server.service 然后在管理端查看路由和队列已经没有了之前创建的路由和队列了,因为他们不是持久化的。
然后修改我们的上述写的通配符模式的代码:
生产者:路由持久化,exchangeDeclare方法的第三个参数为true,并且信道MessageProperties.PERSISTENT_TEXT_PLAIN设置

channel.exchangeDeclare("text_exchange_topic", "topic", true);
String msg = "订单下单";
//推消息到路由器
//第二个参数必填,路由键
channel.basicPublish("text_exchange_topic", "order.down", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

消费者:队列持久化,第二个参数为true表示持久化队列

channel.queueDeclare("test_exchange_topic_queue_2", true, false, false, null);

运行,然后查看管理端:
路由信息:
image.png
队列信息:
image.png
然后我们在重新启动RabbitMQ服务,查看我们创建的路由和队列是否还存在,持久化的再次重新肯定还是存在的。
image.png
image.png