1. RabbitMQ 高级
2. 消息的可靠投递(生产端)
RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式
- confirm 确认模式 已经过时
- return 退回模式
rabbitmq 整个消息投递路径为
producer —-> rabbitmq broker —> exchange —-> queue —> consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback
- 消息从 exchange —> queue 投递失败则会返回一个 returnCallback
利用这两个callback控制消息的可靠性投递
rabbitmq配置文件
<!-- 加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory--><!-- returns开启回退模式设置为 publisher-returns="true" --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-returns="true"/><!-- 消息可靠性投递(生产端) --><rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/><rabbit:direct-exchange name="test_exchange_confirm"><rabbit:bindings><rabbit:binding queue="test_queue_confirm" key="confirm"/></rabbit:bindings></rabbit:direct-exchange><!-- 定义rabbitTemplate对象操作可以在代码中方便发送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
test
/*回退模式 :当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack*/@Testpublic void testReturn(){//设置交换机处理失败消息的模式 必须开启之后 失败发送失败才会回调rabbitTemplate.setMandatory(true);//设置returncallbackrabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("return 执行了");System.out.println(returnedMessage.getMessage()); //消息对象System.out.println(returnedMessage.getReplyCode()); //错误码System.out.println(returnedMessage.getReplyText()); //错误信息System.out.println(returnedMessage.getExchange()); //交换机System.out.println(returnedMessage.getRoutingKey()); //路由键}});//给不存在频道中的key发送消息rabbitTemplate.convertAndSend("test_exchange_confirm","confirm111","hello callback");}
3. Consumer Ack(消费端)
ack指 Acknowledge 确认 表示消费端收到消息后的确认方式
有三种确认方式
- 默认为自动确认 acknowledge=”none”
- 手动确认 acknowledge=”manual”
- 根据异常情况确认 acknowledge=”auto”
rabbitmq配置文件 消费端
<!--spring扫描监听器类 --><context:component-scan base-package="com.itheima.listener"/><!-- 定义监听器容器--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm" /></rabbit:listener-container>
监听类
package com.itheima.listener;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.IOException;/*** Consumer ACK机制* 设置手动签收 acknowledge="manual"* 让监听器类实现 ChannelAwareMessageListener 接口 实现 onMessage方法*/@Componentpublic class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//接受信息System.out.println(new String(message.getBody()));//业务逻辑System.out.println("处理");//手动签收channel.basicAck(deliveryTag, true);} catch (IOException e) {// e.printStackTrace();//拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中channel.basicNack(deliveryTag, true, true);// channel.basicReject(deliveryTag, true);}}}
4. 消费端限流(消费端)
rabbitmq配置文件
<!--spring扫描监听器类 --><context:component-scan base-package="com.itheima.listener"/><!-- 定义监听器容器--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"><!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />--><rabbit:listener ref="qosListener" queue-names="test_queue_confirm" /></rabbit:listener-container>
限流监听类
package com.itheima.listener;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;/*** 限流机制* acr机制设置为手动确认* listener-container配置属性 perfetch = 1 表示消费端每次从mq拉取一条消息 直到手动确认消费完毕后才去拉取下一条消息*/@Componentpublic class QosListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//获取消息System.out.println(new String(message.getBody()));//处理业务逻辑System.out.println("业务逻辑");//签收 直到手动确认消费完毕后才去拉取下一条消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}}
5. TTL(生产端)
TTL 为存活时间 / 过期时间
当消息到达存活时间后 还没有被消费 会被自动清除
RabbitMQ可以对消息设置过期时间 也可以对整个队列设置过期时间
生产端配置文件
<!-- ttl--><rabbit:queue name="test_spring_queue_ttl" auto-declare="true"><rabbit:queue-arguments><entry key="x-message-ttl" value-type="long" value="5000"/></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_ttl"><rabbit:bindings><rabbit:binding pattern="ttl.#" queue="test_spring_queue_ttl"/></rabbit:bindings></rabbit:topic-exchange>
test发送消息
@Testpublic void testTtl() {//ttl发送消息rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.eee", "hello ttl", new MessagePostProcessor() {//消息后处理对象 设置一些消息的参数信息@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置message的消息message.getMessageProperties().setExpiration("5000");//消息的过期时间 毫秒return message; //如果设置了队列过期时间和消息的过期时间 则以时间短的为准//队列过期后会将该队列的所有消息清空//消息过期后.只有消息在队列顶端,才会判断其是否过期}});}
6. 死信队列
DLX(Dead Letter Exchange) 死信交换机 当消息成为 Dead message后可以被重新发送到另外一台交换机 那么这个交换机就是DLX

消息成为死信的情况
- 队列消息长度到达限制
- 消费者拒接消费消息,basicNack/basicReject 并且不把消息重新放入原目标队列.requeue=false
- 原队列存在消费过期设置 消息到达超时时间未被消费
定义正常队列和死信队列
<!-- 死信队列 --><!-- 1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)--><rabbit:queue id="test_queue_dlx" name="test_queue_dlx"><!-- 3.正常队列绑定死信交换机--><rabbit:queue-arguments><!-- - x-dead-letter-exchange:死信交换机名称--><entry key="x-dead-letter-exchange" value="exchange_dlx"/><!-- - x-dead-letter-routing-key 死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.hehe"/><!-- 设置队列过期时间--><entry key="x-message-ttl" value="5000" value-type="java.lang.Integer"/><!-- 设置队列长度--><entry key="x-max-length" value="10" value-type="java.lang.Integer"/></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/></rabbit:bindings></rabbit:topic-exchange><!-- 2.声明死信的队列(queue.dlx)和交换机(exchange_dxl)--><rabbit:queue id="queue_dlx" name="queue_dlx"/><rabbit:topic-exchange name="exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.#" queue="queue_dlx"/></rabbit:bindings></rabbit:topic-exchange>
test
@Testpublic void testDlx() {//1.测试过期时间// rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx");//2.超过队列长度 消息死信// for (int i = 0; i < 20; i++) {// rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx" + i);// }//3.消费者拒接签收消息 并设置不重回队列中rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx");}
消费者拒接签收消息 监听配置
<context:component-scan base-package="com.itheima.listener"/><!-- 定义监听器容器--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"><!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />--><!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />--><!-- 监听正常的队列--><rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/></rabbit:listener-container>
监听类
package com.itheima.listener;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class DlxListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//接受信息System.out.println(new String(message.getBody()));//业务逻辑System.out.println("处理");int i = 3 / 0;// 出现异常//手动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {// e.printStackTrace();System.out.println("拒绝签收");//拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中 此处是死信队列所以我们不重回队列中channel.basicNack(deliveryTag, true, false);// channel.basicReject(deliveryTag, true);}}}
7. 延迟队列
延迟队列 即消息进入队列后不会立即被消费 只有到达指定时间后 才会被消费

rabbitmq中并没有提供延迟队列功能 但是我们可以通过使用 TTL+死信队列 组合实现延迟队列的效果

- 定义正常队列和死信队列 并且设置TTL过去时间
<!-- 延迟队列 通过TTL+死信队列实现--><!-- 定义正常队列 和交换机--><rabbit:queue id="order_queue" name="order_queue"><!-- 绑定死信队列 和设置TTL过期时间--><rabbit:queue-arguments><entry key="x-dead-letter-exchange" value="order_exchange_dlx"/><entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/><entry key="x-message-ttl" value="5000" value-type="java.lang.Integer"/></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="order_exchange"><rabbit:bindings><rabbit:binding pattern="order.#" queue="order_queue"/></rabbit:bindings></rabbit:topic-exchange><!-- 死信队列--><rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue><rabbit:topic-exchange name="order_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/></rabbit:bindings></rabbit:topic-exchange>
- 发送消息
@Testpublic void testDelay(){rabbitTemplate.convertAndSend("order_exchange","order.msg","延迟队列");}
- 监听配置
<!--spring扫描监听器类 --><context:component-scan base-package="com.itheima.listener"/><!-- 定义监听器容器--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"><!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />--><!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />--><!-- 监听正常的队列--><!-- <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>--><!-- 延迟队列 监听的是TTL过期后的死信队列--><rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/></rabbit:listener-container>
- 监听类
package com.itheima.listener;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class OrderListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//接受信息System.out.println(new String(message.getBody()));//业务逻辑System.out.println("业务 处理");//手动签收channel.basicAck(deliveryTag, true);} catch (IOException e) {// e.printStackTrace();//拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中channel.basicNack(deliveryTag, true, true);// channel.basicReject(deliveryTag, true);}}
8. 日志监控
RabbitMQ默认日志存放路径 /var/log/rabbitmq/rabbit@xxx.log

8.1. 消息追踪
开启firehose后 默认路由会将消息重新发送一遍 并且包含消息从哪里传递等具体信息打包过去队列中

网页插件版

9. 消息补偿

10. 消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果 也就是说其 任意多次执行对资源所产生的影响与一次执行的影响相同
在MQ中指 消费多条相同的消息 与消费一条消息得到结果一致

11. 集群搭建
停止rabbitmq服务
rabbitmqctl stop
启动第一个节点 此处是单机器 多端口搭建伪集群 集群用ip区分即可
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
启动第二个节点 因为默认web插件端口被占用所以也要设置web插口端口
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
设置rabbit1为主节点
rabbitmqctl -n rabbit1 stop_apprabbitmqctl -n rabbit1 resetrabbitmqctl -n rabbit1 start_app
设置rabbit2为从节点
rabbitmqctl -n rabbit2 stop_apprabbitmqctl -n rabbit2 resetrabbitmqctl -n rabbit2 join_cluster rabbit1@'dubbo' #@''后面为系统用户名 需要自己更改rabbitmqctl -n rabbit2 start_app
11.1. 镜像队列
从节点默认是从主节点中获取数据 我们可以通过镜像队列来将每个节点同存放想相同数据
命令方式
rabbitmqctl set_policy my_ha "^"'{"ha-mode":"all"}'
网页方式

11.2. 负载均衡-HAProxy
https://blog.csdn.net/William0318/article/details/99677701
安装
yum install haproxy -yhaproxy -v
配置文件
vim /etc/haproxy/haproxy.cfg
编辑内容
#全局配置global#设置日志log 127.0.0.1 local0 info#当前工作目录chroot /usr/local/haproxy#用户与用户组user haproxygroup haproxy#运行进程IDuid 99gid 99#守护进程启动daemon#最大连接数maxconn 4096#默认配置defaults#应用全局的日志配置log global#默认的模式mode {tcp|http|health}#TCP是4层,HTTP是7层,health只返回OKmode tcp#日志类别tcplogoption tcplog#不记录健康检查日志信息option dontlognull#3次失败则认为服务不可用retries 3#每个进程可用的最大连接数maxconn 2000#连接超时timeout connect 5s#客户端超时timeout client 120s#服务端超时timeout server 120s#绑定配置listen rabbitmq_clusterbind 0.0.0.0:5672#配置TCP模式mode tcp#简单的轮询balance roundrobin#RabbitMQ集群节点配置server rmq_node1 127.0.0.1:5673 check inter 5000 rise 2 fall 3 weight 1server rmq_node2 127.0.0.1:5674 check inter 5000 rise 2 fall 3 weight 1#haproxy监控页面地址listen monitorbind 0.0.0.0:8100mode httpoption httplogstats enablestats uri /statsstats refresh 5s
检查配置文件是否错误
haproxy -f /etc/haproxy/haproxy.cfg -c
启动
haproxy -f /etc/haproxy/haproxy.cfg -d
访问HAProxy后台
http://192.168.130.124:8100/stats
后续将消息队列的地址和ip设置为HAProxy监听的地址即可
spring:#消息队列配置rabbitmq:host: 192.168.0.104 #HAProxy的地址port: 5672username: guestpassword: guestvirtual-host: /publisher-returns: truepublisher-confirms: trueconnection-timeout: 5000ms
