01|环境搭建

系统环境 CentOS7
erlang v23.0.2 下载地址
rabbitmq 3.8.5 下载地址

erlang和rabbitmq兼容关系说明

1、安装依赖

  1. yum install socat -y

2、安装erlang

  1. # 下载安装包
  2. wget -i https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm
  3. # 安装erlang
  4. rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm

3、安装rabbitmq

  1. wget -i https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
  2. rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm

4、启用配置

  1. # 启用RabbitMQ的管理插件
  2. rabbitmq-plugins enable rabbitmq_management
  3. # 启动MQ服务
  4. systemctl start rabbitmq-server.service
  5. # 添加账号
  6. rabbitmqctl add_user root 123456
  7. # 设置用户角色信息
  8. rabbitmqctl set_user_tags root administrator
  9. # 设置用户权限
  10. rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

用户角色信息说明:

tag 说明
management 1. 可以使用AMQP协议登录的虚拟主机的权限
2. 查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限
3. 查看和关闭它们自己的通道和连接的权限
4. 查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动
policymaker 所有management标签可以做的,加上:
1. 在它们能通过AMQP协议登录的虚拟主机上,查看、创建和删除策略以及虚拟主机参数的权限
monitoring 所有management能做的,加上:
1. 列出所有的虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限
2. 查看其他用户连接和通道的权限
3. 查看节点级别的数据如内存使用和集群的权限
4. 查看真正的全局所有虚拟主机统计数据的权限
administrator 所有policymaker和monitoring能做的,加上:
1. 创建删除虚拟主机的权限
2. 查看、创建和删除用户的权限
3. 查看、创建和删除权限的权限
4. 关闭其他用户连接的权限

02|RabbitMQ架构

2.1|RabbitMQ整体逻辑架构

image.png

RabbitMQ组件功能说明:

  • Broker:标识消息队列服务器实体
  • Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是”/“
  • Exceange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,常用的交换器类型有:fanout、direct、topic
  • Quene:消息队列,作为消息的容器,用来保存消息直到发送给消费者。
  • Binding:用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则。
  • Channel:多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入来信道的概念,以复用一条TCP连接。

    2.2|RabbitMQ数据存储

    存储机制:

    RabbitMQ消息有两种类型:

  • 持久化消息

  • 非持久化消息

这两种消息在到达队列时写入磁盘,同时会在内充中保存一份备份,当内存吃紧时,消息从内存中清除。非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。
RabbitMQ存储包含两个部分:

  • 队列索引(index)
  • 消息存储(store)

消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在index 中,而较大的消息存在store中。这个消息大小的界定可以通过queue_index_embed_msg_below来配置,默认为4096B。

队列索引

索引维护队列的落盘消息的信息,如存储地点、是否已被消费者接收、是否已被消费者ACK等。索引使用顺序的段文件来存储,后缀为.idx,文件名依次累加,每个段文件中包含固定的segment_entry_count条记录(默认值是16384)。

消息存储

消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个。存储分为持久化存储和临时存储。持久化存储的内容在broker重启后不会消失,临时存储的内容在broker重启后丢失。store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件中。

队列状态

RabbitMQ队列有以下4种状态:

  • alpha:消息索引和消息内容都在内存,最耗内存,很少消耗CPU
  • beta:消息索引在内存中,消息内容存磁盘
  • gama:消息索引分布在内容和磁盘中,消息内容存磁盘
  • delta:消息索引和内容都存磁盘

消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发送变化。持久化的消息,索引和内容都必须保存在磁盘上。当系统负载较高,消息堆积较多时,消息就会溢写到磁盘,进而导致性能下降。

03|RabbitMQ工作流程

image.png

3.1|生成者发送消息流程

  • 连接RabbitMQ,建立TCP连接(Connection),开启信道(Channel)
  • 声明Exchange并设置相关属性
  • 声明Queue并设置相关属性
  • 通过BindingKey将Exchange和Queue绑定起来
  • 发送消息至Broker,其中包含RoutingKey、Exchange等信息
  • 相应的交换器根据接收到的RoutingKey查找相匹配的队列。如找到,则将消息存入相应的队列中;如没有找到,则依据策略选择丢弃或退换给生产者。
  • 关闭信道,关闭连接(放回连接池)

    3.2|消费者接收消息流程

  • 连接RabbitMQ,建立TCP连接(Connection),开启信道(Channel)

  • 请求消费相应队列中的消息
  • 等待回应,接收消息,确认(ack)接收到的消息
  • 从队列中删除相应已经被ack的消息
  • 关闭信道,关闭连接(放回连接池)

生产者和消费者需要与RabbitMQ建立TCP连接,也就是Connection,一旦TCP连接建立起来,客户端紧接着需要创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID,Channel是建立在Connection之上的虚拟连接,RabbitMQ处理每条AMQP指令都是通过信道完成的,RabbitMQ采用类型NIO的做法,复用TCP连接,减少性能开销,便于管理。

3.3|RabbitMQ工作模式

在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者不知道一个消息被发送到哪个队列,生产者只会将消息直接发送给交换器,交换器在接收到消息之后已经配置的规则将消息投递到对应的消息队列中。

  • publish/subscribe:发布订阅

交换器类型为fanout,会将所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
原生API:
ConnectionFactoryUtils:

  1. public class ConnectionFactoryUtils {
  2. public static ConnectionFactory factory(){
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("local1");
  5. factory.setVirtualHost("/");
  6. factory.setUsername("root");
  7. factory.setPassword("123456");
  8. factory.setPort(5672);
  9. return factory;
  10. }
  11. }
  1. String EX_NAME = "ex.fanout";
  2. @Test
  3. public void producer() throws Exception{
  4. ConnectionFactory factory = ConnectionFactoryUtils.factory();
  5. try (Connection connection = factory.newConnection();
  6. Channel channel = connection.createChannel()) {
  7. channel.exchangeDeclare(EX_NAME, BuiltinExchangeType.FANOUT, true);
  8. final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
  9. channel.basicPublish(EX_NAME, "", properties, "message pub sub".getBytes());
  10. }
  11. }
  12. @Test
  13. public void consumer() throws Exception{
  14. ConnectionFactory factory = ConnectionFactoryUtils.factory();
  15. try (Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel()) {
  17. channel.exchangeDeclare(EX_NAME, BuiltinExchangeType.FANOUT, true);
  18. // 生成随机队列
  19. final String queueName = channel.queueDeclare().getQueue();
  20. // 绑定到交换机
  21. channel.queueBind(queueName,EX_NAME,"");
  22. channel.basicConsume(queueName, (consumerTag, delivery) -> {
  23. String message = new String(delivery.getBody(), "UTF-8");
  24. System.out.println(" Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  25. }, consumerTag -> {
  26. });
  27. Thread.sleep(Integer.MAX_VALUE);
  28. }
  29. }
  • routing:路由模式

交换器类型为direct,会将消息路由到那些BindingKey和RoutingKey完全匹配的队列中

  1. public static final String EX_NAME = "ex.routing";
  2. public static final String QUEUE_NAME = "queue.routing";
  3. String[] level = {"info","warn","error"};
  4. @Test
  5. public void producer() throws Exception{
  6. ConnectionFactory factory = ConnectionFactoryUtils.factory();
  7. try (Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel()) {
  9. channel.exchangeDeclare(EX_NAME, BuiltinExchangeType.DIRECT,true);
  10. for (int i = 0; i < 100; i++) {
  11. String message = EX_NAME + ": message :" + i;
  12. final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  13. final AMQP.BasicProperties properties = builder.deliveryMode(2).build();
  14. channel.basicPublish(EX_NAME,level[i%3], properties,message.getBytes());
  15. }
  16. }
  17. }
  18. @Test
  19. public void consumer(){
  20. Arrays.stream(level).parallel().forEach(rk->{
  21. try {
  22. consumerByRoutingKey(rk);
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. });
  27. }
  28. public void consumerByRoutingKey(String rKey)throws Exception{
  29. ConnectionFactory factory = ConnectionFactoryUtils.factory();
  30. try(Connection connection = factory.newConnection();
  31. Channel channel = connection.createChannel()){
  32. channel.exchangeDeclare(EX_NAME, BuiltinExchangeType.DIRECT,true);
  33. String queueName = QUEUE_NAME+"."+rKey;
  34. channel.queueDeclare(queueName,true,false,false,null);
  35. channel.queueBind(queueName, EX_NAME, rKey);
  36. channel.basicConsume(queueName, (consumerTag, delivery) -> {
  37. String message = new String(delivery.getBody(), "UTF-8");
  38. System.out.println(" Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  39. }, consumerTag -> {});
  40. Thread.sleep(1000_000);
  41. }
  42. }
  • topic:主题模式

升级版的routing模式,交换器类型为topic,topic类型的交换器在direct匹配规则上进行来拓展,也是将消息路由到BindingKey和RoutingKey匹配的规则中,它约定:BindingKey和RoutingKey一样都是有”.”分割的字符串,BindingKey中可以存在两种特殊字符”“和”#”,用于模糊匹配,其中”“用于匹配一个单词,”#”用于匹配零个或多个单词

  1. public static String EX_NAME = "ex.topic";
  2. public static final String QUEUE_NAME = "queue.routing";
  3. public static final String[] rks = {"sh.info","sh.warn","sh.error","bj.info","bj.warn","bj.error","sz.info","sz.warn","sz.error"};
  4. @Test
  5. public void producer() throws Exception{
  6. ConnectionFactory factory = ConnectionFactoryUtils.factory();
  7. try (Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel()) {
  9. channel.exchangeDeclare(EX_NAME, BuiltinExchangeType.TOPIC,true);
  10. for (int i = 0; i < 100; i++) {
  11. final String rk = rks[i % 8];
  12. String message = rk+":"+i;
  13. channel.basicPublish(EX_NAME, rk,null,message.getBytes());
  14. }
  15. }
  16. }
  17. @Test
  18. public void consumer() throws Exception{
  19. ConnectionFactory factory = ConnectionFactoryUtils.factory();
  20. try(Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel()){
  22. channel.exchangeDeclare(EX_NAME, BuiltinExchangeType.TOPIC,true);
  23. channel.queueDeclare(QUEUE_NAME,true,false,false,null);
  24. // 只关注info的消息
  25. channel.queueBind(QUEUE_NAME, EX_NAME, "#.info");
  26. channel.basicConsume(QUEUE_NAME,true, (consumerTag, delivery) -> {
  27. String message = new String(delivery.getBody(), "UTF-8");
  28. System.out.println(" Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  29. }, consumerTag -> {});
  30. Thread.sleep(30_000);
  31. }
  32. }

04|RabbitMQ高级特性

4.1|消息可靠性

可以通过以下几个方面来保证消息的可靠性:

  • 捕获客户端代码中的异常,包括生产者和消费者
  • AMQP/RabbitMQ的事务机制
  • 发送端确认机制
  • 消息持久化机制
  • Borker的高可用集群
  • 消费者确认机制

image.png
消息传输保证层级:

  • At most once(最多一次):消息可能会丢失,但绝不会重复传输
  • At least once(最少一次):消息绝不会丢失,但可能会重复传输
  • Exactly once(恰好一次):每条消息肯定会被穿出一次且仅传输一次

RabbmitMQ支持“最多一次”和“最好一次”。
发送端确认原生API:

  1. final ConnectionFactory factory = ConnectionFactoryUtils.factory();
  2. try (final Connection connection = factory.newConnection();
  3. final Channel channel = connection.createChannel()) {
  4. channel.confirmSelect();
  5. channel.exchangeDeclare("ex.ack", BuiltinExchangeType.DIRECT);
  6. channel.queueDeclare("queue.ack",false,false,false,null);
  7. channel.queueBind("queue.ack","ex.ack","rk.ack");
  8. channel.basicPublish("ex.ack","rk.ack",null,"message.ack".getBytes());
  9. // 等待消息被确认
  10. channel.waitForConfirmsOrDie(5_000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } catch (IOException e) {
  14. e.printStackTrace();
  15. } catch (TimeoutException e) {
  16. e.printStackTrace();
  17. }

消费端确认原生API:

  1. final ConnectionFactory factory = ConnectionFactoryUtils.factory();
  2. try (final Connection connection = factory.newConnection();
  3. final Channel channel = connection.createChannel()) {
  4. channel.exchangeDeclare("ex.ack", BuiltinExchangeType.DIRECT);
  5. channel.queueDeclare("queue.ack",false,false,false,null);
  6. channel.queueBind("queue.ack","ex.ack","rk.ack");
  7. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  8. String message = new String(delivery.getBody(), "UTF-8");
  9. System.out.println(" Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  10. final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
  11. // 手动ack确认消息
  12. channel.basicAck(deliveryTag,true);
  13. };
  14. channel.basicConsume("queue.ack",false, deliverCallback, consumerTag -> {
  15. });
  16. Thread.sleep(10_000);
  17. }

4.2|TTL机制

TTL(Time to Live):RabbitMQ可以对消息和队列两个纬度分别设置TTL。目前有两种方法设置消息的TTL:

    1. 通过Queue属性设置,队列中的所有消息都有相同的过期时间
    1. 对消息自身进行单独设置,每条消息的TTL可以不同

如果两种方法一起使用,则以TTL较小数值为准。
原生API案例:

  1. final ConnectionFactory factory = ConnectionFactoryUtils.factory();
  2. try(final Connection connection = factory.newConnection();
  3. final Channel channel = connection.createChannel()){
  4. Map<String, Object> arguments = new HashMap<>();
  5. // 消息的过期时间
  6. arguments.put("x-message-ttl",10_000);
  7. // 队列的过期时间
  8. arguments.put("x-expires",30_000);
  9. channel.exchangeDeclare("ex.ttl", BuiltinExchangeType.DIRECT);
  10. channel.queueDeclare("queue.ttl",false,false,false,arguments);
  11. channel.queueBind("queue.ttl","ex.ttl","rk.ttl");
  12. channel.basicPublish("ex.ttl","rk.ttl",null,"ttl.message".getBytes());
  13. }
  14. }

4.3|DLX死信队列

DLX(Dead Letter Exchange):消息在一个队列中变成死信(Dead Latter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”,以下几种情况会导致消息变为死信:

    1. 消息被拒绝,并且设置requeue参数为false
    1. 消息过期
    1. 队列达到最大长度

对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能被消费者正确消费而被置于死信队列中的情况,后续分析程序可以通过消费这个死心队列中的内容来分析当时所遇到的异常情况,进而改善和优化系统,并可以作为某些业务的兜底措施。
原生API案例:

  1. final ConnectionFactory factory = ConnectionFactoryUtils.factory();
  2. try(final Connection connection = factory.newConnection();
  3. final Channel channel = connection.createChannel()){
  4. // 定义一个死信交换器(和定义普通交换器的方式相同)
  5. channel.exchangeDeclare("ex.dlx",BuiltinExchangeType.DIRECT,true);
  6. channel.queueDeclare("queue.dlx",false,false,false,null);
  7. channel.queueBind("queue.dlx","ex.dlx","rk.dlx");
  8. // 定义正常业务交换器
  9. channel.exchangeDeclare("ex.biz", BuiltinExchangeType.DIRECT);
  10. // 设置业务交换器消息若10秒内没有被消费则成为死信消息
  11. Map<String, Object> arguments = new HashMap<>();
  12. arguments.put("x-message-ttl",20_000);
  13. // 设置该队列关联的死信交换器
  14. arguments.put("x-dead-letter-exchange", "ex.dlx");
  15. // 设置该队列所关联的死信交换器的routingKey
  16. arguments.put("x-dead-letter-routing-key", "rk.dlx");
  17. channel.queueDeclare("queue.biz",false,false,false,arguments);
  18. channel.queueBind("queue.biz","ex.biz","rk.biz");
  19. // 发送业务消息,当10秒内,如业务消息没有被消费,消息过期之后消息将会放置到死信队列中
  20. channel.basicPublish("ex.biz","rk.biz",null,"dlx.message".getBytes());
  21. }

05|RabbitMQ集群

RabbitMQ集群允许消费者和生成者在RabbitMQ单个节点奔溃的情况下继续运行,并可以通过添加更多的节点来线性扩展消息通信的吞吐量。当失去一个RabbitMQ节点时,客户端能够重新连接到集群中的任何其他节点并继续生成和消费,RabbitMQ集群中的所有节点都会备份所有的元数据信息,包括:

  • 队列元数据:队列的名称及属性
  • 交换器:交换器的名称及属性
  • 绑定关系元数据:交换器与队列或者交换器与交换器之间的绑定关系
  • vhost元数据:vhost内的队列、交换器和绑定提供命名空间及安全属性

RabbitMQ集群中各节点数据并不是全量对等的,各节点之间同步备份的仅仅是上述元数据以及Queue Owner(队列所有者,实际创建Queue并保存消息数据的节点)的指针。当集群中某个节点崩溃后,该节点的队列进程和关联的绑定都会消失,关联的消费者也会丢失订阅消息,节点恢复后持久化的消息可以重新被消费。集群中只能保证集群中的某个Node节点挂掉后应用程序还可以切换到其他Node上继续发送和消费消息,但并无法保证原有的消息不丢失,所以并不是一个真正意义的高可用集群。
image.png
这是RabbitMQ内置的集群模式,Erlang语言天生具备分布式特性,所以不需要借助类似Zookeeper之类的组件来实现集群(集群节点间使用cookie来进行通信验证,所有节点都必须使用相同的.erlang.cookie文件内容),不用节点的Erlang、RabbitMQ版本必须一致。

镜像队列模式

RabbitMQ内置的集群模式有丢失消息的风险,“镜像队列”可以看成是对内置默认集群模式的一种高可用架构的补充。可以将队列镜像(同步)到集群中的其他broker上,相当于是多副本冗余。如果集群中的一个节点失效,队列能自动地切换到集群中的另一个镜像节点上以保证服务的高可用性,而且消息不丢失。
在RabbitMQ镜像队列中分为master和slave,一个queue第一次创建所在的节点是它的master节点,其他节点为salve节点。如果master由于某种原因失效,最先加入的slave会被提升为新的master。
无论客户端请求到达master还是slave,最终数据都是从master节点获取。当请求到达master节点时,master节点直接将消息返回给client,同时master节点会通过GM(GUaranteed Multicast)协议将Queue的最新状态广播到slave节点。GM保证了广播消息的原子性,即要么都更新要么都不更新。当请求达到slave节点时,slave节点需要将请求先重定向到master节点,master节点将消息返回给client,同时master节点会通过GM协议将queue的最新状态广播到slave节点。
image.png