一、安装

1.1 安装erlang运行环境

otp_win64_23.2.rar
安装otp_win64_20.2.exe
右键以管理员身份运行
78 - RabbitMQ 消息队列 - 图2
接着选定安装路径,一直点击下一步傻瓜式安装

1.2 安装RabbitMQ

1.2.1 安装rabbitmq-server-3.7.4.exe

rabbitmq-server-3.8.14.rar
双击文件rabbitmq-server-3.7.4.exe,傻瓜式安装,(注意不要安装在包含中文和空格的目录下!安装后window服务中就存在rabbitMQ了,并且是启动状态。 )

1.2.2 安装管理界面(插件)

  1. 进入rabbitMQ安装目录的sbin目录
  2. 点击上方的路径框输入cmd,按下回车键

image.png
image.png

  1. 输入命令点击回车
    1. rabbitmq-plugins enable rabbitmq_management
    image.png

    1.2.3 重启服务

    1.双击rabbitmq-server.bat(双击后可能需要等待一会)
    image.png
    2.打开浏览器,地址栏输入http://127.0.0.1:15672 ,即可看到管理界面的登陆页
    image.png
    账号密码都为guest,登录进入主页面
    image.png最上侧的导航依次是:概览、连接、信道、交换器、队列、用户管理

二、RabbitMQ简介

2.1 引言

以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ就是这样一款我们苦苦追寻的消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。Erlang语言在数据交互方面性能优秀,有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。可谓“人如其名”,RabbitMQ像兔子一样迅速。

2.2 特点

RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:

  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式、返回模式
  • 与Spring,AMQP完美整合,API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

2.3 典型应用场景

  • 异步处理:把消息放入消息中间件中,等到需要的时候再去处理。
  • 流量削峰:例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
  • 日志处理
  • 应用解耦:假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。

2.4 AMQP协议与RabbitMQ

提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念:

  • Server:接收客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
  • Queue:消息队列,用来保存消息,供消费者消费。

    我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢? 试想这样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 TCP 连接复用的方式,不仅可以减少性能开销,同时也便于管理 。

下图是AMQP的协议模型:
78 - RabbitMQ 消息队列 - 图9
正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。
生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息
接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
最后还要关闭信道和连接
RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议一模一样。
20190903141227300.png

三、RabbitMQ交换机分类

RabbitMQ的Exchange(交换器)分为四类:

  • direct(默认)
  • fanout
  • topic
  • headers(不常用)


使用RabbitMQ前先启动成1.2.3中最后的画面并引入依赖**

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>

3.1 direct交换器

3.1.1 概念

direct为默认的交换器类型,如果路由键匹配的话,消息就投递到相应的队列,如图:

78 - RabbitMQ 消息队列 - 图11
使用代码:channel.basicPublish(“”, QueueName, null, message)推送direct交换器消息到对于的队列,空字符为默认的direct交换器,用队列名称当做路由键。

3.1.2 代码示例

发送端:

/**
 * 路由
 */
public class MQProvider_Publish {

    //队列名称
    private final static String QUEUE_NAME = "hello_route";
    private final static String QUEUE_NAME_2 = "hello_route_2";
    private final static String QUEUE_NAME_3 = "hello_route_3";

    //交换机名称
    private final static String EXCHANGE_NAME = "ex2";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        try {
            //新建连接与信道
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明队列【参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //将交换机与队列绑定,指定路由
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one");

            channel.queueDeclare(QUEUE_NAME_2,false,false,false,null);
            channel.queueBind(QUEUE_NAME_2,EXCHANGE_NAME,"one");

            channel.queueDeclare(QUEUE_NAME_3,false,false,false,null);
            channel.queueBind(QUEUE_NAME_3,EXCHANGE_NAME,"two");
            //发送消息
            String message = "Hello Publish!";
            // 推送内容【参数一:交换机名称;参数二:路由键,参数三:消息的其他属性-路由的headers信息;参数四:消息主体】
            channel.basicPublish(EXCHANGE_NAME,"one",null,message.getBytes());
            System.out.println("send message:" + message);
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接收端:

public class MQConsumer_Subscribe {

    private final static String QUEUE_NAME = "hello_route";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("get message:" + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

3.1.3 测试

启动发送端发送一次数据
QQ图片20210310144044.png
QQ截图20210310144253.png
可见,在发送端代码中路由绑定的队列中各有一条消息
QQ图片20210310143928.png

启动接收端接收数据
可见,当接收端队列名为hello_route时,能够接收到消息(hello_route_2同理),而队列名为hello_route_3时无法接收消息
QQ截图20210310144427.pngQQ截图20210310144340.png

3.2 fanout交换器——发布/订阅模式

fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
和direct交换器不同,我们在发送消息的时候新增channel.exchangeDeclare(ExchangeName, “fanout”),这行代码声明fanout交换器。
78 - RabbitMQ 消息队列 - 图17
注意:对于fanout交换器来说routingKey(路由键)无效的,这个参数是被忽略的。
代码部分省略(与direct类似)
**

3.3 topic交换器——匹配订阅模式

topic交换器运行和fanout类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候routingKey路由键就派上用场了,使用路由键进行消息(规则)匹配
假设我们现在有一个日志系统,会把所有日志级别的日志发送到交换器,warning、log、error、fatal,但我们只想处理error以上的日志,要怎么处理?这就需要使用topic路由器了。
topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”作为分隔符,例如:com.mq.rabbit.error。
消费消息的时候routingKey可以使用下面字符匹配消息:

  • “*”匹配一个分段(用“.”分割)的内容;
  • “#”匹配0和多个字符;

例如发布了一个“com.mq.rabbit.error”的消息:
能匹配上的路由键:

  • cn.mq.rabbit.*
  • cn.mq.rabbit.#
  • .error

  • cn.mq.#
  • #

不能匹配上的路由键:

  • cn.mq.*
  • *.error
  • *

所以如果想要订阅所有消息,可以使用“#”匹配。
注意:fanout、topic交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。
代码部分省略(与direct类似)
**

四、SpringBoot集成RabbitMQ

4.1 生产者微服务producer

4.1.1 引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

4.1.2 yml配置文件

server:
  port: 8021
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4.1.3 RabbitMQConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    //队列
    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    //Direct交换机,可指定其他类型
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置路由键:one
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(queue1()).to(directExchange()).with("one");
    }
}

4.1.4 测试类

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
class ProducerApplicationTests {

    //使用RabbitTemplate,这提供了接收/发送等等方法
    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        //参数1:交换机 参数2:路由键    参数3:需要发送的消息
        rabbitTemplate.convertAndSend("directExchange","one","Hello");
    }

}

注:如果发送对象,需要该对象的所有信息完全相同(包括全包名),并且实现序列化且id相同

4.1.5 查看结果

访问http://127.0.0.1:15672/并登录,账号密码都为guest
image.png

image.png

4.2 消费者微服务Consumer

4.2.1 引入依赖

同上

4.2.2 yml配置文件

同上,改个端口号

4.2.3 RabbitMQConfig

同上

4.2.4 Reciver接收类

注解中queues队列名,可为多个

@Component
@RabbitListener(queues = "queue1")
public class Receiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println(msg);
    }
}

4.2.5 启动微服务,通过producer发出消息,查看结果

image.png