1. RabbitMQ 高级
2. 消息的可靠投递(生产端)
RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式
- confirm 确认模式 已经过时
- return 退回模式
rabbitmq 整个消息投递路径为
producer —-> rabbitmq broker —> exchange —-> queue —> consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback
- 消息从 exchange —> queue 投递失败则会返回一个 returnCallback
利用这两个callback控制消息的可靠性投递
rabbitmq配置文件
<!-- 加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory-->
<!-- returns开启回退模式设置为 publisher-returns="true" -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true"
/>
<!-- 消息可靠性投递(生产端) -->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
test
/*
回退模式 :当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
*/
@Test
public void testReturn(){
//设置交换机处理失败消息的模式 必须开启之后 失败发送失败才会回调
rabbitTemplate.setMandatory(true);
//设置returncallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("return 执行了");
System.out.println(returnedMessage.getMessage()); //消息对象
System.out.println(returnedMessage.getReplyCode()); //错误码
System.out.println(returnedMessage.getReplyText()); //错误信息
System.out.println(returnedMessage.getExchange()); //交换机
System.out.println(returnedMessage.getRoutingKey()); //路由键
}
});
//给不存在频道中的key发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm111","hello callback");
}
3. Consumer Ack(消费端)
ack指 Acknowledge 确认 表示消费端收到消息后的确认方式
有三种确认方式
- 默认为自动确认 acknowledge=”none”
- 手动确认 acknowledge=”manual”
- 根据异常情况确认 acknowledge=”auto”
rabbitmq配置文件 消费端
<!--spring扫描监听器类 -->
<context:component-scan base-package="com.itheima.listener"/>
<!-- 定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />
</rabbit:listener-container>
监听类
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Consumer ACK机制
* 设置手动签收 acknowledge="manual"
* 让监听器类实现 ChannelAwareMessageListener 接口 实现 onMessage方法
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接受信息
System.out.println(new String(message.getBody()));
//业务逻辑
System.out.println("处理");
//手动签收
channel.basicAck(deliveryTag, true);
} catch (IOException e) {
// e.printStackTrace();
//拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中
channel.basicNack(deliveryTag, true, true);
// channel.basicReject(deliveryTag, true);
}
}
}
4. 消费端限流(消费端)
rabbitmq配置文件
<!--spring扫描监听器类 -->
<context:component-scan base-package="com.itheima.listener"/>
<!-- 定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />
</rabbit:listener-container>
限流监听类
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* 限流机制
* acr机制设置为手动确认
* listener-container配置属性 perfetch = 1 表示消费端每次从mq拉取一条消息 直到手动确认消费完毕后才去拉取下一条消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息
System.out.println(new String(message.getBody()));
//处理业务逻辑
System.out.println("业务逻辑");
//签收 直到手动确认消费完毕后才去拉取下一条消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
5. TTL(生产端)
TTL 为存活时间 / 过期时间
当消息到达存活时间后 还没有被消费 会被自动清除
RabbitMQ可以对消息设置过期时间 也可以对整个队列设置过期时间
生产端配置文件
<!-- ttl-->
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_spring_queue_ttl"/>
</rabbit:bindings>
</rabbit:topic-exchange>
test发送消息
@Test
public void testTtl() {
//ttl发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.eee", "hello ttl", new MessagePostProcessor() {
//消息后处理对象 设置一些消息的参数信息
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的消息
message.getMessageProperties().setExpiration("5000");//消息的过期时间 毫秒
return message; //如果设置了队列过期时间和消息的过期时间 则以时间短的为准
//队列过期后会将该队列的所有消息清空
//消息过期后.只有消息在队列顶端,才会判断其是否过期
}
});
}
6. 死信队列
DLX(Dead Letter Exchange) 死信交换机 当消息成为 Dead message后可以被重新发送到另外一台交换机 那么这个交换机就是DLX
消息成为死信的情况
- 队列消息长度到达限制
- 消费者拒接消费消息,basicNack/basicReject 并且不把消息重新放入原目标队列.requeue=false
- 原队列存在消费过期设置 消息到达超时时间未被消费
定义正常队列和死信队列
<!-- 死信队列 -->
<!-- 1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
<rabbit:queue id="test_queue_dlx" name="test_queue_dlx">
<!-- 3.正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!-- - x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx"/>
<!-- - x-dead-letter-routing-key 死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe"/>
<!-- 设置队列过期时间-->
<entry key="x-message-ttl" value="5000" value-type="java.lang.Integer"/>
<!-- 设置队列长度-->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2.声明死信的队列(queue.dlx)和交换机(exchange_dxl)-->
<rabbit:queue id="queue_dlx" name="queue_dlx"/>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
test
@Test
public void testDlx() {
//1.测试过期时间
// rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx");
//2.超过队列长度 消息死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx" + i);
// }
//3.消费者拒接签收消息 并设置不重回队列中
rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "dlx");
}
消费者拒接签收消息 监听配置
<context:component-scan base-package="com.itheima.listener"/>
<!-- 定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
<!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />-->
<!-- 监听正常的队列-->
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
</rabbit:listener-container>
监听类
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接受信息
System.out.println(new String(message.getBody()));
//业务逻辑
System.out.println("处理");
int i = 3 / 0;// 出现异常
//手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// e.printStackTrace();
System.out.println("拒绝签收");
//拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中 此处是死信队列所以我们不重回队列中
channel.basicNack(deliveryTag, true, false);
// channel.basicReject(deliveryTag, true);
}
}
}
7. 延迟队列
延迟队列 即消息进入队列后不会立即被消费 只有到达指定时间后 才会被消费
rabbitmq中并没有提供延迟队列功能 但是我们可以通过使用 TTL+死信队列 组合实现延迟队列的效果
- 定义正常队列和死信队列 并且设置TTL过去时间
<!-- 延迟队列 通过TTL+死信队列实现-->
<!-- 定义正常队列 和交换机-->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 绑定死信队列 和设置TTL过期时间-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
<entry key="x-message-ttl" value="5000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 死信队列-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx">
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
- 发送消息
@Test
public void testDelay(){
rabbitTemplate.convertAndSend("order_exchange","order.msg","延迟队列");
}
- 监听配置
<!--spring扫描监听器类 -->
<context:component-scan base-package="com.itheima.listener"/>
<!-- 定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
<!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />-->
<!-- 监听正常的队列-->
<!-- <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>-->
<!-- 延迟队列 监听的是TTL过期后的死信队列-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
</rabbit:listener-container>
- 监听类
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接受信息
System.out.println(new String(message.getBody()));
//业务逻辑
System.out.println("业务 处理");
//手动签收
channel.basicAck(deliveryTag, true);
} catch (IOException e) {
// e.printStackTrace();
//拒绝签收 basicNack允许多条消息 第三个参数requeue 设置为true则重新回到队列中
channel.basicNack(deliveryTag, true, true);
// channel.basicReject(deliveryTag, true);
}
}
8. 日志监控
RabbitMQ默认日志存放路径 /var/log/rabbitmq/rabbit@xxx.log
8.1. 消息追踪
开启firehose后 默认路由会将消息重新发送一遍 并且包含消息从哪里传递等具体信息打包过去队列中
网页插件版
9. 消息补偿
10. 消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果 也就是说其 任意多次执行对资源所产生的影响与一次执行的影响相同
在MQ中指 消费多条相同的消息 与消费一条消息得到结果一致
11. 集群搭建
停止rabbitmq服务
rabbitmqctl stop
启动第一个节点 此处是单机器 多端口搭建伪集群 集群用ip区分即可
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
启动第二个节点 因为默认web插件端口被占用所以也要设置web插口端口
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
设置rabbit1为主节点
rabbitmqctl -n rabbit1 stop_app
rabbitmqctl -n rabbit1 reset
rabbitmqctl -n rabbit1 start_app
设置rabbit2为从节点
rabbitmqctl -n rabbit2 stop_app
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 join_cluster rabbit1@'dubbo' #@''后面为系统用户名 需要自己更改
rabbitmqctl -n rabbit2 start_app
11.1. 镜像队列
从节点默认是从主节点中获取数据 我们可以通过镜像队列来将每个节点同存放想相同数据
命令方式
rabbitmqctl set_policy my_ha "^"'{"ha-mode":"all"}'
网页方式
11.2. 负载均衡-HAProxy
https://blog.csdn.net/William0318/article/details/99677701
安装
yum install haproxy -y
haproxy -v
配置文件
vim /etc/haproxy/haproxy.cfg
编辑内容
#全局配置
global
#设置日志
log 127.0.0.1 local0 info
#当前工作目录
chroot /usr/local/haproxy
#用户与用户组
user haproxy
group haproxy
#运行进程ID
uid 99
gid 99
#守护进程启动
daemon
#最大连接数
maxconn 4096
#默认配置
defaults
#应用全局的日志配置
log global
#默认的模式mode {tcp|http|health}
#TCP是4层,HTTP是7层,health只返回OK
mode tcp
#日志类别tcplog
option tcplog
#不记录健康检查日志信息
option dontlognull
#3次失败则认为服务不可用
retries 3
#每个进程可用的最大连接数
maxconn 2000
#连接超时
timeout connect 5s
#客户端超时
timeout client 120s
#服务端超时
timeout server 120s
#绑定配置
listen rabbitmq_cluster
bind 0.0.0.0:5672
#配置TCP模式
mode tcp
#简单的轮询
balance roundrobin
#RabbitMQ集群节点配置
server rmq_node1 127.0.0.1:5673 check inter 5000 rise 2 fall 3 weight 1
server rmq_node2 127.0.0.1:5674 check inter 5000 rise 2 fall 3 weight 1
#haproxy监控页面地址
listen monitor
bind 0.0.0.0:8100
mode http
option httplog
stats enable
stats uri /stats
stats refresh 5s
检查配置文件是否错误
haproxy -f /etc/haproxy/haproxy.cfg -c
启动
haproxy -f /etc/haproxy/haproxy.cfg -d
访问HAProxy后台
http://192.168.130.124:8100/stats
后续将消息队列的地址和ip设置为HAProxy监听的地址即可
spring:
#消息队列配置
rabbitmq:
host: 192.168.0.104 #HAProxy的地址
port: 5672
username: guest
password: guest
virtual-host: /
publisher-returns: true
publisher-confirms: true
connection-timeout: 5000ms