image.png

  1. 参考文献
    1. https://blog.csdn.net/kavito/article/details/91403659
    2. https://www.jianshu.com/p/79ca08116d57
    3. https://www.cnblogs.com/vipstone/p/9350075.html
    4. https://blog.csdn.net/weixin_44742255/article/details/118993370

1. 下载安装

  1. ## 环境背景:一个纯净的linux环境,软件需求:jdk、tomcat、erlang语言环境、socat加密
  2. ## 下载RabbitMQ所需软件包(本神在这里使用的是 RabbitMQ3.6.5 稳定版本)
  3. wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
  4. wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-1.1.el7.lux.x86_64.rpm
  5. wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
  6. ## 首先在Linux上进行一些软件的准备工作,yum下来一些基础的软件包
  7. yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
  8. ## 配置好主机名称
  9. /etc/hosts
  10. /etc/hostname
  11. ## 安装服务命令
  12. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
  13. rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
  14. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
  15. ## 修改用户登录与连接心跳检测,注意修改
  16. vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
  17. 修改点1loopback_users 中的 <<"guest">>,只保留guest (用于用户登录)
  18. 修改点2heartbeat 10(用于心跳连接)
  19. ## 安装管理插件
  20. ## 首先启动服务(后面 | 包含了停止、查看状态以及重启的命令)
  21. - /etc/init.d/rabbitmq-server start | stop | status | restart
  22. ## 查看服务有没有启动: lsof -i:5672 (5672是Rabbit的默认端口)
  23. - rabbitmq-plugins enable rabbitmq_management
  24. ## 可查看管理端口有没有启动:
  25. - lsof -i:15672 或者 netstat -tnlp | grep 15672
  26. ## 一切OK 我们访问地址,输入用户名密码均为 guest :
  27. http://你的ip地址:15672/
  28. ## 如果一切顺利,那么到此为止,我们的环境已经安装完啦

2. 控制台解析

  1. 在启动的时候会自动生成一个虚拟主机:**/ **
  2. 访问端口
    1. **java**代码连接mq访问:**5672**
    2. 集群之间通信默认:**25672**
    3. **mq**的管理界面:**15672**
  3. image.png

    3. 核心概念

    image.png

    3.1 简单概念

    3.1.1 生产者和消费者

  4. **Producer**:消息的生产者,也是一个向交换器发布消息的客户端应用程序

  5. **Consumer**:消息消费者,即消费方客户端,接收**MQ**转发的消息

    3.1.2 消息队列

  6. 消息队列**(queue)**是用来保存消息,直到发送给消费者

  7. 它是消息的容器,也是消息的终点
  8. 一个消息可投入一个或多个队列
  9. 消息一直在队列里面,等待消费者连接到这个队列将其取走
  10. 一些属性:
    1. **Durability**:是否持久化
    2. **Auto delete**:如果是**yes**,代表最后一个监听被移除后,队列会被自动删除 ```java /*
  • 声明(创建)队列
  • 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
  • 1、queue 队列名称
  • 2、durable 是否持久化,如果持久化,mq重启后队列还在
  • 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
  • 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
  • 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); ```

    3.1.3 消息

  1. 消息**(message)**是不具名的,它由消息头和消息体组成
  2. 本质上就是一段数据,由**properties****payload**``**(body)**组成
  3. 消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性主要包括
    1. **routing-key(路由键)**
    2. **priority(相对于其他消息的优先权)**
    3. **delivery-mode(指出该消息可能需要持久性存储)**
  4. 消息体的常见属性

    1. **headers**:实际上是一个map,是自定义的一些参数
    2. **content_type**
    3. **content_encoding**:字符集
    4. **correlation_id**:唯一标记,可以用来保持幂等性
    5. **reply_to**
    6. **expiration**
    7. **message_id**
    8. **timestamp**
    9. **type**
    10. **user_id**
    11. **app_id**
    12. **cluster_id**

      3.1.4 绑定、连接

  5. 绑定**(binding)**,用于消息队列和交换器之间的关联,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

  6. 连接**(Connection)**:网络连接,比如一个**TCP**连接

    3.2 信道 Channel

  7. 信道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的**TCP**连接内地虚拟连接

  8. **AMQP** 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成
  9. 对于操作系统来说建立和销毁 **TCP** 都是非常昂贵的开销,引入了信道的概念,以复用一条 **TCP** 连接
  10. 一个**channel**可能对应多个消费者,多个**exchange**,多个生产者

    3.3 Broker

  11. 表示消息队列服务器实体(代表真正的服务器)

  12. 消息队列服务进程,此进程包括两个部分:**Exchange****Queue**
  13. 一个**broker**包含多个**Virtual Host**

    3.4 虚拟地址 Virtual Host

  14. 虚拟地址,用于进行逻辑隔离,最上层的消息路由

  15. 一个**Virtual Host**里面可以有若干个**Exchange****Queue**
  16. 同一个**Virtual Host**里面不能有相同名称的**Exchange**或者**Queue**
  17. 设置虚拟机,一个**mq**服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的**mq**
  18. 最好的实践,可以根据不同的业务定义不同的域,例如:**/order、/pay**
  19. 消息队列在启动的时候会自动生成一个虚拟主机:**/**

    3.5 交换机 Exchange

    3.5.1 基础认知

  20. 它本身是一个逻辑的概念,接收生产者发送的消息并根据路由转键转发到消息到所绑定的队列

  21. 交换机接收生产者发送的消息,另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于**Exchange**的类型
  22. 交换机只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与**Exchange**绑定,或者没有符合路由规则的队列,那么消息会丢失!

    3.5.2 工作原理

  23. 生产者发送消息到交换机

  24. 交换机通过开始和队列绑定的关键信息和**RoutingKey**,把消息路由到指定的队列

    3.5.3 常见属性

  25. **Name**:交换机的名称

  26. **Type**:类型
    1. **Direct**:直链模式所有发送到该交换机上的消息被转发到**"routing key"**中指定的队列
    2. **Topic**: 做一些规则的匹配,比如** "tex.*"**的匹配
    3. **Fanout**: 类似于一个广播的模式,将消息交给所有绑定到交换机的队列
    4. **Header**: 请求的模式忘记就完事了,工作中不会使用
  27. **Durability**: 是否持久化
    1. **durable**:持久化
    2. **transient**:不持久化
  28. **Auto delete**: 自动删除,当最后一个绑定到交换机上的队列删除后,自动删除该交换机
  29. **Internal**: 当前交换机是否用于 **Rabbit MQ** 内部使用,默认**false**,外部使用
  30. **Arguments**: 扩展参数,用于扩展**AMQP**协议自制定制化使用

    3.5.4 实践忠告

    交换机在设计的时候和队列是多对对的关系,但是在生产实践不建议这样用,会把业务之间的关系搞复杂,最好是一个队列对应一个交换机
    Rabbit MQ - 图5

    4. 工作流程

    4.1 生产者发送消息

  31. 生产者和**broker**建立**TCP**连接

  32. 生产者和**broker**建立**channel**
  33. 生产者通过**channel**消息发送给**broker**,由**bxchange**将消息进行转发
  34. 交换机将**message**转发到指定的**queue**

    4.2 消费者接收消息

  35. **cunsumer****broker**建立**TCP**连接

  36. **cunsumer****broker**建立**channel**
  37. **cunsumer**监听指定的**queue**
  38. 当有**message**到达**queue****broker**默认将消息推送给消费者
  39. **cunsumer**接收到消
  40. **ack**回复
  41. **Tips**:使用代码的时候会发现消费服务不会停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印(消费)

    5. 六中基本消息类型


  1. 基本消息类型和**work**消息模型,可以看成完全匹模式**(direct)**的特殊类型,使用默认的**exchange****queue**,生产环境基本不用,多数使用订阅模型
  2. 在实际代码开发中,**queue****exchange**的声明在生产者和消费者中都可以,但实际上,最常用的是在界面手动创建

2.1 基础消息模型

  1. 这个模式就是单纯的当**mq**就是一个队列,使用默认的**exchange**和自定义的队列
  2. 类似于**direct**模式:规则**exchange**使用**amqp defaul**t,**routingKey**使用队列名称

    2.2 work消息模型

    2.2.1 基础认知

  3. 工作队列或者竞争消费者模式,**exchange****queue**也是使用默认的

  4. 入门程序与**work queues**相比,后者多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
  5. 这个消息模型在**Web**应用程序中特别有用,可以处理短的**HTTP**请求窗口中无法处理复杂的任务

    2.2.2 默认公平消费原则导致,能者多劳问题?

  6. 背景

    1. 生产环境各个消费则的服务器性能并不一定相同,需要的是能者多劳的情况?
    2. 消费者二设置休眠,模拟处理业务慢的情况
    3. 结果:两个消费者各自消费了不同25条消息
  7. 问题
    1. 消费者一比消费者二的效率要低,一次任务的耗时较长
    2. 然而两人最终消费的消息数量是一样的
    3. 消费者二大量时间处于空闲状态,消费者一一直忙碌
  8. 产生的原因
    1. 它实现了默认公平消费原则,例如在收到两个消费者连接**MQ**的时候,会默认将消息平分给两个消费者
  9. 解决
    1. 通过**BasicQos**方法设置**prefetchCount = 1**
    2. 这样**Rabbit MQ**就会使得每个**Consumer**在同一个时间点最多处理**1个Message**
    3. 换句话说,在接收到该**Consumer****ack**前,他它不会将新的**Message**分发给它
    4. 相反,它会将其分派给不是仍然忙碌的下一个**Consumer**
    5. 值得注意的是:**prefetchCount**在手动**ack**的情况下才生效,自动**ack**不生效

      2.3 订阅模型


  1. 该模式的逻辑是,拥有自己的**exchange**,不再使用默认的**exchange**(当然也可以指定使用)
    1. 一个生产者多个消费者
    2. 每个消费者都有一个自己的队列
    3. 生产者没有将消息直接发送给队列,而是发送给**exchange**(交换机、转发器)
    4. 每个队列都需要绑定到交换机上
    5. 生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
  2. 例子:注册->发邮件、发短信

2.3.1 广播模式 - Fanout

  1. 类似于一个广播的模式,将消息交给所有绑定到交换机的队列
  2. 每个发到 fanout 类型交换机的消息,都会被转发到与该交换机绑定的所有队列上
  3. 交换机不处理路由键,只是简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  4. fanout 类型转发消息是最快的?

    1. 为什么最快: 因为不需要匹配,不需要做规则路由

      2.3.2 完全匹配 - Direct

  5. 完全匹配模式所有发送到该交换机上的消息被转发到**"routing key"**中指定的队列

  6. 消息传递时,routing key 必须完全匹配才能被接受,否则消息会被抛弃
  7. 消息中的 routing key 如果和 Binding 中的 binding key 一致, 交换机就将消息发到对应的队列中,否则消息会被抛弃
  8. 该模式可以使用自带的 default exchange ,所有不需要有任何 binding 操作
  9. 例如

    1. 如果一个队列绑定到交换机要求路由键为 dog
    2. 则只转发 routing key 标记为 dog 的消息,
    3. 不会转发 dog.puppy ,也不会转发 dog.guard 等等,它是完全匹配、单播的模式。

      2.3.3 正则匹配 - Topic

  10. 规则的匹配、通配符,把消息交给符合**路由模式(routing pattern)**的队列,比如 **"tex.*"**的匹配

  11. 一个交换机和多个队列绑定,发送的消息会被任意一个吸收
  12. 所有队列吸收之和等于消息总数,接受的逻辑是随机的,相当于负载均衡

    6. 高级特性

    6.1 确认消息 - Confirm

    image.png

  13. 概念

    1. 消息的确认,指生产者投递消息后,如果 broker 确认收到消息
    2. 则会给我们生产者一个应答,生产者进行接受应答,用来确定这条消息是否正确发送到 broker
  14. 如何实现 confirm

    1. 在 channel 上开启确认模式:**channel.confirmSelect()**
    2. 在 channel 上添加监听:**addConfirmListener**,监听成功或者失败的返回结果,根据具体的据结果对消息进行重新发送,或者记录日志等处理

      6.2 返回消息 - Return

  15. 发消息的时候,指定的 exchange 和 routing key 不存在,mq会自动的抛弃这些消息

  16. 但是 return 消息会监听到这些消息,可以监听之后做一些处理,如记录日志…
  17. 如何开启这个功能,需要设置**mandatory**属性

    1. true:则监听器会接受到这些失败的消息
    2. false:那么 mq 就自动删除这些消息

      6.3 消息的ACKACk的流程图

      6.3.1 Broker到Producer的ACK

  18. mq 给生产者的 ack 是已经写好的,无需关心

    6.3.2 Consumer到Borker的ACK

  19. 概念

    1. 当消费者获取消息后,会向 Rabbit MQ 发送回执 ACK
    2. 告知消息已经被消费了,这种机制称为 MQ 的 ACK、或者称为消费端的 ACK
    3. 如果消费端消费失败了,也会给 MQ 回复一个 NACK,表示消费失败
  20. 两种 ACK 机制
    1. 自动 ACK:消息一旦被接收,消费者自动发送 ACK
    2. 手动 ACK:消息接收后,不会发送 ACK,需要手动调用(生产绝大多数使用)
  21. 如何抉择那种ack
    1. 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
    2. 如果消息非常重要,不容丢失。那么最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,Rabbit MQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失
  22. 消费者进行消费的时候,由于业务异常导致消费失败,我们可以给 MQ 回复一个 NACK,然后进行日志的记录,然后进行补偿(建议真正使用的时候不要重回队列)
  23. 由于服务宕机等严重问题,那我们需要手工进行 ACK 保障消费端消费成功

    6.4 消费端的重回队列

    …….

    6.5 消费端的消息限流

  24. 场景问题

    1. MQ 服务器上有海量消息,例如上万条未处理的消息
    2. 这时候打开一个消费者客户端,会出现下面的情况,巨量的消息瞬间推送过来
    3. 我们单个、多个客户端无法同时处理这么多数据,所以我们需要做流量限制
  25. 解决方式
    1. MQ 提供了一种**QOS(服务质量保证)**功能
    2. 即在手动 ACK 的前提下,吐过一定数目的消息
    3. 通过基于 consumer 或者 channel 设置的 QOS 的值,未被ack确认前,不进行消费新的消息
    4. 例如
      1. 就是可以设置一个值,比如等于10,就代表如果有10条消息没处理完
      2. 就不ack,不ack就不在拉取后面的消息
  26. 开启配置的方式
    1. 如下:**Void BasicQos(unit prefetchSize,ushort prefetchCount,bool global);**
    2. 参数解释
      1. prefetchSize:报文大小
      2. prefetchCount:告诉 MQ 不要一次给消费者推送多余设置的值,即一旦消费者有N条消息还没有ack,则将该consumer block掉,直到有消息ack
      3. global:true/false,是否将上面设置应用于 channel 还是单个的 consumer ,true就是管道级别的,false是单个consumer

image.png

6.6 TTL消息、TTL队列

  1. TTL:time to live,即生存时间
  2. TTL message:指定某一个消息的存活时间
  3. TTL queue:指定进入这个队列的消息存活时间,注意不是指队列存储时间

    6.7 死信队列

    6.7.1 基础认知

  4. 死信队列:DLQ,dead letter queue

  5. 死信交换机:DLX,dead letter exchange
  6. 一般的 MQ 都叫死信队列,但是rabbit mq独有的exchange机制,也可以叫做死信交换机,这个交换机专门路由死信的消息
  7. 当一个消息,在一个队列中变成死信(dead message)之后,他能被重新 publish 到一个新的 exchange 称为DLX
  8. 消息如何死信
    1. NACK + 不重回队列
    2. 消息的TTL过期
    3. 队列达到最大长度

image.png

6.7.2 如何设置一个死信队列

  1. 首先需要设置死信队列的 exchange、queue,然后进行绑定
    1. exchange: dlx.exchange
    2. queue:dlx.queue
    3. routing key:#
  2. 然后声明一个正常的 exchange、queue、binding,知识需要加一个指定死信队列的参数
    1. **arguments.put("x-dead-letter-exchange","dlx.exchage")**

      6.8 集群架构模式

      6.8.1 主备模式

      master 对外提供服务,salve 作为备份,当 master 挂掉之后,salve 上台
      利用zk做故障切换
      image.png
      image.png

      6.8.2 远程模式

      早期的 Rabbit MQ 提供的一种多活的存储模式,主要是用来做异地的容灾备份(配置复杂,了解即可)

      6.8.3 镜像模式

      Mango DB、ES、Kafka都有这种概念
      image.png

      6.8.4 多活模式

      理解为升级版本的远程模式,不用很复杂的配置了
      image.png
      image.png
      image.png

      7. 基础组件搭建


  1. 一个 MQ 的体系架构,只需要关注两点,这两点做好了架构就完美
    1. 生产端怎么样不丢消息,也就是可靠性投递
    2. 消费端如何保证消息不被重复消费,也就是消费幂等性
    3. 至于:保障消费者是否成功消费消息,mq 已经实现(注意所有的 mq 都基本上实现)

7.1 消息如何保障100%被投递

7.1.1 基础认知

  1. 消息投递的流程
    1. producer 发消息 — broker 收到消息 — producer 收到 broker 回复的 ACK
    2. 确认应答想要保证消息100%,也就是保障上面的节点不出现问题
    3. 或者出现问题有补救措施(这指消完善的消息补偿机制)
  2. 解决方式

    1. 消息落库,对消息状态进行打标
    2. 解决方案流程(个人理解,类似于乐观锁的概念)
    3. 重试机制很重要

      7.1.2 架构流程

      image.png
  3. 名词解释

    1. Msg db:消息存储的数据库表
    2. Biz Service:业务逻辑
    3. Producer Component:监听组件
  4. 流程解析

    1. step1、step2:发消息之前记录数据库,这两步骤必须加事务,保证原子性
    2. step3:发送消息到 MQ
    3. step4:MQ 对生产者发送ack应答(系统会一直监听这个ack是否回复)
    4. step5:监听收到ack,系统会修改 msg db 表的状态(修改为发送成功)
    5. 这时候一直没有收到返回的ack,怎么办?
    6. step6:设置超时时间,比如五分钟没有收到ack,触发定时任务
    7. step7:触发定时任务,进行消息重发,循环走前6步
    8. 最后:触发多少次之后,短信通知技术人员,人工补偿

      7.2 重复消费问题

  5. 在海量订单业务高峰期,如何避免消息的重复消费问题(也就是保证消息消费的幂等性)

  6. 为什么会产生多条相同消息?
    1. 例如:MQ 返回给生产者 ACK 的时候出现故障,导致该条消息一直是发送失败状态
  7. 解决方式
    1. 消费端实现幂等性,代表消息永远不会被消费多次,即使收到了多条一样的消息
  8. 业界主流的幂等性解决方案
    1. 业务唯一ID或者指纹码机制,利用数据库主键去重