1. RabbitMQ 高级

2. 消息的可靠投递(生产端)

RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式 已经过时
  • return 退回模式

rabbitmq 整个消息投递路径为

producer —-> rabbitmq broker —> exchange —-> queue —> consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback
  • 消息从 exchange —> queue 投递失败则会返回一个 returnCallback

利用这两个callback控制消息的可靠性投递

rabbitmq配置文件

  1. <!-- 加载配置文件-->
  2. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  3. <!-- 定义rabbitmq connectionFactory-->
  4. <!-- returns开启回退模式设置为 publisher-returns="true" -->
  5. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  6. port="${rabbitmq.port}"
  7. username="${rabbitmq.username}"
  8. password="${rabbitmq.password}"
  9. virtual-host="${rabbitmq.virtual-host}"
  10. publisher-returns="true"
  11. />
  12. <!-- 消息可靠性投递(生产端) -->
  13. <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
  14. <rabbit:direct-exchange name="test_exchange_confirm">
  15. <rabbit:bindings>
  16. <rabbit:binding queue="test_queue_confirm" key="confirm"/>
  17. </rabbit:bindings>
  18. </rabbit:direct-exchange>
  19. <!-- 定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  20. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

test

  1. /*
  2. 回退模式 :当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
  3. */
  4. @Test
  5. public void testReturn(){
  6. //设置交换机处理失败消息的模式 必须开启之后 失败发送失败才会回调
  7. rabbitTemplate.setMandatory(true);
  8. //设置returncallback
  9. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  10. @Override
  11. public void returnedMessage(ReturnedMessage returnedMessage) {
  12. System.out.println("return 执行了");
  13. System.out.println(returnedMessage.getMessage()); //消息对象
  14. System.out.println(returnedMessage.getReplyCode()); //错误码
  15. System.out.println(returnedMessage.getReplyText()); //错误信息
  16. System.out.println(returnedMessage.getExchange()); //交换机
  17. System.out.println(returnedMessage.getRoutingKey()); //路由键
  18. }
  19. });
  20. //给不存在频道中的key发送消息
  21. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm111","hello callback");
  22. }

3. Consumer Ack(消费端)

ack指 Acknowledge 确认 表示消费端收到消息后的确认方式

有三种确认方式

  • 默认为自动确认 acknowledge=”none”
  • 手动确认 acknowledge=”manual”
  • 根据异常情况确认 acknowledge=”auto”

rabbitmq配置文件 消费端

  1. <!--spring扫描监听器类 -->
  2. <context:component-scan base-package="com.itheima.listener"/>
  3. <!-- 定义监听器容器-->
  4. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
  5. <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />
  6. </rabbit:listener-container>

监听类

  1. package com.itheima.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. * Consumer ACK机制
  9. * 设置手动签收 acknowledge="manual"
  10. * 让监听器类实现 ChannelAwareMessageListener 接口 实现 onMessage方法
  11. */
  12. @Component
  13. public class AckListener implements ChannelAwareMessageListener {
  14. @Override
  15. public void onMessage(Message message, Channel channel) throws Exception {
  16. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  17. try {
  18. //接受信息
  19. System.out.println(new String(message.getBody()));
  20. //业务逻辑
  21. System.out.println("处理");
  22. //手动签收
  23. channel.basicAck(deliveryTag, true);
  24. } catch (IOException e) {
  25. // e.printStackTrace();
  26. //拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中
  27. channel.basicNack(deliveryTag, true, true);
  28. // channel.basicReject(deliveryTag, true);
  29. }
  30. }
  31. }

4. 消费端限流(消费端)

rabbitmq配置文件

  1. <!--spring扫描监听器类 -->
  2. <context:component-scan base-package="com.itheima.listener"/>
  3. <!-- 定义监听器容器-->
  4. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
  5. <!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
  6. <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />
  7. </rabbit:listener-container>

限流监听类

  1. package com.itheima.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * 限流机制
  8. * acr机制设置为手动确认
  9. * listener-container配置属性 perfetch = 1 表示消费端每次从mq拉取一条消息 直到手动确认消费完毕后才去拉取下一条消息
  10. */
  11. @Component
  12. public class QosListener implements ChannelAwareMessageListener {
  13. @Override
  14. public void onMessage(Message message, Channel channel) throws Exception {
  15. //获取消息
  16. System.out.println(new String(message.getBody()));
  17. //处理业务逻辑
  18. System.out.println("业务逻辑");
  19. //签收 直到手动确认消费完毕后才去拉取下一条消息
  20. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  21. }
  22. }

5. TTL(生产端)

TTL 为存活时间 / 过期时间

当消息到达存活时间后 还没有被消费 会被自动清除

RabbitMQ可以对消息设置过期时间 也可以对整个队列设置过期时间

生产端配置文件

  1. <!-- ttl-->
  2. <rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
  3. <rabbit:queue-arguments>
  4. <entry key="x-message-ttl" value-type="long" value="5000"/>
  5. </rabbit:queue-arguments>
  6. </rabbit:queue>
  7. <rabbit:topic-exchange name="test_exchange_ttl">
  8. <rabbit:bindings>
  9. <rabbit:binding pattern="ttl.#" queue="test_spring_queue_ttl"/>
  10. </rabbit:bindings>
  11. </rabbit:topic-exchange>

test发送消息

  1. @Test
  2. public void testTtl() {
  3. //ttl发送消息
  4. rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.eee", "hello ttl", new MessagePostProcessor() {
  5. //消息后处理对象 设置一些消息的参数信息
  6. @Override
  7. public Message postProcessMessage(Message message) throws AmqpException {
  8. //1.设置message的消息
  9. message.getMessageProperties().setExpiration("5000");//消息的过期时间 毫秒
  10. return message; //如果设置了队列过期时间和消息的过期时间 则以时间短的为准
  11. //队列过期后会将该队列的所有消息清空
  12. //消息过期后.只有消息在队列顶端,才会判断其是否过期
  13. }
  14. });
  15. }

6. 死信队列

DLX(Dead Letter Exchange) 死信交换机 当消息成为 Dead message后可以被重新发送到另外一台交换机 那么这个交换机就是DLX

15. RabbitMQ 高级 - 图1

消息成为死信的情况

  1. 队列消息长度到达限制
  2. 消费者拒接消费消息,basicNack/basicReject 并且不把消息重新放入原目标队列.requeue=false
  3. 原队列存在消费过期设置 消息到达超时时间未被消费

定义正常队列和死信队列

  1. <!-- 死信队列 -->
  2. <!-- 1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
  3. <rabbit:queue id="test_queue_dlx" name="test_queue_dlx">
  4. <!-- 3.正常队列绑定死信交换机-->
  5. <rabbit:queue-arguments>
  6. <!-- - x-dead-letter-exchange:死信交换机名称-->
  7. <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
  8. <!-- - x-dead-letter-routing-key 死信交换机的routingkey-->
  9. <entry key="x-dead-letter-routing-key" value="dlx.hehe"/>
  10. <!-- 设置队列过期时间-->
  11. <entry key="x-message-ttl" value="5000" value-type="java.lang.Integer"/>
  12. <!-- 设置队列长度-->
  13. <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
  14. </rabbit:queue-arguments>
  15. </rabbit:queue>
  16. <rabbit:topic-exchange name="test_exchange_dlx">
  17. <rabbit:bindings>
  18. <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/>
  19. </rabbit:bindings>
  20. </rabbit:topic-exchange>
  21. <!-- 2.声明死信的队列(queue.dlx)和交换机(exchange_dxl)-->
  22. <rabbit:queue id="queue_dlx" name="queue_dlx"/>
  23. <rabbit:topic-exchange name="exchange_dlx">
  24. <rabbit:bindings>
  25. <rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
  26. </rabbit:bindings>
  27. </rabbit:topic-exchange>

test

  1. @Test
  2. public void testDlx() {
  3. //1.测试过期时间
  4. // rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx");
  5. //2.超过队列长度 消息死信
  6. // for (int i = 0; i < 20; i++) {
  7. // rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx" + i);
  8. // }
  9. //3.消费者拒接签收消息 并设置不重回队列中
  10. rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx");
  11. }

消费者拒接签收消息 监听配置

  1. <context:component-scan base-package="com.itheima.listener"/>
  2. <!-- 定义监听器容器-->
  3. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
  4. <!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
  5. <!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />-->
  6. <!-- 监听正常的队列-->
  7. <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
  8. </rabbit:listener-container>

监听类

  1. package com.itheima.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class DlxListener implements ChannelAwareMessageListener {
  9. @Override
  10. public void onMessage(Message message, Channel channel) throws Exception {
  11. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  12. try {
  13. //接受信息
  14. System.out.println(new String(message.getBody()));
  15. //业务逻辑
  16. System.out.println("处理");
  17. int i = 3 / 0;// 出现异常
  18. //手动签收
  19. channel.basicAck(deliveryTag, true);
  20. } catch (Exception e) {
  21. // e.printStackTrace();
  22. System.out.println("拒绝签收");
  23. //拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中 此处是死信队列所以我们不重回队列中
  24. channel.basicNack(deliveryTag, true, false);
  25. // channel.basicReject(deliveryTag, true);
  26. }
  27. }
  28. }

7. 延迟队列

延迟队列 即消息进入队列后不会立即被消费 只有到达指定时间后 才会被消费

15. RabbitMQ 高级 - 图2

rabbitmq中并没有提供延迟队列功能 但是我们可以通过使用 TTL+死信队列 组合实现延迟队列的效果

15. RabbitMQ 高级 - 图3

  1. 定义正常队列和死信队列 并且设置TTL过去时间
  1. <!-- 延迟队列 通过TTL+死信队列实现-->
  2. <!-- 定义正常队列 和交换机-->
  3. <rabbit:queue id="order_queue" name="order_queue">
  4. <!-- 绑定死信队列 和设置TTL过期时间-->
  5. <rabbit:queue-arguments>
  6. <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
  7. <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
  8. <entry key="x-message-ttl" value="5000" value-type="java.lang.Integer"/>
  9. </rabbit:queue-arguments>
  10. </rabbit:queue>
  11. <rabbit:topic-exchange name="order_exchange">
  12. <rabbit:bindings>
  13. <rabbit:binding pattern="order.#" queue="order_queue"/>
  14. </rabbit:bindings>
  15. </rabbit:topic-exchange>
  16. <!-- 死信队列-->
  17. <rabbit:queue id="order_queue_dlx" name="order_queue_dlx">
  18. </rabbit:queue>
  19. <rabbit:topic-exchange name="order_exchange_dlx">
  20. <rabbit:bindings>
  21. <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/>
  22. </rabbit:bindings>
  23. </rabbit:topic-exchange>
  1. 发送消息
  1. @Test
  2. public void testDelay(){
  3. rabbitTemplate.convertAndSend("order_exchange","order.msg","延迟队列");
  4. }
  1. 监听配置
  1. <!--spring扫描监听器类 -->
  2. <context:component-scan base-package="com.itheima.listener"/>
  3. <!-- 定义监听器容器-->
  4. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
  5. <!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
  6. <!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />-->
  7. <!-- 监听正常的队列-->
  8. <!-- <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>-->
  9. <!-- 延迟队列 监听的是TTL过期后的死信队列-->
  10. <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
  11. </rabbit:listener-container>
  1. 监听类
  1. package com.itheima.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class OrderListener implements ChannelAwareMessageListener {
  9. @Override
  10. public void onMessage(Message message, Channel channel) throws Exception {
  11. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  12. try {
  13. //接受信息
  14. System.out.println(new String(message.getBody()));
  15. //业务逻辑
  16. System.out.println("业务 处理");
  17. //手动签收
  18. channel.basicAck(deliveryTag, true);
  19. } catch (IOException e) {
  20. // e.printStackTrace();
  21. //拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中
  22. channel.basicNack(deliveryTag, true, true);
  23. // channel.basicReject(deliveryTag, true);
  24. }
  25. }

8. 日志监控

RabbitMQ默认日志存放路径 /var/log/rabbitmq/rabbit@xxx.log

15. RabbitMQ 高级 - 图4

8.1. 消息追踪

开启firehose后 默认路由会将消息重新发送一遍 并且包含消息从哪里传递等具体信息打包过去队列中

15. RabbitMQ 高级 - 图5

网页插件版

15. RabbitMQ 高级 - 图6

9. 消息补偿

15. RabbitMQ 高级 - 图7

10. 消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果 也就是说其 任意多次执行对资源所产生的影响与一次执行的影响相同

在MQ中指 消费多条相同的消息 与消费一条消息得到结果一致

15. RabbitMQ 高级 - 图8

11. 集群搭建

停止rabbitmq服务

  1. rabbitmqctl stop

启动第一个节点 此处是单机器 多端口搭建伪集群 集群用ip区分即可

  1. RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start

启动第二个节点 因为默认web插件端口被占用所以也要设置web插口端口

  1. RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start

设置rabbit1为主节点

  1. rabbitmqctl -n rabbit1 stop_app
  2. rabbitmqctl -n rabbit1 reset
  3. rabbitmqctl -n rabbit1 start_app

设置rabbit2为从节点

  1. rabbitmqctl -n rabbit2 stop_app
  2. rabbitmqctl -n rabbit2 reset
  3. rabbitmqctl -n rabbit2 join_cluster rabbit1@'dubbo' #@''后面为系统用户名 需要自己更改
  4. rabbitmqctl -n rabbit2 start_app

11.1. 镜像队列

从节点默认是从主节点中获取数据 我们可以通过镜像队列来将每个节点同存放想相同数据

命令方式

  1. rabbitmqctl set_policy my_ha "^"'{"ha-mode":"all"}'

网页方式

15. RabbitMQ 高级 - 图9

11.2. 负载均衡-HAProxy

https://blog.csdn.net/William0318/article/details/99677701

安装

  1. yum install haproxy -y
  2. haproxy -v

配置文件

  1. vim /etc/haproxy/haproxy.cfg

编辑内容

  1. #全局配置
  2. global
  3. #设置日志
  4. log 127.0.0.1 local0 info
  5. #当前工作目录
  6. chroot /usr/local/haproxy
  7. #用户与用户组
  8. user haproxy
  9. group haproxy
  10. #运行进程ID
  11. uid 99
  12. gid 99
  13. #守护进程启动
  14. daemon
  15. #最大连接数
  16. maxconn 4096
  17. #默认配置
  18. defaults
  19. #应用全局的日志配置
  20. log global
  21. #默认的模式mode {tcp|http|health}
  22. #TCP是4层,HTTP是7层,health只返回OK
  23. mode tcp
  24. #日志类别tcplog
  25. option tcplog
  26. #不记录健康检查日志信息
  27. option dontlognull
  28. #3次失败则认为服务不可用
  29. retries 3
  30. #每个进程可用的最大连接数
  31. maxconn 2000
  32. #连接超时
  33. timeout connect 5s
  34. #客户端超时
  35. timeout client 120s
  36. #服务端超时
  37. timeout server 120s
  38. #绑定配置
  39. listen rabbitmq_cluster
  40. bind 0.0.0.0:5672
  41. #配置TCP模式
  42. mode tcp
  43. #简单的轮询
  44. balance roundrobin
  45. #RabbitMQ集群节点配置
  46. server rmq_node1 127.0.0.1:5673 check inter 5000 rise 2 fall 3 weight 1
  47. server rmq_node2 127.0.0.1:5674 check inter 5000 rise 2 fall 3 weight 1
  48. #haproxy监控页面地址
  49. listen monitor
  50. bind 0.0.0.0:8100
  51. mode http
  52. option httplog
  53. stats enable
  54. stats uri /stats
  55. stats refresh 5s

检查配置文件是否错误

  1. haproxy -f /etc/haproxy/haproxy.cfg -c

启动

  1. haproxy -f /etc/haproxy/haproxy.cfg -d

访问HAProxy后台

http://192.168.130.124:8100/stats

后续将消息队列的地址和ip设置为HAProxy监听的地址即可

  1. spring:
  2. #消息队列配置
  3. rabbitmq:
  4. host: 192.168.0.104 #HAProxy的地址
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. publisher-returns: true
  10. publisher-confirms: true
  11. connection-timeout: 5000ms