消息队列中间件,是分布式系统中的重要组件
主要解决,异步处理,应用解耦,流量削峰等问题 从而实现高性能,高可用,可伸缩和最终一致性的架构
使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等
异步处理
用户注册后,需要发送验证邮箱和手机验证码; 将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部完成后,返回给客户端
应用解耦
如果库存系统坏了,传统架构下订单系统也会出问题。耦合太高了。
使用MQ以后,下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队 列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦。
流量削峰
用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束 的页面
背景知识
AMQP
即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
协议:数据在传输的过程中必须要遵守的规则
基于此协议的客户端可以与消息中间件传递消息并不受产品、开发语言等条件的限制
JMS
Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消 息,进行异步通信
关系
JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式 JMS必须是java语言;AMQP只是协议,与语言无关
为什么用RabbitMQ
有强大的WEB管理页面
强大的社区支持,为技术进步提供动力
支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富
集群扩展很容易,并且可以通过增加节点实现成倍的性能提升
总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择RabbitMQ,如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或者zeroMQ
kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!
RabbitMQ组件的功能
Broker:消息队列服务器实体
Virtual Host:虚拟主机 标识一批交换机、消息队列和相关对象,形成的整体
虚拟主机是共享相同的身份认证和加密环境的独立服务器域 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制 vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
Exchange:交换器(路由) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue:消息队列。用来保存消息直到发送给消费者。 它是消息的容器,也是消息的终点。 一个消息可投入一个或多个队列。 消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。
Channel:通道(信道)
多路复用连接中的一条独立的双向数据流通道。 信道是建立在真实的TCP连接内的虚拟链接
AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,用来复用TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Message:消息
消息是不具名的,它是由消息头和消息体组成。 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由-键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
启动后台管理插件
rabbitmq-plugins enable rabbitmq_management
启动MQ相关命令
[root@localhost opt]# systemctl start rabbitmq-server.service
[root@localhost opt]# systemctl status rabbitmq-server.service
[root@localhost opt]# systemctl restart rabbitmq-server.service
[root@localhost opt]# systemctl stop rabbitmq-server.service
查看进程
[root@localhost opt]# ps -ef | grep rabbitmq
默认端口15672
创建远程连接角色
[root@localhost opt]# rabbitmqctl add_user laosun 123456
[root@localhost opt]# rabbitmqctl set_user_tags laosun administrator
[root@localhost opt]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
查看用户列表和改密码
[root@localhost opt]# rabbitmqctl list_users
[root@localhost opt]# rabbitmqctl change_password laosun 123123
界面简介
Java连接RabbitMQ
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.42.131");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/lagou");
connectionFactory.setUsername("jining");
connectionFactory.setPassword("980909");
Connection connection = connectionFactory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
System.out.println(connection);
connection.close();
}
}
消息模式
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种
1和2属于点对点 3、4、5属于发布订阅模式(一对多)
点对点模式:P2P(point to point)模式包含三个角色:消息队列(queue),发送者(sender),接收者(receiver)
每个消息发送到一个特定的队列中,接收者从中获得消息
队列中保留这些消息,直到他们被消费或超时
特点:
- 每个消息只有一个消费者,一旦消费,消息就不在队列中了
- 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
- 接收者成功接收消息之后需向对象应答成功(确认)
如果希望发送的每个消息都会被成功处理,那需要P2P
发布订阅模式:publish(Pub)/subscribe(Sub)
pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者 (subcriber) 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
特点:
- 每个消息可以有多个订阅者
- 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅
后,才能消费发布者的消息 - 为了消费消息,订阅者必须保持运行状态;类似于看电视直播。
如果希望发送的消息被多个消费者处理,可采用本模式
简单模式
public class MessageSender {
public static void main(String[] args) throws Exception {
String msg = "Jining said 'Hello, Rabbit MQ'";
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//参数1 队列名 2 是否持久化 3 是否排外(能否给别的队列用) 4 是否自动删除 5 队列参数
channel.queueDeclare("queue1", false, false, false, null);
//1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送了" + msg);
channel.close();
}
}
public class MessageReceiver {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
System.out.println(new String(body));
}
};
channel.basicConsume("queue1", true, defaultConsumer);
}
}
消息确认机制
修改成手动确认
public class ReceiverACK {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
System.out.println(new String(body));
// 1 收件人信息 2 是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("queue1", false, defaultConsumer);
}
}
主要就是最后一行。
工作队列模式
发送者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//参数1 队列名 2 是否持久化 3 是否排外(能否给别的队列用) 4 是否自动删除 5 队列参数
channel.queueDeclare("test_work_queue", false, false, false, null);
//1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
for (int i=0; i<100; i++) {
String msg = "羊肉串 ---> " + i;
channel.basicPublish("", "test_work_queue", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("新鲜出炉" + msg);
}
channel.close();
}
}
接收者1
public class Receiver1 {
private static int i = 1;
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//队列不存在则创建,存在则获取
channel.queueDeclare("test_work_queue", false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
String msg = new String(body);
System.out.printf("顾客1吃掉了" + msg + ", 总共吃%d串\n", i++);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1 收件人信息 2 是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_work_queue", false, defaultConsumer);
}
}
接收者2
public class Receiver2 {
private static int i = 1;
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue", false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
String msg = new String(body);
System.out.printf("顾客2吃掉了" + msg + ", 总共吃%d串\n", i++);
try {
Thread.sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1 收件人信息 2 是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_work_queue", false, defaultConsumer);
}
}
问题与解决
俩人消费的速度不一样可消费的消息数是一样的,1先消费完,无所事事的等2,这肯定不行。
官网给了解决方法
channel.basicQos(1);
这句话的意思就是 不要给队列一次发送1条以上的信息。换句话说,消息没确认之前不要发下一条信息。
这必须配合手动确认机制。
发布订阅模式
P是生产者,X是交换机,红色的是队列。
举例来说,P是up主,X是哔站,红色的是人。
整个过程,必须先创立路由。但是路由没有储存消息的能力。
运行时的顺序
先生产者 创建路由
再消费者 绑定队列
最后再生产者 发送具体消息
代码
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//定义路由 1 名 2 模式。 fanout 不处理路由键(只需要把队列绑定到路由上,消息就会自动转发到队列就行了)
channel.exchangeDeclare("test_exchange_fanout", "fanout");
//1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
String msg = "hello everyone";
channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者: " + msg);
channel.close();
}
}
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_fanout_queue1", false, false, false, null);
//绑定路由
channel.queueBind("test_exchange_fanout_queue1", "test_exchange_fanout", "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者1: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_fanout_queue1", false, defaultConsumer);
}
}
Receiver就是把队列换成2,改一下数字即可。
路由模式
direct就是定向。
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//direct 根据路由键定向分发消息
channel.exchangeDeclare("test_exchange_direct", "direct");
//1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
String msg = "用户注册,[userid=s101]";
channel.basicPublish("test_exchange_direct", "select", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者: " + msg);
channel.close();
}
}
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_direct_queue1", false, false, false, null);
//绑定路由
channel.queueBind("test_exchange_direct_queue1", "test_exchange_direct", "insert");
channel.queueBind("test_exchange_direct_queue1", "test_exchange_direct", "update");
channel.queueBind("test_exchange_direct_queue1", "test_exchange_direct", "delete");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者1: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_direct_queue1", false, defaultConsumer);
}
}
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_direct_queue2", false, false, false, null);
//绑定路由
channel.queueBind("test_exchange_direct_queue2", "test_exchange_direct", "select");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者2: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_direct_queue2", false, defaultConsumer);
}
}
通配符模式
和定向基本相同,就是路由键可以模糊匹配。
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//direct 根据路由键定向分发消息
channel.exchangeDeclare("test_exchange_topic", "topic");
//1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
String msg = "用户注册,[userid=s101]";
channel.basicPublish("test_exchange_topic", "product.price", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者: " + msg);
channel.close();
}
}
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_topic_queue1", false, false, false, null);
//绑定路由
channel.queueBind("test_exchange_topic_queue1", "test_exchange_topic", "user.#");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者1: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_topic_queue1", false, defaultConsumer);
}
}
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_topic_queue2", false, false, false, null);
//绑定路由
channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "product.#");
channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "order.#");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者2: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_topic_queue2", false, defaultConsumer);
}
}
持久化
想要将消息持久化,那么路由和队列都要持久化才可以。
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//direct 根据路由键定向分发消息
channel.exchangeDeclare("test_exchange_topic", "topic", true);
//1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
String msg = "用户注册,[userid=s101]";
channel.basicPublish("test_exchange_topic", "product.price",
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者: " + msg);
channel.close();
}
}
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_topic_queue1", true, false, false, null);
//绑定路由
channel.queueBind("test_exchange_topic_queue1", "test_exchange_topic", "user.#");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者1: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_topic_queue1", false, defaultConsumer);
}
}
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_topic_queue2", true, false, false, null);
//绑定路由
channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "product.#");
channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "order.#");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者2: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("test_exchange_topic_queue2", false, defaultConsumer);
}
}
Spring整合
其实用的最多的就是最后的通配符模式。
记一下依赖和配置文件吧。
生产者
依赖
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory
id="connectionFactory"
host="192.168.42.131"
port="5672"
username="jining"
password="980909"
virtual-host="/lagou"
/>
<!-- 2.配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.配置topic类型exchange;队列绑定到交换机 -->
<rabbit:topic-exchange name="spring-topic-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5. 配置消息对象json转换类 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 6. 配置RabbitTemplate(消息生产者) -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring-topic-exchange"
message-converter="jsonMessageConverter"/>
</beans>
public class Sender {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
HashMap<String, String> map = new HashMap<>();
map.put("name", "alice");
map.put("email", "alice@gmail.com");
rabbitTemplate.convertAndSend("msg.user", map);
context.close();
}
}
消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<rabbit:connection-factory
id="connectionFactory"
host="192.168.42.131"
port="5672"
username="jining"
password="980909"
virtual-host="/lagou"
/>
<!-- 2.配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 注册bean-->
<context:component-scan base-package="listener"/>
<!-- 配置监听-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumer" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
</beans>
@Component
public class Consumer implements MessageListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message) {
try {
JsonNode jsonNode = MAPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
String email = jsonNode.get("email").asText();
System.out.println("队列中的消息: " + name + "的邮箱是" + email);
} catch (IOException e) {
e.printStackTrace();
}
}
}
消息成功确认机制
如何保证消息成功发布?
事务、发布确认机制。
实际使用中主要是用发布确认机制,发布确认机制对性能影响比较小。
事务
AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式 并利用信道 的三个方法来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
channel.txSelect(): 开启事务
channel.txCommit() :提交事务
channel.txRollback() :回滚事务
当然,在spring里,这些都封装好了。
发布确认
比如我们一次发送十条消息,到第九条失败了,前八条都白瞎了。
发布确认则是失败什么重新补发什么。
配置文件,其实就改了几行,一是factory那里,二是模板那里多了bean
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory
id="connectionFactory"
host="192.168.42.131"
port="5672"
username="jining"
password="980909"
virtual-host="/lagou"
publisher-confirms="true"
/>
<!-- 2.配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.配置topic类型exchange;队列绑定到交换机 -->
<rabbit:topic-exchange name="spring-topic-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5. 配置消息对象json转换类 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 6. 配置RabbitTemplate(消息生产者) -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring-topic-exchange"
message-converter="jsonMessageConverter"
confirm-callback="msgSendConfirmCallBack"
/>
<bean id="msgSendConfirmCallBack" class="confirm.MsgSendConfirmCallBack"/>
</beans>
消费端限流
实现
首先修改配置文件(消费端)
<!-- 配置监听 prefetch是一次放几条消息, 确认是手动-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="2" acknowledge="manual">
<rabbit:listener ref="consumer" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
再改代码
@Component
public class Consumer extends AbstractAdaptableMessageListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// String str = new String(message.getBody());
// 将message对象转换成json
JsonNode jsonNode = MAPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
String email = jsonNode.get("email").asText();
System.out.println("从队列中获取:【"+name+"的邮箱是:"+email+"】");
long deliveryTag =
message.getMessageProperties().getDeliveryTag();
//确认收到(参数1,参数2)
/*
参数1:RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递
增的正整数,delivery_tag 的范围仅限于 Channel
参数2:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以
一次性确认 delivery_tag 小于等于传入值的所有消息
*/
channel.basicAck(deliveryTag , true);
Thread.sleep(3000);
System.out.println("休息三秒然后再接收消息");
} catch (Exception e){
e.printStackTrace();
}
}
}
过期时间
消息可以设置过期时间,过期后加入死信队列。
分类两种设置,一种是设置队列ttl,一种是设置消息ttl。第二种太细了,一般不用。
设置队列ttl
在生产者端进行修改。
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.配置topic类型exchange;队列绑定到交换机 -->
<rabbit:topic-exchange name="spring-topic-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_spring_queue_ttl" pattern="msg.#"/>
</rabbit:bindings>
</rabbit:topic-exchange>
测试之前删除已有的队列,之后运行生产者即可看到效果。
设置消息ttl
首先把队列的消息ttl相关配置还原
<!-- 2.配置队列 -->
<rabbit:queue name="test_spring_queue_ttl2"/>
<!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.配置topic类型exchange;队列绑定到交换机 -->
<rabbit:topic-exchange name="spring-topic-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_spring_queue_ttl2" pattern="msg.#"/>
</rabbit:bindings>
</rabbit:topic-exchange>
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.nio.charset.StandardCharsets;
public class Sender2 {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("6000");
Message message = new Message("测试".getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("msg.user", message);
context.close();
}
}
一定要注意代码里引用的包。<br />另外,如果队列配置和消息配置都存在的情况下,会采用时间短的那个。
死信队列
DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时 消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中。
而绑定DLX交换机 的队列,称之为:“死信队列”
消息没有被及时消费的原因:
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
消息超时未消费
达到最大队列长度
核心就是修改配置文件,这里创建了一个新的。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory
id="connectionFactory"
host="192.168.42.131"
port="5672"
username="jining"
password="980909"
virtual-host="/lagou"
publisher-confirms="true"
/>
<!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 6. 配置RabbitTemplate(消息生产者) -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="my_exchange"
/>
<!-- dlx config begin-->
<!-- 定义死信队列-->
<rabbit:queue name="dlx_queue"/>
<!--定向死信交换机-->
<rabbit:direct-exchange name="dlx_exchange" >
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="dlx_queue"></rabbit:binding>
<rabbit:binding key="dlx_max" queue="dlx_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 测试消息交换机-->
<rabbit:direct-exchange name="my_exchange" >
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="test_ttl_queue">
</rabbit:binding>
<rabbit:binding key="dlx_max" queue="test_max_queue">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue name="test_ttl_queue">
<rabbit:queue-arguments>
<!--队列ttl为6秒-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!--超时 消息 投递给 死信交换机-->
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--测试超长度的队列-->
<rabbit:queue name="test_max_queue">
<rabbit:queue-arguments>
<!--队列ttl为6秒-->
<entry key="x-max-length" value-type="long" value="2"/>
<!--超时 消息 投递给 死信交换机-->
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
</beans>
public class SenderDLX {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
HashMap<String, String> map = new HashMap<>();
map.put("name", "alice");
map.put("email", "alice@gmail.com");
// rabbitTemplate.convertAndSend("dlx_ttl", map);
rabbitTemplate.convertAndSend("dlx_max", "测试长度1".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度2".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度3".getBytes());
context.close();
}
}
**注意,对于长度超出来说,会把旧的消息挤出去而不是新的。**
延迟队列
延迟队列:TTL + 死信队列的合体
死信队列只是一种特殊的队列,里面的消息仍然可以消费
在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题
具体实现:只要改一下消费者消费的队列名就可以了。
集群
rabbitmq有3种模式,但集群模式是2种。详细如下:
单一模式:即单机情况不做集群,就单独运行一个rabbitmq而已。之前我们一直在用
普通模式:默认模式,以两个节点(A、B)为例来进行说明
当消息进入A节点的Queue后,consumer从B节点消费时,RabbitMQ会在A和B之间创建临时通道进行消息传输,把A中的消息实体取出并经过通过交给B发送给consumer
当A故障后,B就无法取到A节点中未消费的消息实体
如果做了消息持久化,那么得等A节点恢复,然后才可被消费
如果没有持久化的话,就会产生消息丢失的现象
镜像模式:非常经典的 mirror 镜像模式,保证 100% 数据不丢失。
高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步
对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集 群模式
还有主备模式,远程模式,多活模式等,不作介绍。
镜像模式准备
1 修改host文件,把两个机器改名成A B
127.0.0.1 A localhost localhost.localdomain localhost4
localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6
localhost6.localdomain6
192.168.204.141 A
192.168.204.142 B
2 两台服务器的cookie文件要一致
[root@A opt]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.204.142:/var/lib/rabbitmq
3 关闭防火墙重启mq服务
[root@A ~]# systemctl stop firewalld
[root@A ~]# systemctl start rabbitmq-server
4 加入集群节点
[root@B ~]# rabbitmqctl stop_app
[root@B ~]# rabbitmqctl join_cluster rabbit@A
[root@B ~]# rabbitmqctl start_app
5 查看集群状态
[root@B ~]# rabbitmqctl cluster_status
6 为集群模式单独添加用户,哪个节点都行。关闭集群后原有的用户恢复正常。
root@A ~]# rabbitmqctl add_user laosun 123123
[root@A ~]# rabbitmqctl set_user_tags laosun administrator
[root@A ~]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
镜像模式
[root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'
HAProxy
虽然我们在程序中访问A服务器,可以实现消息的同步,虽然在同步,但都是A服务器在接收消 息,A太累
是否可以像Nginx一样,做负载均衡,A和B轮流接收消息,再镜像同步 ?
HAProxy工作在OSI的第四层和第七层,支持TCP与Http协议。只能做负载均衡。Nginx不仅仅是一款优秀的负载均衡器/反向代理软件,它同时也是功能强大的Web应用服务器。
性能上HA胜,功能性和便利性上Nginx胜
只做负载均衡可以用HAProxy。
KeepAlived
HAProxy也需要做高可用集群。
可以用KeepAlived实现。把多台设备的ip虚拟成一个ip对外开放。