image.png
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ就是这样一款我们苦苦追寻的消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。Erlang语言在数据交互方面性能优秀,有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。可谓“人如其名”,RabbitMQ像兔子一样迅速。

RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:

  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式、返回模式
  • 与Spring AMQP完美整合,API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

MQ典型应用场景:

  • 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
  • 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
  • 日志处理
  • 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。

AMQP协议和RabbitMQ

提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念:

  • Server:接收客户端的连接,实现AMQP实体服务。
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
  • Queue:消息队列,用来保存消息,供消费者消费。

下图是AMQP的协议模型:
RabbitMq - 图2

正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。
生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。
接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
最后还要关闭信道和连接。
RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议简直就是一模一样。

RabbitMq - 图3

常用交换器

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。
Direct Exchange
该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。
RabbitMq - 图4
Topic Exchange
该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。
Exchange将RoutingKey和某Topic进行模糊匹配,其中“”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而”login.“只能匹配到“com.rabbitmq”。
RabbitMq - 图5
Fanout Exchange
该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。
RabbitMq - 图6
Headers Exchange
该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。

高级特性

过期时间(TTL)
Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,单位是毫秒。了解Redis的朋友应该一看就明白,二者很像。
RabbitMQ可以对消息和队列设置TTL。
RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。
RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。
如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。
当然也可以不设置TTL,不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃。

消息确认
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者订阅队列的时候,可以指定autoAck参数,当autoAck为true的时候,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。
持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。
当然,也可以将所有的消息都设置为持久化,但是这样做会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得,关键在于选择和取舍。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。
死信队列
当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列。。
消息变成死信有下面几种情况:

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置requeue=false。
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换器,和一般的交换器没有区别,他能在任何的队列上面被指定,实际上就是设置某个队列的属性。当这个队列中有死信的时候,RabbitMQ会自动将这个消息重新发送到设置的交换器上,进而被路由到另一个队列,我们可以监听这个队列中消息做相应的处理。
死信队列设置:

  • 设置死信队列的exchange和queue,然后进行绑定
    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#
  • 然后进行正常声明交换器、队列、绑定,只不过我们需要在队列上加一个参数即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”)

死信队列有什么用?
当发生异常的时候,消息不能够被消费者正常消费,被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常,进而改善和优化系统。
延迟队列
一般的队列,消息一旦进入队列就会被消费者立即消费。延迟队列就是进入该队列的消息会被消费者延迟消费,延迟队列中存储的对象是的延迟消息,“延迟消息”是指当消息被发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。
延迟队列用于需要延迟工作的场景。最常见的使用场景:淘宝或者天猫我们都使用过,用户在下单之后通常有30分钟的时间进行支付,如果这30分钟之内没有支付成功,那么订单就会自动取消。除了延迟消费,延迟队列的典型应用场景还有延迟重试。比如消费者从队列里面消费消息失败了,可以延迟一段时间以后进行重试。

与springBoot整合

Spring Boot使用了这么久,套路差不多摸清楚了。Spring Boot与其他组件进行整合,无非就是加入pom依赖,接着配置一些基本信息,然后就可以使用相关注解进行开发了。
RabbitMQ也是相同的套路,第一步要引入依赖。要引入的依赖比较容易记,RabbitMQ实现了AMQP协议,引入依赖spring-boot-starter-amqp

  1. <!-- rabbitmq依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

第二步要配置RabbitMQ连接信息,包括主机、端口号、用户名和密码。RabbitMQ配置信息:

  1. spring.rabbitmq.host=192.168.16.128
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest

如果没有安装RabbitMQ,我推荐使用Docker快速安装和启动,启动命令:

  1. docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.8.0-beta.4-management
  • 实现生产者和消费者

第三步实现生产和消费者。

  • 生产者

生产者用来生产消息并进行发送。需要用到RabbitTemplateRabbitTemplate是发送消息的关键类,convertAndSend方法可以指定消息发送的交换器、路由键、消息内容等。

  1. @Component
  2. public class Producer {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void produce() {
  6. String message = new Date() + "Beijing";
  7. System.out.println("生产者生产消息=====" + message);
  8. rabbitTemplate.convertAndSend("rabbitmq_queue", message);
  9. }
  10. }
  • 消费者

消费者消费生产者发送的消息。实现消费者主要用到注解@RabbitListener@RabbitListener是一个功能强大的注解。这个注解里面可以注解配置@QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定多个交换机、绑定、路由、并且配置监听功能等。

  1. 在RabbitMQ控制面板创建好队列,使用@RabbitListener监听队列。

    1. @RabbitListener(queues = "rabbitmq_queue")

    2.使用@RabbitListener自动创建队列。

    1. @RabbitListener(queuesToDeclare = @Queue("myQueue"))

    3.使用@RabbitListener自动创建队列,并对Exchange和Queue进行绑定。

    1. @RabbitListener(bindings = @QueueBinding(value = @Queue("myQueue"), key = "mobi", exchange = @Exchange("myExchange")))

    本文使用@RabbitListener自动创建一个队列。

    1. @Component
    2. public class Consumer {
    3. @RabbitHandler
    4. @RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue"))
    5. public void process(String message) {
    6. System.out.println("消费者消费消息=====" + message);
    7. }
    8. }

    测试
    第四步测试。为了方便,写一个测试类生产消息。然后启动工程,运行测试类,使生产者发送消息,不出意外消费者将会消费消息,在控制台输出信息。 ```java @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {

    @Autowired Producer producer;

    @Test public void contextLoads() {

    1. producer.produce();

    }

}

  1. 操作之后可以在测试控制台看到生产者消息发送成功,在该工程的控制台看到消息消费成功。<br />访问RabbitMQ控制面板也会看到有消息。<br />![](https://cdn.nlark.com/yuque/0/2020/png/1683429/1596438671417-f601376c-3e4a-431c-8e63-90e8e9dd4b89.png#align=left&display=inline&height=421&margin=%5Bobject%20Object%5D&originHeight=421&originWidth=1125&size=0&status=done&style=none&width=1125)
  2. <a name="wTpQ8"></a>
  3. ### 使用docker构建rabbitMq高可用负载均衡集群
  4. 本文使用Docker搭建RabbitMQ集群,然后使用HAProxy做负载均衡,最后使用KeepAlived实现集群高可用,从而搭建起来一个完成了RabbitMQ高可用负载均衡集群。受限于自身条件,本文使用VMware虚拟机的克隆功能克隆了两台服务器进行操作,仅作为一个demo,开发中可根据实际情况进行调整。<br />为什么要搭建高可用负载均衡集群?一句话来说就是:引入消息队列,可以实现异步操作、流量削峰、应用解耦等好处,但是消息队列毕竟是一把双刃剑,带了这些好处的同时也会使系统可用性、稳定性降低。对于RabbitMQ而言,它本身并不是分布式的(对比Kafka),所以我们要搭建RabbitMQ的集群来实现高可用。<br />首先看下RabbitMQ高可用负载均衡集群长什么样子:<br />![](https://cdn.nlark.com/yuque/0/2020/png/1683429/1596440549928-b8808f6f-02ca-4634-90fe-8b27fbed995b.png#align=left&display=inline&height=423&margin=%5Bobject%20Object%5D&originHeight=423&originWidth=859&size=0&status=done&style=none&width=859)<br />使用Docker构建RabbitMQ高可用负载均衡集群大概分为三个步骤:
  5. 1. 启动多个(3个为例)RabbitMQ,构建RabbitMQ集群,并配置为镜像模式。
  6. 1. 使用HAProxy做负载均衡。
  7. 1. 使用KeepAlived实现高可用。
  8. <a name="gz8vZ"></a>
  9. #### 一、构建RabbitMQ集群
  10. 1. 启动多个RabbitMQ节点<br />使用Docker启动3RabbitMQ节点,目标如下表所示:
  11. | 服务器ip | 端口 | hostname | 管理界面地址 |
  12. | --- | --- | --- | --- |
  13. | 192.168.16.128 | 5672 | my-rabbit1 | 192.168.16.128:15672 |
  14. | 192.168.16.128 | 5673 | my-rabbit2 | 192.168.16.128:15673 |
  15. | 192.168.16.128 | 5674 | my-rabbit3 | 192.168.16.128:15674 |
  16. ```java
  17. docker run -d --hostname my-rabbit1 --name rabbit1 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.8.0-beta.4-management
  18. docker run -d --hostname my-rabbit2 --name rabbit2 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' --link rabbit1:my-rabbit1 rabbitmq:3.8.0-beta.4-management
  19. docker run -d --hostname my-rabbit3 --name rabbit3 -p 5674:5672 -p 15674:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' --link rabbit1:my-rabbit1 --link rabbit2:my-rabbit2 rabbitmq:3.8.0-beta.4-management

注意:由于Erlang节点间通过认证Erlang cookie的方式来允许互相通信,所以RABBITMQ_ERLANG_COOKIE必须设置为相同的。
启动完成之后,使用docker ps命令查看运行情况,确保RabbitMQ都已经启动。

  1. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
  2. 2d6f612fdc8e rabbitmq:3.8.0-beta.4-management "docker-entrypoint..." 5 seconds ago Up 4 seconds 4369/tcp, 5671/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp, 0.0.0.0:15674->15672/tcp rabbit3
  3. c410aa73ce68 rabbitmq:3.8.0-beta.4-management "docker-entrypoint..." 14 seconds ago Up 14 seconds 4369/tcp, 5671/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, 0.0.0.0:15673->15672/tcp rabbit2
  4. ceb28620d7b1 rabbitmq:3.8.0-beta.4-management "docker-entrypoint..." 24 seconds ago Up 23 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbit1
  1. 加入集群
    内存节点和磁盘节点的选择:
    每个RabbitMQ节点,要么是内存节点,要么是磁盘节点。内存节点将所有的队列、交换器、绑定、用户等元数据定义都存储在内存中;而磁盘节点将元数据存储在磁盘中。单节点系统只允许磁盘类型的节点,否则当节点重启以后,所有的配置信息都会丢失。如果采用集群的方式,可以选择至少配置一个节点为磁盘节点,其余部分配置为内存节点,,这样可以获得更快的响应。所以本集群中配置节点1位磁盘节点,节点2和节点3位内存节点。
    集群中的第一个节点将初始元数据代入集群中,并且无须被告知加入。而第2个和之后加入的节点将加入它并获取它的元数据。要加入节点,需要进入Docker容器,重启RabbitMQ。
    设置节点1:
    1. docker exec -it rabbit1 bash
    2. rabbitmqctl stop_app
    3. rabbitmqctl reset
    4. rabbitmqctl start_app
    5. exit
    设置节点2:
    1. docker exec -it rabbit2 bash
    2. rabbitmqctl stop_app
    3. rabbitmqctl reset
    4. rabbitmqctl join_cluster --ram rabbit@my-rabbit1
    5. rabbitmqctl start_app
    6. exit
    设置节点3:
    1. docker exec -it rabbit3 bash
    2. rabbitmqctl stop_app
    3. rabbitmqctl reset
    4. rabbitmqctl join_cluster --ram rabbit@my-rabbit1
    5. rabbitmqctl start_app
    6. exit
    节点设置完成之后,在浏览器访问192.168.16.128:15672、192.168.16.128:15673和192.168.16.128:15674中任意一个,都会看到RabbitMQ集群已经创建成功。

RabbitMq - 图7
3. 配置镜像队列
镜像队列工作原理:在非镜像队列的集群中,消息会路由到指定的队列。当配置为镜像队列之后,消息除了按照路由规则投递到相应的队列外,还会投递到镜像队列的拷贝。也可以想象在镜像队列中隐藏着一个fanout交换器,将消息发送到镜像的队列的拷贝。
进入任意一个RabbitMQ节点,执行如下命令:

  1. rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

可以设置镜像队列,”^”表示匹配所有队列,即所有队列在各个节点上都会有备份。在集群中,只需要在一个节点上设置镜像队列,设置操作会同步到其他节点。
查看集群的状态:

  1. rabbitmqctl cluster_status

二、HAProxy负载均衡

第一步构建RabbitMQ集群只是构建高可用负载均衡集群的基础,下面将使用HAProxy为RabbitMQ集群做负载均衡。
Haproxy 是目前比较流行的一种群集调度工具,是使用C语言编写的自由及开放源代码软件,其提供高可用性、负载均衡,以及基于TCP和HTTP的应用程序代理。同类群集调度工具有很多,如LVS 和 Nginx 。相比较而言,LVS 性能最好,但是搭建相对复杂,Nginx的upstream模块支持群集功能,但是对群集节点的健康检查功能不强,性能没有HAProxy 好。
对于调度算法本文采用最简单最常用的轮询算法。
本来想采用Docker的方式拉取并运行HAProxy镜像,折腾了好几天搞不定,HAProxy启动不了,故采用源码安装的方式安装HAProxy。
配置两个HAProxy节点实现负载均衡:

服务器ip 端口号 管理界面地址
192.168.16.128 8888 http://192.168.16.128:8888/haproxy
192.168.16.129 8888 http://192.168.16.129:8888/haproxy

1. 安装HAProxy

  1. 下载

由于到官网下载需要kexue上网,这里提供百度云链接。
链接: https://pan.baidu.com/s/1uaSJa3NHFiE1E6dk7iHMwQ 提取码: irz6

  1. 将haproxy-1.7.8.tar.gz拷贝至/opt目录下,解压缩:

    1. tar zxvf haproxy-1.7.8.tar.gz
  2. 进入目录,编译成可执行文件。

将源代码解压之后,需要运行make来将HAProxy编译成为可执行文件。如果是在Linux2.6系统上面进行编译的话,需要设置TARGET=linux26以开启epoll支持,这也是为什么网上许多博客里面都是这么写的。对于其他的UNIX系统来说,直接采用TARGET=generic方式,本文进行安装的系统为CentOS7 ,内核3.10版本。

  1. cd haproxy-1.7.8
  2. make TARGET=generic
  1. 执行完毕之后,目录下出现haproxy的可执行文件。
  2. 2. 配置HAProxy
  3. HAProxy配置文件说明
  4. HAProxy配置文件通常分为三个部分,即globaldefaultslistenglobal为全局配置,defaults为默认配置,listen为应用组件配置。
  5. global为全局配置部分,属于进程级别的配置,通常和使用的操作系统配置相关。
  6. defaults配置项配置默认参数,会被应用组件继承,如果在应用组件中没有特别声明,将使用默认配置参数。
  7. 以配置RabbitMQ集群的负载均衡为例,在安装目录下面新建一个haproxy.cfg,输入下面配置信息: