前言

学习RabbitMQ就把它理解为一个简单的队列,数据从一头进,从另一头出来而已。但是它提供了非常丰富的 API,基本能想到的功能它都有,我们要做的只是调用相应的API即可。不用畏惧,干起来!

图解RabbitMQ

下面的几幅图对理解RabbitMQ中的生产者、消费者、交换器、路由键、绑定键、队列、连接等非常有帮助,但本文观看顺序不一定是从这几幅图开始,只是强调非常重要而已!!!

文中的Demo已经上传到GitHub,戳我

image.png

image.png
image.png
image.png
image.png
image.png

RabbitMQ知识点概览

image.png

RabbitMQ安装

我在mac下用HomeBrew安装就行,网上很多教程。也可以从官网下载 。ebin/rabbit.app 是它的核心配置文件,可以根据需要修改。sbin 目录下是一些启动脚本,例如启动:./rabbitmq-server start

Docker 安装

拉取镜像

docker pull rabbitmq:management

运行

docker run -di —name rabbitmq- -p 5671:5671 -p 5672:5672 -p 15671:15671 -p 15672:15672 -p 25672:25672 -p 4369:4369 镜像id

访问

在浏览器输入:http://localhost:15672 默认用户名和密码是:guest

RabbitMQ常用命令

rabbitmqctl 命令比较好用,具体可以网上搜下,或者参考
image.png

RabbitMQ简介

RabbitMQ跟Kafka一样也是一个消息中间件,类似的的还有阿里的RocketMQ 。一个很经典的应用场景就是用户下单,将订单消息放入RabbitMQ中,而不是马上入库;另外还有下单支付倒计时等等,在交易系统中消息中间件有着广泛的应用。

image.png

AMQP 相关概念

高级消息队列协议,是一种应用层的通信协议,建立在 TCP 之上。它定义了一些标准,相当于我们开发中定义的一些接口,而不管你底层是如何实现的,但必须接受它的标准。下面 AMQP 定义了一些概念的的层次图:
image.png

Server

又称 Broker,接受客户端的连接,提供实体服务。

Connection

连接,应用程序与 Broker 的网络连接。

Channel

网信通道,几乎所有的操作都是在 Channel 中进行的,客户端可以建立多个 Channel,一个 Channel 代表一个会话任务。

Message

消息,需要传输的数据,由 Properties 和 Body 组成,Properties 可以对消息进行修饰说明,Body 是消息的具体内容。

Virtual Host

虚拟地址,用于逻辑隔离,方便消息路由和数据划分。

Exchange

交换机,接受信息,根据路由键转发消息到绑定的队列。

Binding

绑定,交换机和队列之间的虚拟连接,binding 中可以包含 routing key。

Routing Key

路由键,一个消息的路由规则

Queue

消息队列,保存消息的一个队列结构。

RabbitMQ概念

image.png

RabbitMQ使用

image.png

RabbitMQ高级特性

image.png
生产者执行过程:

  1. 生产者连接到一个RabbitMQ Broker,建立连接,开启一个信道
  2. 生产者声明一个交换器(交换器类型,是否持久化等)
  3. 生产者声明一个队列并设置相关属性(是否持久化,是否自动删除等)
  4. 生产者通过路由键将交换器和队列绑定起来
  5. 生产者发送消息到RabbitMQ Broker
  6. 相应的交换器根据收到的路由键查找匹配的对列
  7. 如果找到,将消息存入对应的队列中,否则根据生产者配置属性选择丢弃或退还给生产者
  8. 关闭信道
  9. 关闭连接

消费者执行过程:

  1. 消费者连接到一个RabbitMQ Broker,建立连接,开启一个信道
  2. 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数
  3. 等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接受消息
  4. 消费者确认(ack)接收到的信息
  5. RabbitMQ 从队列中删除相应已被确认的消息
  6. 关闭信道
  7. 关闭连接

一句话总结:
生产者将消息发送给交换器,交换器和队列绑定 。当生产者发送消息时所携带的 RoutingKey与绑定时的 BindingKey 相匹配时,消息会被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。

消息回退

mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者,可以设置addReturnListener中的回退业务逻辑,如果未设置mandatory参数,该消息会被丢弃
_

交换器类型

fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,不需要路由。需要将消息一次发给多个队列时,可以使用这种类型。

direct

该交换器是默认交换器,是个空字符串 “”,不想指定交换器,可以使用这种类型。它会把消息路由到那些 BindingKey和 RoutingKey 完全匹配的队列中,一个消息只会被消费一次。

topic

BindingKey和 RoutingKey可以根据规则进行模糊匹配,路由到相应的队列。
规则一:RoutingKey用一个点“.”进行分隔
规则二:BindingKey和RoutingKey都是用点“.”来分隔
规则三:BindingKey可以存在“”和“#”来做模糊匹配,“”代表一个单词,“#”代表0个或多个单词

headers

它不依赖于RoutingKey的匹配来路由消息,而是根据消息内容中的headers来进行匹配。该类型的交换器,性能差也不实用,所以一般不用。
**

死信队列

Dead-Letter-Exchange,简称DLX,死信交换器。当一个消息在队列中变成死信之后,会被发送到死信交换器中,绑定DLX的队列称为死信队列。

死信队列也只是一个普通队列而已,只不过他是专门用来处理死信的。

死信队列也可以作为延迟队列来使用。

出现死信的原因

  1. 消息被拒绝
  2. 消息过期
  3. 队列达到最大长度

优先级队列

优先级高的队列,具有先被消费的权利

生产者消息确认

为保证消息能被正确达到服务器(也就是RabbitMQ的交换器中),RabbitMQ提供了两种解决方案。

事务机制

事务机制来保证消息正确消费,但是性能差。该机制是同步的,会阻塞等待信道返回确认

发送方确认

生产者将信道设置为confirm模式后,会把发布在该信道上的消息加上一个唯一ID(从1开始),一旦消息投递到队列后,会返回一个Basic.ACK(携带着唯一ID)给生产者,这样就知道已经投递成功了。

如果消息是持久化的,那么在确认消息的deliveryTag中也包含唯一ID。

该机制是异步的,可以通过回调来处理确认的消息。

消息只消费一次

RabbitMQ没有去重机制来保证一条消息只被消费一次。例如生产者使用confirm机制时,发送了一条消息,等待RabbitMQ返回确认通知,但此时网络断开,生产者捕获了异常,为了保证消息的可靠性会选择重新发送,这样就会导致该消息被重复消费。

因此,需要在业务中进行去重处理

存储机制

事实上,在RabbitMQ中不管是持久化或非持久化消息都可以被写入磁盘。

持久化的消息在到达队列 时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高 一定的性能,当内存吃紧的时候会从内存中清除。非持久化的消息一般只保存在内存中 ,在内 存吃紧的时候会被换入到磁盘中,以节省内存空间。

惰性队列

在3.6.6版本开始引入了惰性队列,它的目的是为了能支持更长的队列。

当声明一个队列是惰性队列时,会将消息直接落盘,不管是持久化还是非持久化的,这样减少了内存的消耗,但增加了I/O的使用。

注意:即使消息会落盘,但对于非持久化消息来说,一旦重启还是会丢失。

惰性队列的使用场景是当消费者下线、宕机,为避免在内存中大量堆积,这时候是非常适用的。

消息可靠性投递

生产端投递

对生产端的消息进行持久化,然后用定时任务定时去获取未被消费的消息,让消费者去消费

幂等性

多次相同的请求操作只会影响一次结果,例如使用数据库版本号的方式来执行更新操作:

  1. update set stock = stock - 1,version = version + 1 where version = 5

方案有:

  • 数据库唯一主键的方式
  • Redis 原子性去重


管理台的使用

RabbitMQ 提供了一个管理控制台,可以直接操作。

创建一个队列

image.png

可以看到刚才创建的一个队列
image.png

创建一个交换器

image.png

可以看到刚才创建的一个交换器
image.png
image.png
image.png

参考资料