RabbitMQ

RabbitMQ - 图1

RabbitMQ - 图2

安装 Docker

  1. // 卸载系统之前的 docker
  2. sudo yum remove docker \
  3. docker-client \
  4. docker-client-latest \
  5. docker-common \
  6. docker-latest \
  7. docker-latest-logrotate \
  8. docker-logrotate \
  9. docker-engine
  10. // 安装 Docker-CE
  11. sudo yum install -y yum-utils device-mapper-persistent-data lvm2
  12. // 设置 docker repo 的 yum 源, 此处是阿里源
  13. sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
  14. // 安装 docker, 以及docker-cli
  15. sudo yum install docker-ce docker-ce-cli containerd.io
  16. // 启动 docker
  17. sudo systemctl start docker
  18. // 查看 docker 状态
  19. sudo systemctl status docker
  20. // 设置 docker 开机自启
  21. sudo systemctl enable docker
  22. // 重启docker
  23. sudo systemctl restart docker

Docker 安装 RabbitMQ

// 下载
docker pull rabbitmq:management

// 启动
docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq:management

// 设置开机自启
docker update rabbitmq --restart=always

三种使用方式

使用代码

/* 生产者 */
// 建立连接
ConnectionFactory factory = new ConnectionFactory(); // 创建 RabbitMQ 连接工厂
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.0.111");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!"; // 消息
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();

/****************************************************************************/

/* 消费者 */
// 建立连接、设置连接参数、创建连接、创建队列 略
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ // 设置监听
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
        AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 5.处理消息
            String message = new String(body);
            System.out.println("接收到消息:【" + message + "】");
        }
});

使用 Spring AMQP

  • 使用 Spring AMQP

    • 发送流程
      • 引入 AMQP 依赖 spring-boot-starter-amqp
      • 配置文件添加 mq 连接信息
      • 注入 rabbitTemplate, 设置队列名称、消息, 调用 convertAndSend 发送消息
    • 接收流程

      • 引入 AMQP 依赖 spring-boot-starter-amqp
      • 配置文件添加 mq 连接信息
      • 新建一个类使用 @Component 注解注入到 Spring 容器中, 编写方法 使用 @RabbitListener 指定队列名称, 编写方法接收消息

        spring:
        rabbitmq:
        host: 192.168.0.111 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: guest # 用户名
        password: guest # 密码
        
        // 消费者
        @RabbitListener(queues = "simple.queue") // 绑定队列名称
        public void listenSimpleQueueMessage(String msg) { // 传递过来是什么类型接受就是什么类型
        // 处理业务逻辑
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
        }
        
        // 生产者
        public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
        }
        

        基于注解声明队列和交换机

        @RabbitListener(bindings = @QueueBinding( // 绑定关系
            value = @Queue(name = "对"), // 队列名称
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), // 交换机名称, 交换机类型
            key = {"red", "blue"} // rk
        ))
        public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
        }
        
        // 生产者
        public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "交换机名称";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
        }
        

        RabbitMQ 的优势

        应用解耦(解除耦合)

  • 传统做法:每新增一项业务, 都要在原有的支付服务基础上去修改代码

  • 接入MQ: 支付完成后发送消息不在关注后续业务, 后续新增业务订阅事件即可

RabbitMQ - 图3

异步处理(提升性能)

  • 传统做法
    • 串行: 支付成功 -> 订单业务 -> 仓储服务 -> 短信服务, 整体调用链路过长消耗性能
    • 并行: 支付成功 -> 创建线程同时处理 剩余业务, 可以在串行上进一步提升性能
  • 接入MQ: 支付成功 发送消息 直接返回, 不在等待后续业务完成

RabbitMQ - 图4

故障隔离

  • 传统做法: 如果下游链路中某一个环节在使用中失败, 会导致整体业务不可用
  • 接入MQ: 接入MQ: 支付成功 发送消息 直接返回, 不在等待后续业务完成, 出现问题的业务自己处理

RabbitMQ - 图5

流量消峰

RabbitMQ - 图6

RabbitMQ 的基本结构

RabbitMQ - 图7

RabbitMQ 消息模型

RabbitMQ - 图8

简单模型

  • 一个生产者、一个队列、一个消费者

RabbitMQ - 图9

  • 入门案例(生产者消费者都声明队列的原因是避免队列不存在)

    • 基本消息队列的消息发送流程
      • 建立 connection
      • 创建 channel
      • 利用 channel 声明队列
      • 利用 channel 向队列发送消息
    • 基本消息队列的消息接收流程
      • 建立 connection
      • 创建 channel
      • 利用 channel 声明队列
      • 定义 consumer 的消费行为 handleDelivery()
      • 利用 channel 将消费者与队列绑定

        工作模型

        RabbitMQ - 图10
  • 工作队列(RabbitMQ 默认会做公平分发, 预取操作 先把消息平均发送给所有的 消费者)

    • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
    • 通过设置prefetch来控制消费者预取的消息数量
      spring:
      rabbitmq:
      listener:
       simple:
         # 如果不设定该参数, RabbitMQ 默认会做公平分发, 预取操作 先把消息平均发送给所有的 消费者
         prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
      

      广播模型

      RabbitMQ - 图11
      RabbitMQ - 图12
      RabbitMQ - 图13
  • 广播模型(Fanout Exchange)

    • 在 consumer 服务声明 Exchange、Queue、Binding
      • 在 consumer 定义一个类, 添加 @configuration 注解, 并声明 FanoutExchange、Queue 和绑定关系对象 Bingding
    • 在 consumer 服务声明两个消费者
      • 添加两个方法, 分别监听绑定的队列
    • 在 publisher 服务发送消息到 FanoutExchange
  • 注: 交换机的作用

    • 接收 publisher 发送的消息
    • 将消息按照规则路由到与之绑定的队列
    • 不能缓存消息,路由失败,消息丢失
    • FanoutExchange 的会将消息路由到每个绑定的队列

      路由模型

      RabbitMQ - 图14
  • 路由模型(Direct Exchange)

    • 在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct 类型的 Exchange
      • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
      • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
      • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
    • 注意
      • Fanout交换机将消息路由给每一个与之绑定的队列
      • Direct交换机根据RoutingKey判断路由给哪个队列
      • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

        通配符模型

        RabbitMQ - 图15
  • 通配符模型(Topic Exchange)

    • Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
      • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
        • #:匹配一个或多个词
        • *:匹配不多不少恰好1个词
    • 注意
      • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
      • Topic交换机与队列绑定时的bindingKey可以指定通配符