title: “Rabbit Mq”
date: “2022-03-22T13:31:53+08:00”
tags:

  • “Rabbit Mq”
    categories:
  • “消息队列”
  • “服务器”
    toc: true

bookComments: false

bookSearchExclude: false


消息队列

MQ 的相关概念

RabbitMQ

概念

RabbitMQ是一个消息中间件:它接受并转发消息。可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收存储转发消息数据

四大核心

  • 生产者:
    产生数据发送消息的程序是生产者

  • 交换机:
    交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

  • 队列:
    队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

  • 消费者:
    消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
    RabbitMQ - 图1
    RabbitMQ - 图2

核心模式

  1. Hello World——简单模式
  2. Work Queue——工作模式
  3. Publish/Subscribe——发布/订阅模式
  4. Routing——路由模式
  5. Topics——主题模式
  6. Publisher Confirms——发布确认模式

RabbitMQ - 图3

关键词

RabbitMQ - 图4

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCPConnection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) andfanout(multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

安装

非docker方式

  1. 官方网址

  2. 文件上传
    上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
    RabbitMQ - 图5
    链接:https://pan.baidu.com/s/1UTJhAesjTjQl3rl-v9MZBw 提取码:ytte —来自百度网盘超级的分享

  3. 安装文件(分别按照以下顺序安装)

    1. rpm -ivh erlang-21.3-1.el7.x86_64.rpm #rabbitmq的前置erlang的环境
    2. yum install socat -y #官网要求安装
    3. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm #rabbitmq-server
  1. 常用命令(按照以下顺序执行) | | | | —- | —- | | chkconfig rabbitmq-server on | 添加开机启动 RabbitMQ 服务 | | /sbin/service rabbitmq-server start | 启动服务 | | /sbin/service rabbitmq-server status | 查看服务状态 | | /sbin/service rabbitmq-server stop | 停止服务(选择执行) | | rabbitmq-plugins enable rabbitmq_management | 开启 web 管理插件 |


RabbitMQ - 图6
开启web管理插件的时候需要停止服务,开启web管理插件后需要手动再次启动rabbitmq服务。

  1. linux防火墙开启端口 ```shell firewall-cmd —zone=public —add-port=5672/tcp —permanent #开启5672端口 程序代码执行所需端口 firewall-cmd —zone=public —add-port=15672/tcp —permanent #开启15672端口 web插件所需端口

firewall-cmd —reload #防火墙重启

firewall-cmd —zone=public —list-ports #查看开放的端口

systemctl status firewalld #查看防火墙状态

  1. 6.
  2. 云服务器开启156725672端口
  3. <br />用默认账号密码(guest)访问地址 http://ip:15672/出现权限问题
  4. 7.
  5. 出现权限问题是因为guest设置了超级管理员角色,但是没有被赋予权限。
  6. <br />添加一个新的用户:
  7. | 序号 | | |
  8. | :---: | --- | --- |
  9. | 1 | `rabbitmqctl add_user admin 123` | 创建账号 |
  10. | 2 | `rabbitmqctl set_user_tags admin administrator` | 设置用户角色 |
  11. | 3 | `rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"` | 设置用户权限 |
  12. | 3.1 | set_permissions [-p ] | 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 |
  13. | 4 | `rabbitmqctl list_users` | 显示当前用户和角色 |
  14. 8.
  15. 再次利用 admin 用户登录
  16. 9.
  17. 重置命令
  18. - 关闭应用的命令为:`rabbitmqctl stop_app`
  19. - 清除的命令为:`rabbitmqctl reset`
  20. - 重新启动命令为:`rabbitmqctl start_app`
  21. <a name="6232cdc9"></a>
  22. #### docker安装
  23. 1.
  24. pull
  25. ```shell
  26. docker pull rabbitmq
  1. run
    15672端口
    1. docker run -d --name myrabbitmq -p 15672:15672 -p 5672:5672 rabbitmq
  1. 增加超级管理员用户,开启web插件 ```shell docker ps docker exec -it 容器ID /bin/bash rabbitmqctl add_user admin 123 #创建账号 rabbitmqctl set_user_tags admin administrator #设置用户角色 rabbitmqctl set_permissions -p “/“ admin “.“ “.“ “.*” #用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 rabbitmqctl list_users #显示当前用户和角色

rabbitmq-plugins enable rabbitmq_management #开启web插件

ctrl+p+q #退出当前容器

  1. 4.
  2. 重启容器
  3. ```shell
  4. docker restart 容器ID
  1. linux防火墙开启端口 ```shell firewall-cmd —zone=public —add-port=5672/tcp —permanent #开启5672端口 firewall-cmd —zone=public —add-port=15672/tcp —permanent #开启15672端口

firewall-cmd —reload #防火墙重启

firewall-cmd —zone=public —list-ports #查看开放的端口

systemctl status firewalld #查看防火墙状态

  1. 6.
  2. 云服务器开启156725672端口
  3. 7.
  4. 登陆网页查看
  5. <a name="helloworld"></a>
  6. # helloworld
  7. Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。
  8. 在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代表使用者保留的消息缓冲区
  9. ![](https://blogimg.ytte.top//img-jixiang/2022/03/26/123228234.png#alt=)
  10. -
  11. pom依赖:
  12. ```xml
  13. <!--指定 jdk 编译版本-->
  14. <build>
  15. <plugins>
  16. <plugin>
  17. <groupId>org.apache.maven.plugins</groupId>
  18. <artifactId>maven-compiler-plugin</artifactId>
  19. <configuration>
  20. <source>8</source>
  21. <target>8</target>
  22. </configuration>
  23. </plugin>
  24. </plugins>
  25. </build>
  26. <dependencies>
  27. <!--rabbitmq 依赖客户端-->
  28. <dependency>
  29. <groupId>com.rabbitmq</groupId>
  30. <artifactId>amqp-client</artifactId>
  31. <version>5.8.0</version>
  32. </dependency>
  33. <!--操作文件流的一个依赖-->
  34. <dependency>
  35. <groupId>commons-io</groupId>
  36. <artifactId>commons-io</artifactId>
  37. <version>2.6</version>
  38. </dependency>
  39. </dependencies>
  • 消息的生产者

    1. public class Producer {
    2. //队列名称常量
    3. private final static String QUEUE_NAME = "hello";
    4. public static void main(String[] args) {
    5. //创建一个连接工厂
    6. ConnectionFactory connectionFactory = new ConnectionFactory();
    7. connectionFactory.setHost("101.35.136.150");
    8. connectionFactory.setUsername("ytte");
    9. connectionFactory.setPassword("ytte");
    10. //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
    11. try {
    12. Connection connection = connectionFactory.newConnection();
    13. Channel channel = connection.createChannel();
    14. /**
    15. * 生成一个队列
    16. * 1.队列名称
    17. * 2.队列里面的消息是否持久化 默认消息存储在内存中
    18. * 3.该队列是否只供一个消费者进行消费 是否进行共享 false 可以多个消费者消费
    19. * (如果设置成了true)只有队列能接收到消息,消费者也接受不到消息
    20. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
    21. * 5.其他参数
    22. */
    23. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    24. String message = "hello world";
    25. /**
    26. * 发送一个消息
    27. * 1.发送到那个交换机
    28. * 2.路由的 key 是哪个
    29. * 3.其他的参数信息
    30. * 4.发送消息的消息体
    31. */
    32. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    33. System.out.println("消息发送完毕");
    34. } catch (IOException | TimeoutException e) {
    35. e.printStackTrace();
    36. }
    37. }
    38. }
  • 消息的消费者 ```java public class Consumer {

    private final static String QUEUE_NAME = “hello”;

  1. public static void main(String[] args) {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("101.35.136.150");
  4. connectionFactory.setUsername("ytte");
  5. connectionFactory.setPassword("ytte");
  6. System.out.println("等待接收消息。。。。。。。。");
  7. try {
  8. Connection connection = connectionFactory.newConnection();
  9. Channel channel = connection.createChannel();
  10. //推送的消息如何进行消费的接口回调
  11. DeliverCallback deliverCallback = (consumerTag, message) -> {
  12. System.out.println(new String(message.getBody()));
  13. };
  14. //取消消费的一个回调接口 如在消费的时候队列被删除掉了
  15. CancelCallback cancelCallback = consumerTag -> {
  16. System.out.println("消息消费被中断");
  17. };
  18. /**
  19. * 消费者消费消息
  20. * 1.消费哪个队列
  21. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
  22. * 3.消费者未成功消费的回调
  23. */
  24. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  25. } catch (IOException | TimeoutException e) {
  26. e.printStackTrace();
  27. }
  28. }

}

  1. <a name="446131ac"></a>
  2. # 工作队列WorkQueue
  3. ![](https://blogimg.ytte.top//img-jixiang/2022/03/27/093053298.png#alt=)
  4. 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。我们把任务发送到队列。当有多个工作线程时,这些工作线程将一起处理这些任务。每个消息任务只能被完整的执行一次,如果一个消费者工作线程断开连接,他正在处理的消息任务会返回队列中,并由其他消费者工作线程来执行,所以说是只能被**完整**的执行一次。
  5. <a name="a68032c8"></a>
  6. ## 轮循分发消息
  7. 即,如果存在两个消费者工作线程,消息会轮流分发给二者,你一个我一个。
  8. <a name="c66e753d"></a>
  9. ### 抽取工具类
  10. 重复代码太多抽取工具类
  11. ```java
  12. public class RabbitMqUtils {
  13. //得到一个连接的 channel
  14. public static Channel getChannel() throws Exception {
  15. //创建一个连接工厂
  16. ConnectionFactory factory = new ConnectionFactory();
  17. factory.setHost("101.35.136.150");
  18. factory.setUsername("ytte");
  19. factory.setPassword("ytte");
  20. Connection connection = factory.newConnection();
  21. return connection.createChannel();
  22. }
  23. }

消费者

不创建多个进行,使用idea自带的单一类可创建多实例。

RabbitMQ - 图7

这样就可以多次启动WorkQueue1,且可更改其中的代码在启动(将C1 消费者启动改为C2 消费者启动)

  1. public class WorkQueue1 {
  2. private static final String QUEUE_NAME="hello";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. DeliverCallback deliverCallback=(consumerTag, delivery)->{
  6. String receivedMessage = new String(delivery.getBody());
  7. System.out.println("接收到消息:"+receivedMessage);
  8. };
  9. CancelCallback cancelCallback=(consumerTag)->{
  10. System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
  11. };
  12. System.out.println("C1 消费者启动等待消费.................. ");
  13. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  14. }
  15. }

生产者

  1. public class Task01 {
  2. private static final String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. try (Channel channel = RabbitMqUtils.getChannel();) {
  5. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  6. //从控制台当中接受信息
  7. Scanner scanner = new Scanner(System.in);
  8. while (scanner.hasNext()) {
  9. String message = scanner.next();
  10. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  11. System.out.println("发送消息完成:" + message);
  12. }
  13. }
  14. }
  15. }

结果

RabbitMQ - 图8

消息应答

RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

  1. 自动应答
  2. 手动应答

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为如果消息在接收到之前,消费者出现连接或者 channel 关闭,那么消息就丢失了;虽然这种模式消费者可以传递过载的消息,但是这样有可能使得消费者由于接收太多还来不及处理的消息,造成消息的积压,内存耗尽,最终这些消费者线程被操作系统杀死。所以自动应答仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

一般不采用自动应答,采用手动应答。

手动应答

常用方法:

  • Channel.basicAck(用于肯定确认)
    RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

  • Channel.basicNack(用于否定确认)

  • Channel.basicReject(用于否定确认)
    与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

手动应答的好处是可以批量应答并且减少网络拥堵

RabbitMQ - 图9

批量应答即,消息在消费者的对应的信道中,还未被接受也会被消费者回复收到应答给RabbitMq。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

RabbitMQ - 图10

消息手动应答代码

生产者:

  1. public class Task02 {
  2. private static final String TASK_QUEUE_NAME = "ack_queue";
  3. public static void main(String[] argv) throws Exception {
  4. try (Channel channel = RabbitMqUtils.getChannel()) {
  5. channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
  6. Scanner sc = new Scanner(System.in);
  7. System.out.println("请输入信息");
  8. while (sc.hasNext()) {
  9. String message = sc.nextLine();
  10. channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
  11. System.out.println("生产者发出消息" + message);
  12. }
  13. }
  14. }
  15. }

消费者1:

  1. public class Work01 {
  2. private static final String ACK_QUEUE_NAME = "ack_queue";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. System.out.println("C1 等待接收消息处理时间较短");
  6. //消息消费的时候如何处理消息
  7. DeliverCallback deliverCallback = (consumerTag, delivery)-> {
  8. String message = new String(delivery.getBody());
  9. // 睡眠1s
  10. SleepUtils.sleep(1);
  11. System.out.println("接收到消息:" + message);
  12. /**
  13. * 1.消息标记 tag
  14. * 2.是否批量应答未应答消息
  15. */
  16. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  17. };
  18. //采用手动应答
  19. channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
  20. System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
  21. });
  22. }
  23. }

消费者2:

  1. public class Work02 {
  2. private static final String ACK_QUEUE_NAME = "ack_queue";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. System.out.println("C2 等待接收消息处理时间较短");
  6. //消息消费的时候如何处理消息
  7. DeliverCallback deliverCallback = (consumerTag, delivery)-> {
  8. String message = new String(delivery.getBody());
  9. //睡眠20s表示正在处理还多的代码
  10. SleepUtils.sleep(20);
  11. System.out.println("接收到消息:" + message);
  12. /**
  13. * 1.消息标记 tag
  14. * 2.是否批量应答未应答消息
  15. */
  16. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  17. };
  18. //采用手动应答
  19. channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
  20. System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
  21. });
  22. }
  23. }

SleepUtils:

  1. public class SleepUtils {
  2. public static void sleep(int second) {
  3. try {
  4. Thread.sleep(1000 * second);
  5. } catch (InterruptedException _ignored) {
  6. Thread.currentThread().interrupt();
  7. }
  8. }
  9. }

效果

生产者发送1~9,消费者工作线程轮流接受消息,1和2中平均接收到消息,但是由于1中睡眠1s,2中睡眠20s,所以可以看出2对应的信道里还屯留了几条消息(不知道是否可以这样说),而1中马上就处理完了。这时候我们关闭2进程,这就导致没有处理消息,或者没有处理完成,也就没有发送应答,Rabbit会检查到没有接收到应答消息,将未处理的消息交个能处理它们的1中,1继续处理消息。

RabbitMQ - 图11

RabbitMQ - 图12

RabbitMQ - 图13

RabbitMQ - 图14

RabbitMQ - 图15


2022-3-30

RabbitMQ 持久化

保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。我们需要将队列和消息都标记为持久化。

队列如何实现持久化

队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

RabbitMQ - 图16

注意:之前声明的不是持久化的队列,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

RabbitMQ - 图17

控制台中持久化与非持久化队列的 UI 显示区:

RabbitMQ - 图18

现在即使重启 rabbitmq 队列也依然存在。

消息实现持久化

消息持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN添加这个属性。

  1. channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

RabbitMQ - 图19

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。后面的发布确认章节,会细说如何让生产者收到确认的消息。(生产者收到确认消息,即消息已经 到达队列并且持久化了)

不公平分发

之前的轮流分发在某种场景下不是好的方法,比方说有两个消费者在处理任务,二者处理速度不同,任务是平分的,就会造成一方消费者空闲 一方忙活。不能够最大化利用现有资源。

为了避免这种情况,我们可以设置参数

  1. channel.basicQos(1)

在web页面中可以看到Prefetch count 为1,即上面设置的参数为1

RabbitMQ - 图20

如果我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

预取值

就相当于channel中有个缓冲区,缓冲区的大小(所能暂存消息的数量)可以通过使用 basic.Qos “预取计数”值来设置。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息。

例如,假设在通道上有未确认的消息 5、6、7,8,预取值设置为 4,此时RabbitMQ 将不会在该通道上再传递任何
消息。若 排在前面的消息 刚刚被确认 ACK,RabbitMQ 才会再发送消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。

通常,增加预取值,来提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。

发布确认

原理

生产者将信道设置成 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),消息被投递到所有匹配的队列之后,mq就会发送一个确认信息给生产者(包含消息的唯一 ID),告诉生产者消息已经正确到达目的队列了。

如果消息队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,mq回传给生产者的确认消息中的delivery-tag 属性,包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 属性,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于可以是异步的,发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。


2022-3-31

开启发布确认的方法

开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

RabbitMQ - 图21

单个确认发布

这是一种简单的确认方式,同步确认发布的方式,发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回(返回布尔值),如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

缺点:发布速度特别的慢,因为如果没有收到确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。适用简单和小应用程序。

  1. public class e111 {
  2. private static final int MESSAGE_COUNT = 1000;
  3. public static void main(String[] args) throws Exception {
  4. publishMessageIndividually();
  5. }
  6. public static void publishMessageIndividually() throws Exception {
  7. try (Channel channel = RabbitMqUtils.getChannel()) {
  8. //产生随机的队列名
  9. String queueName = UUID.randomUUID().toString();
  10. //声明队列
  11. channel.queueDeclare(queueName, false, false, false, null);
  12. //开启发布确认
  13. channel.confirmSelect();
  14. long begin = System.currentTimeMillis();
  15. //循环1000 发送消息,收到确认信息再发送消息
  16. for (int i = 0; i < MESSAGE_COUNT; i++) {
  17. String message = i + "";
  18. channel.basicPublish("", queueName, null, message.getBytes());
  19. //等待接受确认信息 服务端返回 false 或超时时间内未返回,生产者可以消息重发
  20. boolean flag = channel.waitForConfirms();
  21. if (flag) {
  22. System.out.println("消息发送成功");
  23. }
  24. }
  25. long end = System.currentTimeMillis();
  26. System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
  27. }
  28. }
  29. }
  30. //结果 发布1000个单独确认消息,耗时15837ms

批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,缺点:当发生故障导致发布出现问题时,不知道是哪个消息出现问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

  1. public static void BatchPublishMessage() throws Exception {
  2. try (Channel channel = RabbitMqUtils.getChannel()) {
  3. String queueName = UUID.randomUUID().toString();
  4. channel.queueDeclare(queueName, false, false, false, null);
  5. channel.confirmSelect();
  6. long start = System.currentTimeMillis();
  7. for (int i = 0; i < MESSAGE_COUNT; i++) {
  8. String message = i + "";
  9. channel.basicPublish("", queueName, null, message.getBytes());
  10. if (MESSAGE_COUNT % (i+1) == 0) {
  11. boolean b = channel.waitForConfirms();
  12. if (b) {
  13. System.out.println("消息发送成功");
  14. }
  15. }
  16. }
  17. long end = System.currentTimeMillis();
  18. System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - start) + "ms");
  19. }
  20. }
  21. //结果 发布1000个单独确认消息,耗时344ms

异步确认发布

  1. public static void AsynchronousPublishMessage() throws Exception {
  2. try (Channel channel = RabbitMqUtils.getChannel()) {
  3. String queueName = UUID.randomUUID().toString();
  4. //开启发布确认
  5. channel.confirmSelect();
  6. /**
  7. * 线程安全有序的一个哈希表,适用于高并发的情况
  8. * 1.将序号与消息进行关联
  9. * 2.批量删除条目 只要给到序列号
  10. * 3.支持并发访问
  11. */
  12. ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
  13. /**
  14. * 确认收到消息的一个回调
  15. * 1.消息序列号
  16. * 2.multiple
  17. * true 未能收到当前序号的确认消息
  18. * false 确认收到当前序号消息
  19. */
  20. ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
  21. //只清除当前序列号的消息
  22. if (!multiple) {
  23. String message = outstandingConfirms.get(sequenceNumber);
  24. System.out.println("发布的消息" + message + "被确认");
  25. outstandingConfirms.remove(sequenceNumber);
  26. }
  27. };
  28. ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
  29. if (multiple) {
  30. String message = outstandingConfirms.get(sequenceNumber);
  31. System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
  32. }
  33. };
  34. /**
  35. * 添加一个异步确认的监听器
  36. * 1.确认收到消息的回调
  37. * 2.未收到消息的回调
  38. */
  39. channel.addConfirmListener(ackCallback, nackCallback);
  40. long begin = System.currentTimeMillis();
  41. for (int i = 0; i < MESSAGE_COUNT; i++) {
  42. String message = "消息" + i;
  43. /**
  44. * channel.getNextPublishSeqNo()获取下一个消息的序列号
  45. * 通过序列号与消息体进行一个关联
  46. * 接收到的确认的,未确认的消息都收集到map中
  47. */
  48. outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
  49. channel.basicPublish("", queueName, null, message.getBytes());
  50. System.out.println("消息发送成功");
  51. }
  52. long end = System.currentTimeMillis();
  53. System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
  54. SleepUtils.sleep(20);
  55. }
  56. }
  57. //结果 发布1000个异步确认消息,耗时62ms

如何处理异步未确认消息

把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

对比

  • 发布1000个单独确认消息,耗时15837ms

  • 发布1000个单独确认消息,耗时344ms

  • 发布1000个异步确认消息,耗时62ms

交换机

在之前,将做一些完全不同的信息传达给多个消费者的模式称为 ”发布/订阅”。为了说明这种模式,我们将构建一个简单的日志系统。由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者。todo(解释的不清晰)。

实际上,生产者只能将消息发送到交换机(exchange),交换机工作的内容是,一方面接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中(这就是发布订阅模式(类似广播)),还是说应该丢弃它们。这些都由交换机的类型来决定。

RabbitMQ - 图22

Exchanges 的类型

默认交换机

填写“”即表示使用默认的交换机。消息能路由发送到队列中其实是由routingKey(bindingkey)绑定 key 指定的

RabbitMQ - 图23

临时队列

之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。创建临时队列的方式如下:

  1. String queueName = channel.queueDeclare().getQueue();

创建出来之后长成这样:

RabbitMQ - 图24

绑定(bindings)

binding 是 exchange 和 queue 之间的桥梁,表示exchange 和队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

RabbitMQ - 图25

Fanout

Fanout 是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型。

RabbitMQ - 图26

RabbitMQ - 图27

Logs 和临时队列的绑定关系如下图:

RabbitMQ - 图28

总结

消息队列的使用场景

分布式事务

重复消费、消息丢失、顺序消费

可用性

搞定!

人生无常,大肠包小肠!

继续学习吧!