应用安装

1.从官网 www.rabbitmq.com 下载rabbitmq版本以及对应版本支持的erlang
2.安装erlang, 再安装rabbitmq
3.启用管理插件
rabbitmq-plugins enable rabbitmq_management

rabbitmq核心配置文件
vim usr/lib/rabbitmq/lib/rabbitmq_server-3.7.7/ebin/rabbit.app 这个暂不做更改
vim /etc/rabbitmq/rabbitmq.conf
推荐配置如下:
参数参考: https://www.jianshu.com/p/294e0fde0676

  1. management.listener.port = 30005 --管理界面端口
  2. listeners.tcp.default = 30004 --tcp监听端口(默认为5672)
  3. loopback_users.guest = false --guest用户允许使用ip登录(默认只能localhost登录)
  4. heartbeat = 600 --心跳检测时间(若客户端重新设置则以客户端为准)

相关命令

  • 解压tar包: tar -zxvf xxxx
  • 安装rpm软件: rpm -ivh xxxx
  • 查找软件是否安装: rpm -qa|grep xxxx
  • 查看软件安装位置: rpm -ql xxxx
  • 卸载软件: rpm -e xxxx

系统指令: systemctl start/status/stop rabbitmq-server
rabbitmqctl命令

  1. 启动/状态/关闭 rabbitmqctl start_app/status/stop_app
  2. 用户相关
  3. rabbitmqctl add_user username password 添加用户
  4. rabbitmqctl list_users 列出所有用户
  5. rabbitmqctl delete_user username 删除用户
  6. rabbitmqctl clear_permissions -p vhostpath username 清除用户权限
  7. rabbitmqctl list_user_permissions username 列出用户权限
  8. rabbitmqctl change_password username newpassword 修改密码
  9. rabbitmqctl set_permissions -p vhostname username ".*"".*"".*" 设置用户权限
  10. 虚拟主机相关
  11. rabbitmqctl add_vhost vhostpath 创建虚拟主机
  12. rabbitmqctl list_vhosts 列出所有虚拟主机
  13. rabbitmqctl list_permissions -p vhostpath 列出虚拟主机上所有权限
  14. rabbitmqctl delete_vhost vhostpath 删除虚拟主机

AMQP协议

参考: https://blog.csdn.net/weixin_37641832/article/details/83270778
rabbitmq中文文档: http://rabbitmq.mr-ping.com/

AMQP概念

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
RabbitMQ基础(1) - 图1
理解
1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。
2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。
4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。


Exchange交换机

交换机是用来发送消息的 AMQP 实体。
交换机拿到一个消息之后将它路由给一个或零个队列。
它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
AMQP 0-9-1 的代理提供了四种交换机:
RabbitMQ基础(1) - 图2


默认交换机

默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。
每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。


直连交换机

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;
2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。

直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,我们很容易得出一个结论,在 AMQP 0-9-1 中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

直连型交换机图例:
RabbitMQ基础(1) - 图3

当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。


扇型交换机

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

扇型交换机图例:

RabbitMQ基础(1) - 图4

上图所示,生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。


主题交换机

前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.而Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

它的约定是:

1)binding key 中可以存在两种特殊字符 “” 与“#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

2)routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key 与 routing key 一样也是句点号 “.” 分隔的字符串

主题交换机图例:
RabbitMQ基础(1) - 图5

当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。


头交换机

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。工作流程:

1)绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。
2)传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。
image.png


Queue队列

AMQP 中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。
队列跟交换机共享某些属性,但是队列也有一些另外的属性。

队列属性:

  • Name
  • Durable(消息代理重启后,队列依旧存在)
  • Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
  • Auto-delete(当最后一个消费者退订后即被删除)
  • Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)

队列创建
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出

队列持久化
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。


消息机制

消息确认:
AMQP 0-9-1 规范给我们两种建议:
1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

拒绝消息:
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于 “拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成)。

当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。

当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。

预取消息
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。注意,RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。

消息属性

  • Content type(内容类型)
  • Content encoding(内容编码)
  • Routing key(路由键)
  • Delivery mode (persistent or not)
  • 投递模式(持久化 或 非持久化)
  • Message priority(消息优先权)
  • Message publishing timestamp(消息发布的时间戳)
  • Expiration period(消息有效期)
  • Publisher application id(发布应用的 ID)

消息主体
AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。

消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。

消息持久化
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。

简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。


其他

连接
AMQP 连接通常是长连接。AMQP 是一个使用 TCP 提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。

通道
有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。

在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。

一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

虚拟主机
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。


Rabbitmq基础回顾

官方文档: https://www.rabbitmq.com/getstarted.html

基本概念

rabbitmq基本结构
image.png

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
  • connection是生产者或消费者与broker的一个TCP连接
  • channel是在建立在 Connection 之上的虚拟连接

模式

参考: https://www.rabbitmq.com/getstarted.html

Work queues 工作队列模式

image.png
多个消费端共同消费同一个队列中的消息。
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。

Publish/subscribe 发布订阅模式

image.png
每个消费者监听自己的队列,生产者将消息发给broker, 由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

Routing 路由模式

image.png
每个消费者监听自己的队列,并且设置routingkey, 生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列

Topics 通配符模式

image.png
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
*可以代替一个单词。
#可以替代零个或多个单词。

RPC模式

image.png
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用


消息如何保障100%的投递成功

生产端的可靠性投递
  • 保障消息的成功发出
  • 保障mq节点的成功接收
  • 发送端收到mq节点的确认应答
  • 完善的消息补偿机制

互联网大厂的解决方案:

  • 消息落库, 对消息状态进行打标

image.png

  • 消息的延迟投递, 做二次确认, 回调检查

image.png

消费端的幂等性

唯一ID+指纹码机制
利用Redis原子特性实现


Java Client快速入门

文档: https://www.rabbitmq.com/api-guide.html#exchanges-and-queues
依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.8.0</version>
  5. </dependency>

Hello Rabbitmq示例

Producer

  1. package top.xinzhang0618.consumer.quick.start;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.AMQP.BasicProperties.Builder;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. import java.io.IOException;
  9. import java.util.HashMap;
  10. import java.util.concurrent.TimeoutException;
  11. /**
  12. * Producer
  13. *
  14. * @author xinzhang
  15. * @author Shenzhen Greatonce Co Ltd
  16. * @version 2020/2/29
  17. * 文档: https://www.rabbitmq.com/api-guide.html
  18. */
  19. public class Producer {
  20. public static void main(String[] args) throws IOException, TimeoutException {
  21. ConnectionFactory connectionFactory = new ConnectionFactory();
  22. connectionFactory.setHost("139.9.62.232");
  23. connectionFactory.setPort(30004);
  24. connectionFactory.setVirtualHost("xinzhang");
  25. connectionFactory.setUsername("xinzhang");
  26. connectionFactory.setPassword("Xinzhang123");
  27. Connection connection = connectionFactory.newConnection();
  28. Channel channel = connection.createChannel();
  29. String exchangeName = "xztest";
  30. channel.exchangeDeclare(exchangeName, "topic", true);
  31. String routingKey = "test";
  32. HashMap<String, Object> map = new HashMap<>();
  33. map.put("1", "测试参数1");
  34. BasicProperties properties = new Builder()
  35. .deliveryMode(2)
  36. .contentEncoding("utf-8")
  37. .expiration("15000")
  38. .headers(map)
  39. .build();
  40. String message = "hello! rabbitmq! 2020-02-29";
  41. for (int i = 0; i < 5; i++) {
  42. channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
  43. System.out.println("发送消息: =====>" + message);
  44. }
  45. channel.close();
  46. connection.close();
  47. }
  48. }

Consumer

  1. package top.xinzhang0618.consumer.quick.start;
  2. import com.rabbitmq.client.AMQP.BasicProperties;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Delivery;
  8. import com.rabbitmq.client.Envelope;
  9. import java.io.IOException;
  10. import java.util.Queue;
  11. import java.util.concurrent.TimeoutException;
  12. /**
  13. * Consumer
  14. *
  15. * @author xinzhang
  16. * @author Shenzhen Greatonce Co Ltd
  17. * @version 2020/2/29
  18. * 文档: https://www.rabbitmq.com/api-guide.html
  19. */
  20. public class Consumer {
  21. public static void main(String[] args) throws IOException, TimeoutException {
  22. ConnectionFactory connectionFactory = new ConnectionFactory();
  23. connectionFactory.setHost("139.9.62.232");
  24. connectionFactory.setPort(30004);
  25. connectionFactory.setVirtualHost("xinzhang");
  26. connectionFactory.setUsername("xinzhang");
  27. connectionFactory.setPassword("Xinzhang123");
  28. Connection connection = connectionFactory.newConnection();
  29. Channel channel = connection.createChannel();
  30. String exchangeName = "xztest";
  31. channel.exchangeDeclare(exchangeName, "topic", true);
  32. String queueName = "xztest01";
  33. channel.queueDeclare(queueName, true, false, false, null);
  34. String routingKey = "test";
  35. channel.queueBind(queueName, exchangeName, routingKey);
  36. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
  37. @Override
  38. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  39. throws IOException {
  40. System.out.println("接收到消息: ====>" + new String(body));
  41. System.out.println("交换机以及路由为: " + envelope.getExchange() + "---" + envelope.getRoutingKey());
  42. System.out.println("过期时间以及携带自定义参数为: " + properties.getExpiration() + "===" + properties.getHeaders().get("1"));
  43. }
  44. });
  45. //这里关闭了代码就结束了, 回调线程也结束, 会看不到控制台输出
  46. // channel.close();
  47. // connection.close();
  48. }
  49. }

Confirm确认消息

消息的确认是指生产者投递消息过后, 如果broker收到消息则会给生产者一个应答; 生产者进行接收应答, 用来确认这条消息是否正常的发送到broker

关联配置(在生产端配置):
1.channel.confirmSelect();
2. channel.addConfirmListener(new ConfirmListener() {…})

  1. // 打开确认模式
  2. channel.confirmSelect();
  3. channel.addConfirmListener(new ConfirmListener() {...})

image.png


消费者应答(ACK)和发布者确认(Confirm)

参考:
https://blog.csdn.net/cadem/article/details/69627523
https://blog.bossma.cn/rabbitmq/consumer-ack-and-publisher-confirm/

  • rabbitmq-server也成为Broker
  • AMQP协议定义的确认(acknowledgement)是从consumer到mq的确认, 表示一条消息已经被客户端正确处理RabbitMQ扩展了AMQP协议,定义了从broker到publisher的”确认”,但将其称之为confirm。所以RabbitMQ的确认有2种,叫不同的名字,一个consumer acknowledgement,一个叫publisher confirm。
  • consumer ACK是通过basic.ack实现, 默认开启, 可以在basic.consume中指定关闭
  • publishConfirm是通过复用basic.ack方法实现, 默认关闭, 可以设置channel.confirmSelect开启

Return消息机制

Return Listener用于处理一些不可路由的消息
(在发送消息时, 当前的exchange不存在或者指定的routingKey路由不到)

关联配置(在生产端配置):
1.Mandatory: 为true时, 监听器接收路由不可达消息, 为false则broker端自动删除该消息
2.channel.addReturnListener(new ReturnListener() {…})

  1. // 第三个参数mandatory设为true, 监听不可达消息
  2. channel.basicPublish(exchangeName, routingKey2, true,null, message.getBytes());
  3. channel.addReturnListener(new ReturnListener() {...})

image.png


Confirm/Return消息机制示例

Producer

  1. package top.xinzhang0618.consumer.confirm;
  2. import com.rabbitmq.client.AMQP.BasicProperties;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.ConfirmListener;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. import com.rabbitmq.client.ReturnListener;
  8. import java.io.IOException;
  9. import java.util.concurrent.TimeoutException;
  10. /**
  11. * Producer
  12. *
  13. * @author xinzhang
  14. * @author Shenzhen Greatonce Co Ltd
  15. * @version 2020/2/29
  16. * 文档: https://www.rabbitmq.com/api-guide.html
  17. */
  18. public class Producer {
  19. public static void main(String[] args) throws IOException, TimeoutException {
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. connectionFactory.setHost("139.9.62.232");
  22. connectionFactory.setPort(30004);
  23. connectionFactory.setVirtualHost("xinzhang");
  24. connectionFactory.setUsername("xinzhang");
  25. connectionFactory.setPassword("Xinzhang123");
  26. Connection connection = connectionFactory.newConnection();
  27. Channel channel = connection.createChannel();
  28. // 打开确认模式
  29. channel.confirmSelect();
  30. String exchangeName = "xztest_confirm";
  31. channel.exchangeDeclare(exchangeName, "topic", true);
  32. String routingKey = "test.confirm.save";
  33. String routingKey2 = "return";
  34. String message = "发送确认消息! 2020-02-29";
  35. channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  36. System.out.println("发送消息-confirm: =====>" + message);
  37. // 第三个参数mandatory设为true, 监听不可达消息
  38. channel.basicPublish(exchangeName, routingKey2, true,null, message.getBytes());
  39. System.out.println("发送消息-return: =====>" + message);
  40. channel.addConfirmListener(new ConfirmListener() {
  41. @Override
  42. public void handleAck(long l, boolean b) throws IOException {
  43. System.out.println("------------ack------------");
  44. }
  45. @Override
  46. public void handleNack(long l, boolean b) throws IOException {
  47. System.out.println("------------no---ack------------");
  48. }
  49. });
  50. channel.addReturnListener(new ReturnListener() {
  51. @Override
  52. public void handleReturn(int i, String s, String s1, String s2, BasicProperties basicProperties, byte[] bytes)
  53. throws IOException {
  54. System.out.println("------------return------------");
  55. System.out.println("replyCode: " + i);
  56. System.out.println("replyText: " + s);
  57. System.out.println("exchange: " + s1);
  58. System.out.println("routingKey: " + s2);
  59. System.out.println("msg: " + new String(bytes));
  60. }
  61. });
  62. // channel.close();
  63. // connection.close();
  64. }
  65. }

Consumer

  1. package top.xinzhang0618.consumer.confirm;
  2. import com.rabbitmq.client.AMQP.BasicProperties;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import java.io.IOException;
  9. import java.util.concurrent.TimeoutException;
  10. /**
  11. * Consumer
  12. *
  13. * @author xinzhang
  14. * @author Shenzhen Greatonce Co Ltd
  15. * @version 2020/2/29
  16. * 文档: https://www.rabbitmq.com/api-guide.html
  17. */
  18. public class Consumer {
  19. public static void main(String[] args) throws IOException, TimeoutException {
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. connectionFactory.setHost("139.9.62.232");
  22. connectionFactory.setPort(30004);
  23. connectionFactory.setVirtualHost("xinzhang");
  24. connectionFactory.setUsername("xinzhang");
  25. connectionFactory.setPassword("Xinzhang123");
  26. Connection connection = connectionFactory.newConnection();
  27. Channel channel = connection.createChannel();
  28. String exchangeName = "xztest_confirm";
  29. channel.exchangeDeclare(exchangeName, "topic", true);
  30. String queueName = "xztest02";
  31. channel.queueDeclare(queueName, true, false, false, null);
  32. String routingKey = "test.#";
  33. channel.queueBind(queueName, exchangeName, routingKey);
  34. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
  35. @Override
  36. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  37. throws IOException {
  38. System.out.println("接收到消息: ====>" + new String(body));
  39. System.out.println("交换机以及路由为: " + envelope.getExchange() + "---" + envelope.getRoutingKey());
  40. }
  41. });
  42. //这里关闭了代码就结束了, 回调线程也结束, 会看不到控制台输出
  43. // channel.close();
  44. // connection.close();
  45. }
  46. }

消费端限流

Rabbitmq提供了一种qos(服务质量保证的功能), 即在非自动确认消息的前提下, 如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前, 不进行消费新的消息
关联配置(在消费端设置):
1.关闭autoACK
2.设置qos

  1. // 参数: 限制消费的消息大小(为0则不作限制), 限制一次消费的消息的数量, 限流策略是channel(true)还是consumer(false)
  2. channel.basicQos(0, 1, false);
  3. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {...})

Ack与消息限流机制示例

Producer

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("139.9.62.232");
  5. connectionFactory.setPort(30004);
  6. connectionFactory.setVirtualHost("xinzhang");
  7. connectionFactory.setUsername("xinzhang");
  8. connectionFactory.setPassword("Xinzhang123");
  9. Connection connection = connectionFactory.newConnection();
  10. Channel channel = connection.createChannel();
  11. // 打开确认模式
  12. channel.confirmSelect();
  13. String exchangeName = "xztest_ack";
  14. channel.exchangeDeclare(exchangeName, "topic", true);
  15. String routingKey = "test.ack.save";
  16. String message = "发送非自动ack消息! 2020-03-01";
  17. for (int i = 0; i < 5; i++) {
  18. channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  19. System.out.println("发送消息 =====>" + message);
  20. }
  21. channel.close();
  22. connection.close();
  23. }
  24. }

Consumer

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("139.9.62.232");
  5. connectionFactory.setPort(30004);
  6. connectionFactory.setVirtualHost("xinzhang");
  7. connectionFactory.setUsername("xinzhang");
  8. connectionFactory.setPassword("Xinzhang123");
  9. Connection connection = connectionFactory.newConnection();
  10. Channel channel = connection.createChannel();
  11. String exchangeName = "xztest_ack";
  12. channel.exchangeDeclare(exchangeName, "topic", true);
  13. String queueName = "xztest03";
  14. channel.queueDeclare(queueName, true, false, false, null);
  15. String routingKey = "test.#";
  16. channel.queueBind(queueName, exchangeName, routingKey);
  17. // 参数: 限制消费的消息大小(为0则不作限制), 限制一次消费的消息的数量, 限流策略是channel(true)还是consumer(false)
  18. channel.basicQos(0, 1, false);
  19. // 第二个参数, 关闭autoAck
  20. channel.basicConsume(queueName, false, new MyConsumer(channel));
  21. //这里关闭了代码就结束了, 回调线程也结束, 会看不到控制台输出
  22. // channel.close();
  23. // connection.close();
  24. }
  25. }

MyConsumer

  1. public class MyConsumer extends DefaultConsumer {
  2. private Channel channel;
  3. public MyConsumer(Channel channel) {
  4. super(channel);
  5. this.channel = channel;
  6. }
  7. @Override
  8. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  9. throws IOException {
  10. System.out.println("接收到消息: ====>" + new String(body));
  11. System.out.println("交换机以及路由为: " + envelope.getExchange() + "---" + envelope.getRoutingKey());
  12. channel.basicAck(envelope.getDeliveryTag(), false);
  13. }
  14. }

消息重回队列

在手动ack的情况下:

  • 当投递使用的通道(或连接)被关闭时,任何没有被应答的投递(消息)将自动的重新入队列。这包括客户端丢失TCP连接,消费者应用(处理)故障,以及通道级的协议异常
  • channel.basicNack()方法可以设置重回队列
    1. // 参数: deliveryTag, multiple, requeue
    2. channel.basicNack(envelope.getDeliveryTag(),false,true);

Rabbitmq实现延迟队列

参考: https://www.cnblogs.com/yinfengjiujian/p/9204600.html

  • 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置).

  • 死信队列(DLX, Dead Letter Exchanges)

利用DLX, 当消息在一个队列中变成死信(dead message)之后, 它能被重新publish到另一个exchange, 这个exchange就是DLX.

消息变成死信的情况:

  • 消息被拒绝(basic.reject/basic.nack), 并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

死信队列/延迟消息示例

Producer

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("139.9.62.232");
  5. connectionFactory.setPort(30004);
  6. connectionFactory.setVirtualHost("xinzhang");
  7. connectionFactory.setUsername("xinzhang");
  8. connectionFactory.setPassword("Xinzhang123");
  9. Connection connection = connectionFactory.newConnection();
  10. Channel channel = connection.createChannel();
  11. String exchangeName = "xztest_dlx";
  12. channel.exchangeDeclare(exchangeName, "topic", true);
  13. String routingKey = "test.dlx.save";
  14. String message = "发送dlx消息到xztest_dlx, 消息将于10s过期! -->" + LocalDateTime.now();
  15. BasicProperties properties = new Builder().expiration("10000").build();
  16. channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
  17. System.out.println(message);
  18. channel.close();
  19. connection.close();
  20. }
  21. }

Consumer
代码逻辑说明: 交换机xztest_dlx绑定了xztest04(key=test.#, 过期时间为20秒), test05(key=already.*)
1.producer以路由test.dlx.save发送过期时间为10s的消息到交换机xztest_dlx;
2.xztest01队列没有消费者, 于是在10s秒后消息过期(消息过期时间10s<队列过期时间20s);
3.xztest04将消息转发到xztest_dlx, 路由为already.ttl;
4.xztest05接收到消息并消费, 测试结果如下:
—————————-
接收到dlx消息: 发送dlx消息到xztest_dlx, 消息将于10s过期! —>2020-03-01T16:49:50.598, 当前时间为: 2020-03-01T16:50:00.652
路由为: alread.ttl

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("139.9.62.232");
  5. connectionFactory.setPort(30004);
  6. connectionFactory.setVirtualHost("xinzhang");
  7. connectionFactory.setUsername("xinzhang");
  8. connectionFactory.setPassword("Xinzhang123");
  9. Connection connection = connectionFactory.newConnection();
  10. Channel channel = connection.createChannel();
  11. String exchangeName = "xztest_dlx";
  12. channel.exchangeDeclare(exchangeName, "topic", true);
  13. String queueName = "xztest04";
  14. HashMap<String, Object> arguments = new HashMap<>(1);
  15. // 设置xztest04队列中消息的统一过期时间为20s, 若队列中消息过期, 会发送到xztest_dlx的路由为alread.ttl的队列中
  16. arguments.put("x-message-ttl", 20 * 1000);
  17. arguments.put("x-dead-letter-exchange", "xztest_dlx");
  18. arguments.put("x-dead-letter-routing-key", "already.ttl");
  19. channel.queueDeclare(queueName, true, false, false, arguments);
  20. String routingKey = "test.#";
  21. channel.queueBind(queueName, exchangeName, routingKey);
  22. channel.queueDeclare("xztest05", true, false, false, null);
  23. channel.queueBind("xztest05", exchangeName, "already.*");
  24. channel.basicConsume("xztest05", true, new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  27. throws IOException {
  28. System.out.println("接收到dlx消息: " + new String(body) + ", 当前时间为: " + LocalDateTime.now());
  29. System.out.println("路由为: " + envelope.getRoutingKey());
  30. }
  31. });
  32. //这里关闭了代码就结束了, 回调线程也结束, 会看不到控制台输出
  33. // channel.close();
  34. // connection.close();
  35. }
  36. }