目录

文章很长,放个目录方便跳转

简介 AMQP角色 RabbitMQ内部结构 安装 简单使用 核心API RabbitMQ解决消息丢失 RabbitMQ解决重复消费

简介

RabbitMQ是实现了高级消息队列协议(AMQP:Advanced Message Queue)的开源消息代理软件(亦称面向消息的中间件)。

RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

主要特性

  • 可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 高可用(Highly Available Queues)

    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

AMQP角色

  • Message(消息):消息服务器处理消息的原子单元,包括一个内容头,一组属性和一个内容体。
    __使用AMQP协议,消息服务器不能修改内容体和内容头,但可以在内容头上添加额外信息。

_

  • PubLisher(消息生产者):发送消息
  • Consumer(消息消费者):消费消息
  • Broker(消息代理):消息队列服务器,负责接收客户端连接,路由消息。
  • Queue(消息队列):Broker中的一个角色,一个Broker中可以有多个Queue,负责保存消息直到发送给不同的消费者。算是消息的容器一个消息可以被投入一个或多个队列中,每个队列的消息都会等待消费者连接到这个队列并被取走。
  • Exchange(交换路由):Broker中的一个角色,负责接收生产者发送的消息,并路由给服务器中的队列。可以被理解成一个规则表,指明消息该被投到哪个队列中
  • Channel(信道):信道是一条独立的双向数据流通道。为了解决操作系统无法承受每秒建立特别多的TCP连接问题

RabbitMQ内部结构

RabbitMQ - 图1
1、Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2、Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3、Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4、Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

5、Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连到这个队列将其取走。

6、Connection

网络连接,比如一个TCP连接。

7、Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。

8、Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9、Virtual Host

虚拟主机表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

10、Broker

表示消息队列服务器实体。

相关名词解释

RoutingKey:指定当前消息被谁(哪个队列)接受【由生产者发送消息时携带发送给Exchange】
BindingKey: 指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中【由队列创建时与Exchange之间创建】
image.png

为什么还要引入信道呢

  1. TCP的创建和销毁,开销大,创建需要三次握手,销毁需要四次分手

  2. 如果不使用信道,那么引用程序就会使用TCP的方式连接到rabbitmq,高峰时每秒成千上万条连接会造成资源的巨大浪费(一条tcp消耗资源,成千上万的tcp会非常消耗资源),而且操作系统每秒处理TCP连接数量也是有限的,必定会造成性能瓶颈

3.信道的原理是一条线程一条信道,多条线程多条信道共同使用一条TCP连接。一条TCP连接可以容纳无限的信道,及时每秒造成成千上万的请求也不会造成性能瓶颈。

四种交换器(Exchange)

在RabbitMQ中,交换机负责接收生产者发送的消息并将这些消息路由给服务器中的队列。
常用的交换器类型有directtopicfanoutheaders四种。

Direct Exchange-直连交换器

该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。

即如果一个队列绑定到交换机要求路由键为“order”,则只转发 routing key 标记为“order”的消息,不会转发“order.score”,也不会转发“order.sms”等等。它是完全匹配、单播的模式
image.png

Topic Exchange-主题交换器

Topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,”“匹配多个单词。

image.png

Fanout Exchange-扇形交换器

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
fanout 类型转发消息是最快的

image.png

Headers Exchange-头交换器

该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。
headers类型交换器性能差,在实际中并不常用。

六种工作模式

官网说明:https://www.rabbitmq.com/getstarted.html

简单队列模式(点对点模式)

一条消息由一个消费者进行消费。
image.png
P代表生产者,C代表消费者,红色代码消息队列。P将消息发送到消息队列,C对消息进行处理。

工作队列模式(Work Queues)

一个生产者,多个消费者共同消费同一个队列的消息,每个消费者获取到的消息唯一。

这种模式与简单模式的区别就是多了消费者
RabbitMQ - 图7
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系,
队列采用轮询的方式将消息是平均发送给消费者**,消费者在处理完某条消息后,才会收到下一条消息

发布/订阅模式(Publish/Subscribe)

相比工作队列模式,多了个交换机

生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息。

使用的是Fanout扇形交换器**
image.png
应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

路由模式(Routing)

该种模式除了要绑定交换机外,发消息的时候还要制定routing key,即路由key,队列通过通道绑定交换机的时候,需要指定自己的routing key,这样,生产端发送消息的时候也会指定routing key,通过routing key就可以把相应的消息发送到绑定相应routing key的队列中去
image.png
使用的是direct直连交换器

主题模式(Topics)

Topics 模式和Routing 路由模式最大的区别就是,Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的。
image.png
使用的是topic主题交换器
**

RPC模式(Remote procedure call)

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现

1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3、服务端将RPC方法 的结果发送到RPC响应队列

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
image.png

安装

第一步:下载安装Erlang

RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。【就像安装ActiveMQ之前需要保证安装了JDK一样】

下载地址:http://www.erlang.org/downloads

选择合适版本下载,此处我选择64位20.3版本安装程序
image.png
下载完成,双击安装
image.png
默认点击Next即可
image.png
安装完成,配置环境变量
此电脑—>鼠标右键“属性”—>高级系统设置—>环境变量—>“新建”系统环境变量
image.png
变量名:ERLANG_HOME
变量值就是刚才erlang的安装地址,点击确定。

修改path环境变量,将%ERLANG_HOME%\bin加入到path中
image.png
image.png
验证是否安装成功,在Dos窗口输入erl,出现erlang的版本号即表示安装完成
image.png

第二步:下载并安装RabbitMQ

官网地址:https://www.rabbitmq.com/download.html
image.png
下载完成后双击安装,点击next即可
image.png
image.png
image.png
image.png

第三步:启用RabbitMQ管理插件

打开命令行cd,输入RabbitMQ的sbin目录。
image.png
输入命令:
   rabbitmq-plugins enable rabbitmq_management
image.png

验证是否成功,输入rabbitmqctl status命令,出现如下输出则表明成功且处于启动状态
image.png

第四步:启动RabbitMQ服务器

前往sbin目录,双击rabbitmq-server.bat
image.png
image.png
启动成功

第五步:访问管理页面

访问http://localhost:15672,输入默认账密guest/guest
image.png

image.png

界面介绍

Overview-概览

image.png

统计内容:

  • Connections:连接数
  • Channels:通道数
  • Exchanges:交换机数
  • Queues:队列数
  • Consumers:消费者数

Nodes中包含RabbitMQ节点的进程、内存、配置文件路径等内容
image.png
Ports and contexts中显示RabbitMQ所开启的端口号,amqp是RabbitMQ TCP端口号、clustering是集群端口号、下方还有管控台端口号
image.png
Impot / export definitions 用于导入、导出RabbitMQ的所有配置(建议经常备份配置,以便在版本升级等场景下进行现场还原)
image.png

Connects-连接管理

连接,无论生产者还是消费者,都需要与 RabbitMQ 建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
image.png

Channels-通道管理

通道,建立连接后,会形成通道,消息的投递与获取所依赖的通道
image.png

Exchanges-交换机管理

交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
image.png

Queues-队列管理

就是消息队列,消息存放在队列中,等待消费,消费后会被移除队列
image.png

Admin-用户管理

可以创建和管理访问管控台的用户,线上环境需要删除默认的guest用户重新创建。
image.png

  • 超级管理员administrator,可以登录控制台,查看所有信息,可以对用户和策略进行操作
  • 监控者monitoring,可以登录控制台,可以查看节点的相关信息,比如进程数,内存磁盘使用情况
  • 策略制定者policymaker ,可以登录控制台,制定策略,但是无法查看节点信息
  • 普通管理员 management 仅能登录控制台
  • 其他, 无法登录控制台,一般指的是提供者和消费者

简单使用

创建用户

先在RabbitMQ新建一个用户mytest,授予管理员角色
image.png
初始创建时没有权限(No access),需要点击名称进行设置
image.png
进入权限设置页面后点击Set permission按钮即可,自动设置全读写权限
image.png
完成后mytest用户信息如下
image.png

引入依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>4.1.0</version>
  5. </dependency>

统一工具类

  1. package com.mq.rabbitmq.queue;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @description 通用工具类
  8. * @date: 2020-12-10 15:54
  9. */
  10. public class CommonUtil {
  11. private static Connection connection = null;
  12. private static ConnectionFactory connectionFactory = null;
  13. /**
  14. * 获取RabbitMQ连接对象
  15. * @return
  16. */
  17. public static Connection getConnection() throws IOException, TimeoutException {
  18. if(connection!=null){
  19. return connection;
  20. }
  21. if(connectionFactory!=null){
  22. connection = connectionFactory.newConnection();
  23. return connection;
  24. }
  25. // 连接工厂
  26. connectionFactory = new ConnectionFactory();
  27. // 设置RabbitMQ服务器连接信息
  28. factory.setHost("127.0.0.1");
  29. // amqp的端口默认是5672
  30. factory.setPort(5672);
  31. factory.setUsername("mytest");
  32. factory.setPassword("123456");
  33. // rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
  34. connectionFactory.setVirtualHost("/");
  35. // 创建连接
  36. connection = factory.newConnection();
  37. return connection;
  38. }
  39. }

简单模式

消息生产者

  1. package com.mq.rabbitmq.queue;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.util.Scanner;
  6. /**
  7. * @description rabbitMQ消息队列生产者
  8. * @date: 2020-12-09 15:23
  9. */
  10. public class RabbitMQSender {
  11. // 队列名称
  12. final static String QUEUE_NAME = "queueOne";
  13. public static void main(String[] args) throws Exception {
  14. // 获取连接对象
  15. Connection connection = CommonUtil.getConnection();
  16. // 创建信道
  17. Channel channel = connection.createChannel();
  18. /**
  19. * 声明队列,如果Rabbit中没有此队列将自动创建
  20. * param1:队列名称
  21. * param2:是否持久化
  22. * param3:队列是否独占此连接
  23. * param4:队列不再使用时是否自动删除此队列
  24. * param5:队列参数
  25. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  26. */
  27. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  28. //消息发布
  29. Scanner scanner = new Scanner(System.in);
  30. String message = "";
  31. do{
  32. message = scanner.nextLine();
  33. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  34. System.out.println("生产者发送消息:"+message);
  35. }while(!message.equals("bye")); // 当输入bye时结束输入,推出循环
  36. //关闭资源
  37. channel.close();
  38. connection.close();
  39. }
  40. }

消息消费者

  1. package com.mq.rabbitmq.queue;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. /**
  5. * @description rabbitMQ消息队列消费者
  6. * @date: 2020-12-09 16:10
  7. */
  8. public class RabbitMQReceiver {
  9. // 队列名称
  10. final static String QUEUE_NAME = "queueOne";
  11. //消费者消费消息
  12. public static void main(String[] args) throws Exception {
  13. // 获取连接对象
  14. Connection connection = CommonUtil.getConnection();
  15. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  16. Channel channel = connection.createChannel();
  17. /**
  18. * 声明队列,如果Rabbit中没有此队列将自动创建
  19. * param1:队列名称
  20. * param2:是否持久化
  21. * param3:队列是否独占此连接
  22. * param4:队列不再使用时是否自动删除此队列
  23. * param5:队列参数
  24. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  25. */
  26. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  27. // 每次取5条消息
  28. channel.basicQos(5);
  29. //定义消费者
  30. DefaultConsumer consumer = new DefaultConsumer(channel){
  31. // 监听消息
  32. @Override
  33. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  34. //消费消息
  35. String msg = new String(body,"utf-8");
  36. System.out.println("consume msg: "+msg);
  37. }
  38. };
  39. //参数1:接收哪个队列的数据
  40. //参数2:消息确认 是否应答,收到消息是否回复
  41. //参数3:消费者
  42. channel.basicConsume(QUEUE_NAME,true,consumer);
  43. //关闭资源(这里一定要注意,别关闭通道【或者加个休眠再停止】,否则还没接收消息就停止程序了)
  44. // channel.close();
  45. // connection.close();
  46. }
  47. }

启动生产者者并发送消息

image.png
队列多了一条消息
image.png

启动消费者消费消息

image.png
队列中没有消息,且多了一个消费者
image.png

此时的概览页
image.png

工作队列模式(Work Queues)

消息生产者

  1. package com.mq.rabbitmq.workqueue;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import java.util.Scanner;
  6. /**
  7. * @description rabbitMQ消息队列生产者
  8. * @date: 2020-12-09 15:23
  9. */
  10. public class WorkQueueSender {
  11. // 队列名称
  12. final static String QUEUE_NAME = "workQueue";
  13. public static void main(String[] args) throws Exception {
  14. // 获取连接对象
  15. Connection connection = CommonUtil.getConnection();
  16. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  17. Channel channel = connection.createChannel();
  18. /**
  19. * 声明队列,如果Rabbit中没有此队列将自动创建
  20. * param1:队列名称
  21. * param2:是否持久化
  22. * param3:队列是否独占此连接
  23. * param4:队列不再使用时是否自动删除此队列
  24. * param5:队列参数
  25. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  26. */
  27. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  28. //消息发布
  29. for(int i=1;i<=20;i++){
  30. String message = "第"+i+"条消息";
  31. System.out.println("生产者发送消息:"+message);
  32. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  33. }
  34. //关闭资源
  35. channel.close();
  36. connection.close();
  37. }
  38. }

消息消费者(多个)

  1. package com.mq.rabbitmq.workqueue;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @description 工作队列模式消费者
  8. * @date: 2020-12-09 16:10
  9. */
  10. public class WorkQueueReceiver {
  11. // 队列名称
  12. final static String QUEUE_NAME = "workQueue";
  13. //消费者消费消息
  14. public static void main(String[] args) throws Exception {
  15. MyConsumerFactory myConsumerFactory = new MyConsumerFactory(QUEUE_NAME);
  16. // 创建多个消费者,消费同一个队列
  17. for(int i=1;i<=5;i++){
  18. myConsumerFactory.createConsumer(i+"号");
  19. }
  20. }
  21. /**
  22. * 自定义消费者工厂类
  23. */
  24. static class MyConsumerFactory{
  25. String queueName; // 目标队列名称
  26. /**
  27. * 初始化消费者
  28. * @param queueName 目标队列名称
  29. */
  30. public MyConsumerFactory(String queueName) {
  31. this.queueName = queueName;
  32. }
  33. /**
  34. * 创建消费者
  35. */
  36. public void createConsumer(final String consumerName){
  37. try {
  38. // 获取连接对象
  39. Connection connection = CommonUtil.getConnection();
  40. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  41. Channel channel = connection.createChannel();
  42. /**
  43. * 声明队列,如果Rabbit中没有此队列将自动创建
  44. * param1:队列名称
  45. * param2:是否持久化
  46. * param3:队列是否独占此连接
  47. * param4:队列不再使用时是否自动删除此队列
  48. * param5:队列参数
  49. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  50. */
  51. channel.queueDeclare(queueName,false,false,false,null);
  52. // 每次取1条消息
  53. channel.basicQos(1);
  54. //定义消费者
  55. DefaultConsumer consumer = new DefaultConsumer(channel){
  56. // 监听消息
  57. @Override
  58. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  59. //消费消费
  60. String msg = new String(body,"utf-8");
  61. System.out.println("消费者"+consumerName+": "+msg);
  62. // 休眠个1秒钟,将消息让给其他消费者
  63. try {
  64. Thread.sleep(1000);
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. //手动消息确认[如果自动确认消息,速度过快,根本不可能轮得到别的消费者,就一直都是1号消费者在消费]
  69. getChannel().basicAck(envelope.getDeliveryTag(),false);
  70. }
  71. };
  72. //参数1:接收哪个队列的数据
  73. //参数2:消息确认 是否应答,收到消息是否回复
  74. //参数3:消费者
  75. channel.basicConsume(queueName,false,consumer);
  76. } catch (IOException e) {
  77. e.printStackTrace();
  78. } catch (TimeoutException e) {
  79. e.printStackTrace();
  80. }
  81. }
  82. }
  83. }

启动生产者并发送消息

image.png
此时消息队列workQueue有20条消息待消费
image.png
image.png

启动消费者并消费消息

image.png
消息队列workQueue里的消息被消费完了,总共有五个消费者
image.png

发布/订阅模式(Publish/Subscribe)

消息生产者

  1. package com.mq.rabbitmq.PublishAndSubscribe;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. /**
  7. * @description 发布订阅模式生产者
  8. * @date: 2020-12-11 11:18
  9. */
  10. public class Publisher {
  11. // 队列1名称
  12. final static String QUEUE_NAME_ONE = "QUEUE_ONE";
  13. // 队列2名称
  14. final static String QUEUE_NAME_TWO = "QUEUE_TWO";
  15. // 交换机名称
  16. final static String EXCHANGE_NAME = "MY_EXCHANGE";
  17. public static void main(String[] args) throws Exception {
  18. // 获取连接对象
  19. Connection connection = CommonUtil.getConnection();
  20. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  21. Channel channel = connection.createChannel();
  22. /**
  23. * 通道绑定交换机
  24. * 1、交换机名称
  25. * 2、交换机类型,fanout、topic、direct、headers
  26. */
  27. //Publish/subscribe发布订阅模式
  28. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  29. /**
  30. * 声明队列,如果Rabbit中没有此队列将自动创建
  31. * param1:队列名称
  32. * param2:是否持久化
  33. * param3:队列是否独占此连接
  34. * param4:队列不再使用时是否自动删除此队列
  35. * param5:队列参数
  36. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  37. */
  38. channel.queueDeclare(QUEUE_NAME_ONE,false,false,false,null); // 通道绑定队列1
  39. channel.queueDeclare(QUEUE_NAME_TWO,false,false,false,null); // 通道绑定队列2
  40. /**
  41. * 交换机和队列绑定
  42. * 1、队列名称
  43. * 2、交换机名称
  44. * 3、路由key
  45. */
  46. //Publish/subscribe发布订阅模式
  47. channel.queueBind(QUEUE_NAME_ONE,EXCHANGE_NAME,""); // 交换机绑定队列1
  48. channel.queueBind(QUEUE_NAME_TWO,EXCHANGE_NAME,""); // 交换机绑定队列2
  49. //消息发布
  50. for(int i=1;i<=10;i++){
  51. String message = "第"+i+"条消息";
  52. System.out.println("生产者发送消息:"+message);
  53. /**
  54. * 消息发布方法
  55. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  56. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  57. * param3:消息包含的属性
  58. * param4:消息体
  59. */
  60. channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
  61. }
  62. //关闭资源
  63. channel.close();
  64. connection.close();
  65. }
  66. }

消息消费者(多个)

  1. package com.mq.rabbitmq.PublishAndSubscribe;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @description 发布订阅模式消费者
  8. * @date: 2020-12-11 11:18
  9. */
  10. public class Subscriber {
  11. // 队列1名称
  12. final static String QUEUE_NAME_ONE = "QUEUE_ONE";
  13. // 队列2名称
  14. final static String QUEUE_NAME_TWO = "QUEUE_TWO";
  15. public static void main(String[] args) {
  16. MyConsumerFactory myConsumerFactory = new MyConsumerFactory();
  17. // 创建队列1的消费者
  18. myConsumerFactory.createConsumer("(队列1)", QUEUE_NAME_ONE);
  19. // 创建队列2的消费者
  20. myConsumerFactory.createConsumer("(队列2)", QUEUE_NAME_TWO);
  21. }
  22. /**
  23. * 自定义消费者工厂类
  24. */
  25. static class MyConsumerFactory{
  26. /**
  27. * 创建消费者
  28. */
  29. public void createConsumer(final String consumerName,String queueName){
  30. try {
  31. // 获取连接对象
  32. Connection connection = CommonUtil.getConnection();
  33. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  34. Channel channel = connection.createChannel();
  35. /**
  36. * 声明队列,如果Rabbit中没有此队列将自动创建
  37. * param1:队列名称
  38. * param2:是否持久化
  39. * param3:队列是否独占此连接
  40. * param4:队列不再使用时是否自动删除此队列
  41. * param5:队列参数
  42. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  43. */
  44. channel.queueDeclare(queueName,false,false,false,null);
  45. // 每次取1条消息
  46. channel.basicQos(1);
  47. //定义消费者
  48. DefaultConsumer consumer = new DefaultConsumer(channel){
  49. // 监听消息
  50. @Override
  51. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  52. //消费消费
  53. String msg = new String(body,"utf-8");
  54. System.out.println("消费者"+consumerName+" 收到消息: "+msg);
  55. //手动消息确认[如果自动确认消息,速度过快,根本不可能轮得到别的消费者,就一直都是1号消费者在消费]
  56. getChannel().basicAck(envelope.getDeliveryTag(),false);
  57. }
  58. };
  59. //参数1:接收哪个队列的数据
  60. //参数2:消息确认 是否自动确认
  61. //参数3:消费者
  62. channel.basicConsume(queueName,false,consumer);
  63. System.out.println("消费者"+consumerName+"已就位");
  64. } catch (IOException e) {
  65. e.printStackTrace();
  66. } catch (TimeoutException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. }

启动生产者并发送消息

image.png
两个消息队列都有10条待消费消息
image.png

启动消费者并消费消息

image.png
消息队列的消息都被消费完成
image.png

路由模式(Routing)

消息生产者

  1. package com.mq.rabbitmq.routing;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import java.util.Scanner;
  7. /**
  8. * @description 路由模式生产者
  9. * @auther: lai.guanfu
  10. * @date: 2020-12-11 11:18
  11. */
  12. public class Publisher {
  13. // 声明两个队列和一个交换机
  14. // 队列1名称
  15. final static String QUEUE_NAME_ONE = "QUEUE_ONE";
  16. // 队列2名称
  17. final static String QUEUE_NAME_TWO = "QUEUE_TWO";
  18. // 交换机名称
  19. final static String EXCHANGE_NAME = "MY_ROUTING_EXCHANGE";
  20. public static void main(String[] args) throws Exception {
  21. // 获取连接对象
  22. Connection connection = CommonUtil.getConnection();
  23. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  24. Channel channel = connection.createChannel();
  25. /**
  26. * 通道绑定交换机
  27. * 1、交换机名称
  28. * 2、交换机类型,fanout、topic、direct、headers
  29. */
  30. // Routing 路由模式
  31. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  32. /**
  33. * 声明队列,如果Rabbit中没有此队列将自动创建
  34. * param1:队列名称
  35. * param2:是否持久化
  36. * param3:队列是否独占此连接
  37. * param4:队列不再使用时是否自动删除此队列
  38. * param5:队列参数
  39. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  40. */
  41. channel.queueDeclare(QUEUE_NAME_ONE,false,false,false,null); // 通道绑定队列1
  42. channel.queueDeclare(QUEUE_NAME_TWO,false,false,false,null); // 通道绑定队列2
  43. /**
  44. * 交换机和队列绑定
  45. * 1、队列名称
  46. * 2、交换机名称
  47. * 3、路由key
  48. */
  49. // Routing 路由模式
  50. channel.queueBind(QUEUE_NAME_ONE,EXCHANGE_NAME,QUEUE_NAME_ONE); // 交换机绑定队列1
  51. channel.queueBind(QUEUE_NAME_TWO,EXCHANGE_NAME,QUEUE_NAME_TWO); // 交换机绑定队列2
  52. // 消息发布
  53. Scanner scanner = new Scanner(System.in);
  54. do {
  55. System.out.println("请发布内容:");
  56. String message = scanner.nextLine();
  57. System.out.println("请选择目标接收队列(1 队列1,2 队列2):");
  58. int i = scanner.nextInt();
  59. if(i==1){
  60. System.out.println("生产者向队列1发送消息:"+message);
  61. /**
  62. * 消息发布方法
  63. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  64. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  65. * param3:消息包含的属性
  66. * param4:消息体
  67. */
  68. channel.basicPublish(EXCHANGE_NAME,QUEUE_NAME_ONE,null,message.getBytes());
  69. }else{
  70. System.out.println("生产者向队列2发送消息:"+message);
  71. /**
  72. * 消息发布方法
  73. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  74. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  75. * param3:消息包含的属性
  76. * param4:消息体
  77. */
  78. channel.basicPublish(EXCHANGE_NAME,QUEUE_NAME_TWO,null,message.getBytes());
  79. }
  80. if(message.equals("bye")){
  81. break;
  82. }
  83. scanner.nextLine();
  84. }while(true);
  85. //关闭资源
  86. channel.close();
  87. connection.close();
  88. }
  89. }

消息消费者(多个)

  1. package com.mq.rabbitmq.routing;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @description 路由模式消费者
  8. * @date: 2020-12-11 11:18
  9. */
  10. public class Subscriber {
  11. // 队列1名称
  12. final static String QUEUE_NAME_ONE = "QUEUE_ONE";
  13. // 队列2名称
  14. final static String QUEUE_NAME_TWO = "QUEUE_TWO";
  15. public static void main(String[] args) {
  16. MyConsumerFactory myConsumerFactory = new MyConsumerFactory();
  17. // 创建队列1的消费者
  18. myConsumerFactory.createConsumer("(队列1)", QUEUE_NAME_ONE);
  19. // 创建队列2的消费者
  20. myConsumerFactory.createConsumer("(队列2)", QUEUE_NAME_TWO);
  21. }
  22. /**
  23. * 自定义消费者工厂类
  24. */
  25. static class MyConsumerFactory{
  26. /**
  27. * 创建消费者
  28. */
  29. public void createConsumer(final String consumerName,String queueName){
  30. try {
  31. // 获取连接对象
  32. Connection connection = CommonUtil.getConnection();
  33. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  34. Channel channel = connection.createChannel();
  35. /**
  36. * 声明队列,如果Rabbit中没有此队列将自动创建
  37. * param1:队列名称
  38. * param2:是否持久化
  39. * param3:队列是否独占此连接
  40. * param4:队列不再使用时是否自动删除此队列
  41. * param5:队列参数
  42. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  43. */
  44. channel.queueDeclare(queueName,false,false,false,null);
  45. // 每次取1条消息
  46. channel.basicQos(1);
  47. //定义消费者
  48. DefaultConsumer consumer = new DefaultConsumer(channel){
  49. // 监听消息
  50. @Override
  51. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  52. //消费消费
  53. String msg = new String(body,"utf-8");
  54. System.out.println("消费者"+consumerName+" 收到消息: "+msg);
  55. }
  56. };
  57. //参数1:接收哪个队列的数据
  58. //参数2:消息确认 是否自动确认
  59. //参数3:消费者
  60. channel.basicConsume(queueName,true,consumer);
  61. System.out.println("消费者"+consumerName+"已就位");
  62. } catch (IOException e) {
  63. e.printStackTrace();
  64. } catch (TimeoutException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. }
  69. }

启动生产者并发送消息

image.png
image.png

启动消费者并消费消息

image.png
image.png

主题模式(Topics)

消息生产者

  1. package com.mq.rabbitmq.topics;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import java.util.Scanner;
  7. /**
  8. * @description 主题模式生产者
  9. * @date: 2020-12-11 11:18
  10. */
  11. public class TopicPublisher {
  12. // 声明两个队列和一个交换机
  13. // 队列1名称
  14. final static String QUEUE_NAME_ONE = "QUEUE_ONE";
  15. // 队列2名称
  16. final static String QUEUE_NAME_TWO = "QUEUE_TWO";
  17. // 交换机名称
  18. final static String EXCHANGE_NAME = "MY_TOPIC_EXCHANGE";
  19. public static void main(String[] args) throws Exception {
  20. // 获取连接对象
  21. Connection connection = CommonUtil.getConnection();
  22. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  23. Channel channel = connection.createChannel();
  24. /**
  25. * 通道绑定交换机
  26. * 1、交换机名称
  27. * 2、交换机类型,fanout、topics、direct、headers
  28. */
  29. // Topics 主题模式
  30. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  31. /**
  32. * 声明队列,如果Rabbit中没有此队列将自动创建
  33. * param1:队列名称
  34. * param2:是否持久化
  35. * param3:队列是否独占此连接
  36. * param4:队列不再使用时是否自动删除此队列
  37. * param5:队列参数
  38. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  39. */
  40. channel.queueDeclare(QUEUE_NAME_ONE,false,false,false,null); // 通道绑定队列1
  41. channel.queueDeclare(QUEUE_NAME_TWO,false,false,false,null); // 通道绑定队列2
  42. /**
  43. * 交换机和队列绑定
  44. * 1、队列名称
  45. * 2、交换机名称
  46. * 3、路由key
  47. */
  48. // Topics 主题模式
  49. // 交换机绑定队列,注意此处的路由key使用#进行匹配,并不和队列名称一致
  50. channel.queueBind(QUEUE_NAME_ONE,EXCHANGE_NAME,"info.#.one.#");
  51. channel.queueBind(QUEUE_NAME_TWO,EXCHANGE_NAME,"info.#.two.#");
  52. // 消息发布
  53. Scanner scanner = new Scanner(System.in);
  54. do {
  55. System.out.println("请发布内容:");
  56. String message = scanner.nextLine();
  57. System.out.println("请输入目标routingKey:");
  58. String routingKey = scanner.nextLine();
  59. System.out.println("生产者发送消息:"+message+",目标routingKey为:"+routingKey);
  60. /**
  61. * 消息发布方法
  62. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  63. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  64. * param3:消息包含的属性
  65. * param4:消息体
  66. */
  67. // 指定routingKey由交换机和Binding去分析应该发送给哪个队列
  68. channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
  69. if(message.equals("bye")){
  70. break;
  71. }
  72. scanner.nextLine();
  73. }while(true);
  74. //关闭资源
  75. channel.close();
  76. connection.close();
  77. }
  78. }

消息消费者(多个)

  1. package com.mq.rabbitmq.topics;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @description 主题模式消费者
  8. * @date: 2020-12-11 11:18
  9. */
  10. public class TopicSubscriber {
  11. // 队列1名称
  12. final static String QUEUE_NAME_ONE = "QUEUE_ONE";
  13. // 队列2名称
  14. final static String QUEUE_NAME_TWO = "QUEUE_TWO";
  15. public static void main(String[] args) {
  16. MyConsumerFactory myConsumerFactory = new MyConsumerFactory();
  17. // 创建队列1的消费者
  18. myConsumerFactory.createConsumer("(队列1)", QUEUE_NAME_ONE);
  19. // 创建队列2的消费者
  20. myConsumerFactory.createConsumer("(队列2)", QUEUE_NAME_TWO);
  21. }
  22. /**
  23. * 自定义消费者工厂类
  24. */
  25. static class MyConsumerFactory{
  26. /**
  27. * 创建消费者
  28. */
  29. public void createConsumer(final String consumerName,String queueName){
  30. try {
  31. // 获取连接对象
  32. Connection connection = CommonUtil.getConnection();
  33. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  34. Channel channel = connection.createChannel();
  35. /**
  36. * 声明队列,如果Rabbit中没有此队列将自动创建
  37. * param1:队列名称
  38. * param2:是否持久化
  39. * param3:队列是否独占此连接
  40. * param4:队列不再使用时是否自动删除此队列
  41. * param5:队列参数
  42. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  43. */
  44. channel.queueDeclare(queueName,false,false,false,null);
  45. // 每次取1条消息
  46. channel.basicQos(1);
  47. //定义消费者
  48. DefaultConsumer consumer = new DefaultConsumer(channel){
  49. // 监听消息
  50. @Override
  51. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  52. //消费消费
  53. String msg = new String(body,"utf-8");
  54. System.out.println("消费者"+consumerName+" 收到消息: "+msg);
  55. }
  56. };
  57. //参数1:接收哪个队列的数据
  58. //参数2:消息确认 是否自动确认
  59. //参数3:消费者
  60. channel.basicConsume(queueName,true,consumer);
  61. System.out.println("消费者"+consumerName+"已就位");
  62. } catch (IOException e) {
  63. e.printStackTrace();
  64. } catch (TimeoutException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. }
  69. }

启动生产者并发送消息

image.png
队列1和队列2都收到了两个消息
image.png

点击队列名称可查看队列详情
image.png
image.png

启动消费者并消费消息

队列1和队列2的消费者都各自消费了消息
image.png
image.png

RPC模式

客户端(Client)

  1. package com.mq.rabbitmq.RPC;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Queue;
  6. import java.util.Scanner;
  7. import java.util.UUID;
  8. import java.util.concurrent.ArrayBlockingQueue;
  9. import java.util.concurrent.BlockingQueue;
  10. /**
  11. * @description RPC模式客户端
  12. * @date: 2020-12-11 11:18
  13. */
  14. public class RPCClient {
  15. // 声明队列和交换机
  16. // 队列名称
  17. final static String QUEUE_NAME = "QUEUE_RPC";
  18. // 交换机名称
  19. final static String EXCHANGE_NAME = "MY_RPC_EXCHANGE";
  20. public static void main(String[] args) throws Exception {
  21. // 获取连接对象
  22. Connection connection = CommonUtil.getConnection();
  23. // 创建信道,每个连接可以创建多个信道,每个信道代表一个会话任务
  24. final Channel channel = connection.createChannel();
  25. // 限制:每次最多给一个消费者发送1条消息
  26. channel.basicQos(1);
  27. /**
  28. * 通道绑定交换机
  29. * 1、交换机名称
  30. * 2、交换机类型,fanout、topics、direct、headers
  31. */
  32. // RPC模式
  33. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  34. /**
  35. * 声明队列,如果Rabbit中没有此队列将自动创建
  36. * param1:队列名称
  37. * param2:是否持久化
  38. * param3:队列是否独占此连接
  39. * param4:队列不再使用时是否自动删除此队列
  40. * param5:队列参数
  41. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  42. */
  43. channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 通道绑定队列1
  44. /**
  45. * 交换机和队列绑定
  46. * 1、队列名称
  47. * 2、交换机名称
  48. * 3、路由key
  49. */
  50. // RPC模式
  51. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,QUEUE_NAME);
  52. // 本次请求唯一标志
  53. final String correlationId = UUID.randomUUID().toString();
  54. // 定义匿名的回调队列[由channel随机生成]
  55. // 这样做的好处是:每个客户端有属于自己的唯一回复队列,生命周期同客户端
  56. String replyToQueue = channel.queueDeclare().getQueue();
  57. // 此处为返回的核心,设置发送消息携带的信息,对方才能知道应该怎么回复
  58. final AMQP.BasicProperties props = new AMQP.BasicProperties
  59. .Builder()
  60. .correlationId(correlationId)// 队列的唯一值认证
  61. .replyTo(replyToQueue)// 回调队列(告诉对方应该回复到哪个队列)
  62. .build();
  63. System.out.println("创建连接,服务端回复队列为:"+replyToQueue);
  64. System.out.println("====================================");
  65. // 发送消息给Server
  66. System.out.println("请输入传送内容:");
  67. Scanner scanner = new Scanner(System.in);
  68. String message = scanner.nextLine();
  69. System.out.println("发送了一条消息:"+message);
  70. /**
  71. * 消息发布方法
  72. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  73. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  74. * param3:消息包含的属性
  75. * param4:消息体
  76. */
  77. // 将消息发送到 QUEUE_NAME 指定的队列中
  78. channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, props, message.getBytes("utf8"));
  79. System.out.println("等待回复中。。。。");
  80. // 监听回调,接收返回来的消息
  81. DefaultConsumer consumer = new DefaultConsumer(channel) {
  82. @Override
  83. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
  84. // 判断回传的消息是否与唯一id一致
  85. if (properties.getCorrelationId().equals(correlationId)) {
  86. System.out.println("收到回复:" + new String(body));
  87. }else{
  88. System.out.println("correlationID未对应上的消息:" + new String(body));
  89. }
  90. // 回复
  91. try {
  92. System.out.println("请输入回复内容:");
  93. Scanner scanner = new Scanner(System.in);
  94. String message = scanner.nextLine();
  95. channel.basicPublish(EXCHANGE_NAME,QUEUE_NAME , props, message.getBytes("utf8"));
  96. System.out.println("发送了一条消息:"+message);
  97. System.out.println("等待回复中。。。。");
  98. } catch (IOException e) {
  99. e.printStackTrace();
  100. }
  101. }
  102. };
  103. // 注册消费监听,监听的是回复队列(Server端会把消息放到这个队列里面)
  104. channel.basicConsume(replyToQueue, true,consumer );
  105. }
  106. }

服务端(Server)

  1. package com.mq.rabbitmq.RPC;
  2. import com.mq.rabbitmq.CommonUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Scanner;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * @description RPC模式服务端
  9. * @date: 2020-12-11 11:18
  10. */
  11. public class RPCServer {
  12. // 队列名称
  13. final static String QUEUE_NAME = "QUEUE_RPC";
  14. // 交换机名称
  15. final static String EXCHANGE_NAME = "MY_RPC_EXCHANGE";
  16. public static void main(String[] args) throws IOException, TimeoutException {
  17. final Channel channel = CommonUtil.getConnection().createChannel();
  18. /**
  19. * 通道绑定交换机
  20. * 1、交换机名称
  21. * 2、交换机类型,fanout、topics、direct、headers
  22. */
  23. // RPC 主题模式
  24. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  25. /**
  26. * 交换机和队列绑定
  27. * 1、队列名称
  28. * 2、交换机名称
  29. * 3、路由key
  30. */
  31. // RPC模式
  32. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,QUEUE_NAME);
  33. /**
  34. * 声明队列,如果Rabbit中没有此队列将自动创建
  35. * param1:队列名称
  36. * param2:是否持久化
  37. * param3:队列是否独占此连接
  38. * param4:队列不再使用时是否自动删除此队列
  39. * param5:队列参数
  40. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  41. */
  42. channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 通道绑定队列1
  43. // 构造消费者
  44. DefaultConsumer consumer = new DefaultConsumer(channel) {
  45. @Override
  46. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  47. System.out.println("收到消息:"+new String(body)+"----[回复队列:"+properties.getReplyTo()+"]");
  48. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  49. //我们在将要回复的消息属性中,放入从客户端传递过来的correlateId
  50. builder.correlationId(properties.getCorrelationId());
  51. AMQP.BasicProperties prop = builder.build();
  52. Scanner scanner = new Scanner(System.in);
  53. System.out.println("请输入回复内容:");
  54. String replyMessage = scanner.nextLine();
  55. /**
  56. * 消息发布方法
  57. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  58. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  59. * param3:消息包含的属性
  60. * param4:消息体
  61. */
  62. // 回复消息,将消息放到replyTo指定的队列中
  63. channel.basicPublish("", properties.getReplyTo(), prop, (replyMessage).getBytes());
  64. System.out.println("发送回复:"+replyMessage);
  65. }
  66. };
  67. // 注册消费,监听 QUEUE_NAME 指定的队列,Client端会将消息放到这个队列里面
  68. channel.basicConsume(QUEUE_NAME, true, consumer);
  69. }
  70. }

启动客户端(Client)并发送消息

image.png
客户端消息队列有一条消息待消费,回复队列没有消息
image.png
image.png

启动服务端(Server)并消费消息

Server端收到消息,并回复
image.png
客户端收到来自服务端的回复
image.png
image.png

核心API

从上述代码对各种模式的实现中,我们知道主要的操作都是依靠Channel和Consumer类实现,包括绑定交换器,绑定队列,发送消息,接收消息等。

com.rabbitmq.client.Channel—信道

exchangeDeclare—-声明并绑定注册器

参数解析:

  • exchange:交换器名称
  • type:交换器类型,有direct、fanout、topic、headers四种【见BuiltinExchangeType枚举】
  • durable:是否持久化交换器(true|false),true代表服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。
  • autoDelete:是否自动删除交换器(true|false).true表示当已经没有消费者时,服务器是否可以删除该Exchange
  • internal:是否内部交换消息(true|false),true表示交换是内部的,即客户端不能直接发布消息
  • arguments:其他附属属性

    1. /**
    2. * Declare an exchange, via an interface that allows the complete set of
    3. * arguments.
    4. * @see com.rabbitmq.client.AMQP.Exchange.Declare
    5. * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
    6. * @param exchange the name of the exchange
    7. * @param type the exchange type
    8. * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
    9. * @param autoDelete true if the server should delete the exchange when it is no longer in use
    10. * @param internal true if the exchange is internal, i.e. can't be directly
    11. * published to by a client.
    12. * @param arguments other properties (construction arguments) for the exchange
    13. * @return a declaration-confirm method to indicate the exchange was successfully declared
    14. * @throws java.io.IOException if an error is encountered
    15. */
    16. Exchange.DeclareOk exchangeDeclare(String exchange,
    17. BuiltinExchangeType type,
    18. boolean durable,
    19. boolean autoDelete,
    20. boolean internal,
    21. Map<String, Object> arguments) throws IOException;

    queueDeclare—-声明并绑定队列

    参数解析:

  • queue:队列名称

  • durable:是否持久化队列(true|false),true代表服务器重启会保留下该队列
  • exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列
  • autodelete:当没有任何消费者使用时,自动删除该队列
  • arguments:其他附属属性
    1. /**
    2. * Declare a queue
    3. * @see com.rabbitmq.client.AMQP.Queue.Declare
    4. * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
    5. * @param queue the name of the queue
    6. * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    7. * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
    8. * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    9. * @param arguments other properties (construction arguments) for the queue
    10. * @return a declaration-confirm method to indicate the queue was successfully declared
    11. * @throws java.io.IOException if an error is encountered
    12. */
    13. Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    14. Map<String, Object> arguments) throws IOException;

queueBind—-绑定队列与交换器

参数解析:

  • queue:队列名称
  • exchange:交换器名称
  • routingKey:路由key,即指定队列在指定交换器的注册路由
  • arguments:其他附属属性
    1. /**
    2. * Bind a queue to an exchange.
    3. * @see com.rabbitmq.client.AMQP.Queue.Bind
    4. * @see com.rabbitmq.client.AMQP.Queue.BindOk
    5. * @param queue the name of the queue
    6. * @param exchange the name of the exchange
    7. * @param routingKey the routing key to use for the binding
    8. * @param arguments other properties (binding parameters)
    9. * @return a binding-confirm method if the binding was successfully created
    10. * @throws java.io.IOException if an error is encountered
    11. */
    12. Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

basicPublish—-发布消息

参数解析:

  • exchange:交换器名称
  • routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
  • mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
  • immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
  • BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable=true)可以实现,即使服务器宕机,消息仍然保留
  • body:消息内容

简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

  1. /**
  2. * Publish a message.
  3. *
  4. * Publishing to a non-existent exchange will result in a channel-level
  5. * protocol exception, which closes the channel.
  6. *
  7. * Invocations of <code>Channel#basicPublish</code> will eventually block if a
  8. * <a href="https://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
  9. *
  10. * @see com.rabbitmq.client.AMQP.Basic.Publish
  11. * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
  12. * @param exchange the exchange to publish the message to
  13. * @param routingKey the routing key
  14. * @param mandatory true if the 'mandatory' flag is to be set
  15. * @param immediate true if the 'immediate' flag is to be
  16. * set. Note that the RabbitMQ server does not support this flag.
  17. * @param props other properties for the message - routing headers etc
  18. * @param body the message body
  19. * @throws java.io.IOException if an error is encountered
  20. */
  21. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  22. throws IOException;

basicConsume—-启动一个消息监听

参数解析:

  • queue:队列的名称
  • autoAck:设置是否自动确认。一般设置为false,即不自动确认。
  • consumerTag:消费者标签,用来区分多个消费者。
  • noLocal:设置为true,则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者。
  • exclusive:设置是否排他。
  • arguments:设置消费者的其他参数
  • deliverCallback:处理推送过来的消息的回调函数。比如DefaultConsumer,重写其中的方法实现我们对消息的处理逻辑。
  • cancelCallback:当消费者取消订阅时的回调函数
  • shutdownSignalCallback:当Connection/Channel关闭时的回调函数
    1. /**
    2. * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
    3. * method.
    4. * Provide access to <code>basic.deliver</code>, <code>basic.cancel</code>
    5. * and shutdown signal callbacks (which is sufficient
    6. * for most cases). See methods with a {@link Consumer} argument
    7. * to have access to all the application callbacks.
    8. * @param queue the name of the queue
    9. * @param autoAck true if the server should consider messages
    10. * acknowledged once delivered; false if the server should expect
    11. * explicit acknowledgements
    12. * @param consumerTag a client-generated consumer tag to establish context
    13. * @param noLocal True if the server should not deliver to this consumer
    14. * messages published on this channel's connection. Note that the RabbitMQ server does not support this flag.
    15. * @param exclusive true if this is an exclusive consumer
    16. * @param arguments a set of arguments for the consume
    17. * @param deliverCallback callback when a message is delivered
    18. * @param cancelCallback callback when the consumer is cancelled
    19. * @param shutdownSignalCallback callback when the channel/connection is shut down
    20. * @return the consumerTag associated with the new consumer
    21. * @throws java.io.IOException if an error is encountered
    22. * @see com.rabbitmq.client.AMQP.Basic.Consume
    23. * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
    24. * @since 5.0
    25. */
    26. String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

basicAck—-确认消息

参数解析:

  • deliveryTag用来标识信道中投递的消息,它是 一个 64 位的长整型值,最大值是9223372036854775807。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。可以从消费消息方法的Envelope对象中获取。
  • multiple:如果false,表示通知 RabbitMQ该deliveryTag消息被确认。如果为true,则将小于deliveryTag之前的所有消息全部确认。
    1. /**
    2. * Acknowledge one or several received
    3. * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
    4. * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
    5. * containing the received message being acknowledged.
    6. * @see com.rabbitmq.client.AMQP.Basic.Ack
    7. * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
    8. * @param multiple true to acknowledge all messages up to and
    9. * including the supplied delivery tag; false to acknowledge just
    10. * the supplied delivery tag.
    11. * @throws java.io.IOException if an error is encountered
    12. */
    13. void basicAck(long deliveryTag, boolean multiple) throws IOException;

basicReject—-拒绝消息

参数解析:

  • deliveryTag:用来标识信道中投递的消息
  • requeue:requeue 参数设置为true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
    1. /**
    2. * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
    3. * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
    4. * containing the received message being rejected.
    5. * @see com.rabbitmq.client.AMQP.Basic.Reject
    6. * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
    7. * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
    8. * @throws java.io.IOException if an error is encountered
    9. */
    10. void basicReject(long deliveryTag, boolean requeue) throws IOException;

basicNack—-批量拒绝消息

参数解析:

  • deliveryTag:用来标识信道中投递的消息
  • multiple: 如果false,表示通知 RabbitMQ 该deliveryTag消息被拒绝。如果为true,则将小于deliveryTag之前的所有消息全部拒绝。
  • requeue:requeue 参数设置为true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
    1. /**
    2. * Reject one or several received messages.
    3. *
    4. * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
    5. * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
    6. * @see com.rabbitmq.client.AMQP.Basic.Nack
    7. * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
    8. * @param multiple true to reject all messages up to and including
    9. * the supplied delivery tag; false to reject just the supplied
    10. * delivery tag.
    11. * @param requeue true if the rejected message(s) should be requeued rather
    12. * than discarded/dead-lettered
    13. * @throws java.io.IOException if an error is encountered
    14. */
    15. void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    16. throws IOException;

com.rabbitmq.client.Consumer—消息监听

主要是提供一些待重写的方法

handleDelivery

用来处理队列推送的消息

  1. /**
  2. * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
  3. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  4. * @param envelope packaging data for the message
  5. * @param properties content header data for the message
  6. * @param body the message body (opaque, client-specific byte array)
  7. * @throws IOException if the consumer encounters an I/O error while processing the message
  8. * @see Envelope
  9. */
  10. void handleDelivery(String consumerTag,
  11. Envelope envelope,
  12. AMQP.BasicProperties properties,
  13. byte[] body)
  14. throws IOException;

handleShutdownSignal

当Channel或者Connection关闭的时候会调用。
**

  • consumerTag:消费者标签,用来区分多个消费者。
  • sig:如果抛出异常同时传入
  1. /**
  2. * Called when either the channel or the underlying connection has been shut down.
  3. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  4. * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
  5. */
  6. void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

handleConsumeOk

任意Channel#basicConsume调用导致消费者被注册时调用,返回消费者标签

  1. /**
  2. * Called when the consumer is registered by a call to any of the
  3. * {@link Channel#basicConsume} methods.
  4. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  5. */
  6. void handleConsumeOk(String consumerTag);

handleCancelOk

Channel#basicCancel调用导致的订阅取消时被调用。

  1. /**
  2. * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
  3. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  4. */
  5. void handleCancelOk(String consumerTag);

handleCancel

除了调用basicCancel的其他原因导致消息被取消时调用。

  1. /**
  2. * Called when the consumer is cancelled for reasons <i>other than</i> by a call to
  3. * {@link Channel#basicCancel}. For example, the queue has been deleted.
  4. * See {@link #handleCancelOk} for notification of consumer
  5. * cancellation due to {@link Channel#basicCancel}.
  6. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  7. * @throws IOException
  8. */
  9. void handleCancel(String consumerTag) throws IOException;

RabbitMQ解决消息丢失

我们知道,消息队列的消息丢失可能发生在三个阶段:

  • 生产者向Broker发送消息的过程中丢失
  • Broker故障丢失
  • 消费者丢失数据

RabbitMQ对这三种情况的预防措施如下:

生产者弄丢了数据—事务机制和Confirm模式

  
生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。

事务机制

RabbitMQ中与事务机制有关的方法有三个:Chanel#txSelect, Chanel#txCommit()以及Chanel#txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。

在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

  1. do{
  2. message = scanner.nextLine();
  3. try {
  4. // 开启事务
  5. channel.txSelect();
  6. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  7. System.out.println("生产者发送消息:"+message);
  8. channel.txCommit(); // 事务提交
  9. } catch (IOException e) {
  10. try {
  11. channel.txRollback();
  12. } catch (IOException e1) {
  13. e1.printStackTrace();
  14. }
  15. e.printStackTrace();
  16. }
  17. }while(!message.equals("bye")); // 当输入bye时结束输入,推出循环

但是问题是,开启Rabbitmq事务机制,基本上吞吐量会下来,因为太耗性能。

confirm模式(推荐)

在生产者那里设置开启confirm模式之后,每次发送的消息都会分配一个唯一的id,然后如果成功写入了rabbitmq中,rabbitmq会回传一个ack消息,表明这个消息已确认存放。

如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

串行方式

每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。

  1. // 开启confirm模式
  2. channel.confirmSelect();
  3. do{
  4. message = scanner.nextLine();
  5. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  6. // 等待确认
  7. if(!channel.waitForConfirms()){
  8. System.out.println("消息发送失败。");
  9. break;
  10. }
  11. System.out.println("生产者发送消息:"+message);
  12. }while(!message.equals("bye")); // 当输入bye时结束输入,推出循环

异步方式

提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
下述代码为channel添加了一个消息确认回调监听器,无论Broker接收消息成功与否,都会触发监听器内部的监听方法,并且不会阻塞channel继续发送消息

  1. // 开启confirm模式
  2. channel.confirmSelect();
  3. // 装载待确认回调的消息id
  4. final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
  5. // 为channel添加确认回调监听器
  6. channel.addConfirmListener(new ConfirmListener() {
  7. /**
  8. * 消息确认成功的回调函数
  9. */
  10. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  11. if (multiple) {
  12. // 如果是批量确认,则清除从开始到当前消息的元素
  13. confirmSet.headSet(deliveryTag + 1).clear();
  14. } else {
  15. confirmSet.remove(deliveryTag);
  16. }
  17. System.out.println("消息发送成功");
  18. }
  19. /**
  20. * 消息确认失败的回调函数
  21. */
  22. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  23. // 这里可以处理消息确认发送失败的逻辑,如重发等
  24. if (multiple) {
  25. confirmSet.headSet(deliveryTag + 1).clear();
  26. } else {
  27. confirmSet.remove(deliveryTag);
  28. }
  29. System.out.println("消息发送失败");
  30. }
  31. });
  32. do{
  33. message = scanner.nextLine();
  34. // 获取即将发送的消息的唯一id
  35. long nextPublishSeqNo = channel.getNextPublishSeqNo();
  36. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  37. // 将已发送的消息id加入待确认集合
  38. confirmSet.add(nextPublishSeqNo);
  39. System.out.println("生产者发送消息:"+message);
  40. }while(!message.equals("bye")); // 当输入bye时结束输入,推出循环

事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用**confirm模式**的。

Broker弄丢了数据

  为了防止rabbitmq自己弄丢了数据,必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

  设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue以及恢复这个queue里的数据。

  而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

队列持久化

  1. /**
  2. * 声明队列,如果Rabbit中没有此队列将自动创建
  3. * param1:队列名称
  4. * param2:是否持久化
  5. * param3:队列是否独占此连接
  6. * param4:队列不再使用时是否自动删除此队列
  7. * param5:队列参数
  8. * String common, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  9. */
  10. channel.queueDeclare(QUEUE_NAME,true,false,false,null);

消息持久化

  1. /**
  2. * 消息发布方法
  3. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
  4. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
  5. * param3:消息包含的属性,MessageProperties.PERSISTENT_BASIC实际上是一个设置了持久化的BasicProperties对象
  6. * param4:消息体
  7. */
  8. channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,message.getBytes());

  若生产者那边的confirm机制未开启的情况下,哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

消费者弄丢了数据

  主要是因为消费者消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为消息已经被消费了,这数据就丢了。【在Broker端是完全说的过去的】

  这个时候得用rabbitmq提供的ACK机制,简单来说,就是你关闭rabbitmq自动ACK,开启手动ACK,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ACK一把。

  1. //定义消费者
  2. DefaultConsumer consumer = new DefaultConsumer(channel){
  3. // 监听消息
  4. @Override
  5. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  6. //消费消费
  7. String msg = new String(body,"utf-8");
  8. System.out.println("consume msg: "+msg);
  9. try {
  10. Thread.sleep(1000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. //手动消息确认
  15. /**
  16. * @param1 : 消息标识
  17. * @param2 : 是否批量确认
  18. */
  19. getChannel().basicAck(envelope.getDeliveryTag(),false);
  20. }
  21. };
  22. /**
  23. * 注册监听
  24. * @param1 :接收哪个队列的数据
  25. * @param2 :消息确认 是否应答,收到消息是否回复
  26. * @param3 :消费者
  27. */
  28. channel.basicConsume(QUEUE_NAME,false,consumer);

这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。

RabbitMQ解决重复消费

如上述场景,当消费端处理完成,打算返回ACK给Broker的时候,进程被干掉了,这就会导致Broker认为这条消息还没有被消费成功,重新派发消息给消费端去处理,这就是消息重复消费的一种原因

解决方案:保证消费者幂等性,重复消费也没有关系。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
**
比如:一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

保证消费者消费的幂等性需要结合业务来分析:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

    1. //定义消费者
    2. DefaultConsumer consumer = new DefaultConsumer(channel){
    3. // 监听消息
    4. @Override
    5. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    6. //消息
    7. String msg = new String(body,"utf-8");
    8. System.out.println("consume msg: "+msg);
    9. // 前置判断是否已经消费过了
    10. // consumedMap代表在redis缓存中存放的已消费的消息
    11. Map<Long,Object> consumedMap = redisHelper.getMap("consumedMap");
    12. if(consumedMap.containsKey(envelope.getDeliveryTag())){
    13. // 如果已经消费过,啥都不做,直接确认
    14. //手动消息确认
    15. /**
    16. * @param1 : 消息标识
    17. * @param2 : 是否批量确认
    18. */
    19. getChannel().basicAck(envelope.getDeliveryTag(),false);
    20. }
    21. try {
    22. // 此处模拟业务处理
    23. Thread.sleep(1000);
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. }
    27. //手动消息确认
    28. /**
    29. * @param1 : 消息标识
    30. * @param2 : 是否批量确认
    31. */
    32. getChannel().basicAck(envelope.getDeliveryTag(),false);
    33. // 加入已消费缓存
    34. consumedMap.put(envelope.getDeliveryTag(),body);
    35. }
    36. };