Spring提供了整合Rabbit的依赖。
Spring 整合RabbitMQ
生产端工程
1. 添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
2. spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
<!-- 1.配置连接Rabbit工厂 -->
<rabbit:connection-factory id="connectionFactory" host="172.16.150.130" port="5672" username="prim"
password="123456" virtual-host="/edu"/>
<!-- 2. 配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3. 配置RabbitAdmin:主要用于在Java代码中对队列的管理,用来创建、绑定、删除队列与交换机,发送消息等操作 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4. 配置交换机/路由 topic类型 -->
<rabbit:topic-exchange name="spring_topic_exchange">
<!-- 绑定队列 -->
<rabbit:bindings>
<!-- pattern通配符 -->
<rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5. 配置json转换的工具:将消息转换成json的工具 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 6. 配置RabbitMQ模板 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"/>
</beans>
3. 发送消息
public static void main(String[] args) {
//1. 创建spring容器
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
//2. 从容器中获得Rabbit模板对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//3. 发送消息
Map<String, String> map = new HashMap<>();
map.put("name", "prim");
map.put("email", "111@qq.com");
rabbitTemplate.convertAndSend("msg.user", map);
applicationContext.close();
}
消费端工程
1. 消费端配置
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
<!-- 1.配置连接Rabbit工厂 -->
<rabbit:connection-factory id="connectionFactory" host="172.16.150.130" port="5672" username="prim"
password="123456" virtual-host="/edu"/>
<!-- 2. 配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3. 配置RabbitAdmin:主要用于在Java代码中对队列的管理,用来创建、绑定、删除队列与交换机,发送消息等操作 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4. 注解扫描 -->
<context:component-scan base-package="listener"/>
<!-- 5. 配置监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
</beans>
2. 添加监听类
/**
* @program: spring-rabbit-consumer
* @Description: 消费者队列
* @author: sufulu
* @version: 1.0.0
* @create: 2021-02-02 22:36
* @PackageName: listener
* @ClassName: ConsumerListener.java
**/
@Component
public class ConsumerListener implements MessageListener {
@Override
public void onMessage(Message message) {
// 将Message对象转换成json
try {
JsonNode jsonNode = MAPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
String email = jsonNode.get("email").asText();
System.out.println("从队列中获取:" + name + "的邮箱是:" + email);
} catch (IOException e) {
e.printStackTrace();
}
}
}
3. 编写测试类:
加载spring容器,让程序一直运行,监听生产端的消息
public class TestRunner {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
System.in.read();//程序一直运行
}
}
生产端,运行发送消息,查看消费端是否可以接收到消息?
消费端可以接收到消息,这样我们就实现了两个工程之间的通信
消息成功确认机制
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
- 事务机制
- 发布确认(推荐使用)
事务机制
- AMQP协议提供的一种保证消息成功投递的方式,通过信道开启transactional模式
- 并利用信道的三个方法来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
- channel.txSelect()开启事务
- channel.txCommit()提交事务
- channel.txRollback()回滚事务
示例代码如下:
- 生产者
```java
public class Sender {
public static void main(String[] args) throws Exception {
// channel.queueDeclare(“test_work_queue”, false, false, false, null);Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel();
} }//声明路由,创建网红主播 //第一个参数:路由名称 //第二个参数:路由类型 一共有四种。 //第三个参数:表示否是持久化路由 //direct:根据路由键进行定向分发消息 //topic:模糊匹配的定向分发 channel.exchangeDeclare("text_transaction_topic", "topic"); try { //开启事务 channel.txSelect(); //推消息到路由器 //第二个参数必填,路由键 channel.basicPublish("text_transaction_topic", "order.down", null, "商品1-降价".getBytes()); int i = 1 / 0;//出现了异常会导致:成功的成功,失败的失败,加入事务要么全部成功,要么全部失败 channel.basicPublish("text_transaction_topic", "order.down", null, "商品2-降价".getBytes()); //提交事务 channel.txCommit(); System.out.println("[消息已发送]"); } catch (IOException e) { System.out.println("消息全部撤销"); //回滚事务 channel.txRollback(); } finally { //5. 释放资源 channel.close(); connection.close(); }
- 消费者
```java
public class Recer1 {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
//声明队列
channel.queueDeclare("test_transaction_queue", false, false, false, null);
/**
* 绑定用户相关的消息:user.#
* 参数1:队列名
* 参数2:路由名
* 参数3:路由键
*/
channel.queueBind("test_transaction_queue", "text_transaction_topic", "order.#");
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】:" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_transaction_queue", true, consumer);
}
}
Confirm发布确认机制
- RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量
- 事务效率低,为什么呢?假如10条消息,前9条成功了,如果第10条失败了,那么9条消息要全部撤销回滚,太浪费了
- Confirm模式则采用补发第10条消息的措施来完成10条消息的送达
在spring中的应用:
spring-rabbitmq-producer.xml
添加publisher-confirms="true"
启动生产者确认机制<!-- 1.配置连接Rabbit工厂 --> <rabbit:connection-factory id="connectionFactory" host="172.16.150.130" port="5672" username="prim" password="123456" virtual-host="/edu" publisher-confirms="true"/> <!-- 2. 配置队列 --> <rabbit:queue name="test_spring_queue_1"/> <!-- 3. 配置RabbitAdmin:主要用于在Java代码中对队列的管理,用来创建、绑定、删除队列与交换机,发送消息等操作 --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 4. 配置交换机/路由 topic类型 --> <rabbit:topic-exchange name="spring_topic_exchange"> <!-- 绑定队列 --> <rabbit:bindings> <!-- pattern通配符 --> <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- 5. 配置json转换的工具:将消息转换成json的工具 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 6. 配置RabbitMQ模板 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter" confirm-callback="messageConfirm"/> <!-- 7. 确认机制处理类 --> <bean id="messageConfirm" class="confirm.MessageConfirm"/>
消息确认处理类
/** * @program: spring-rabbitmq-producer * @Description: 消息确认处理 * @author: sufulu * @version: 1.0.0 * @create: 2021-02-02 23:48 * @PackageName: confirm * @ClassName: MessageConfirm.java **/ public class MessageConfirm implements RabbitTemplate.ConfirmCallback { /*** * * @param correlationData 消息相关的数据对象,封装了消息的唯一id * @param b 消息是否确认成功 * @param s 异常信息 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b) { System.out.println("消息确认成功"); } else { System.out.println("消息确认失败xxxx"); //补发 失败的消息,如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用补发 //1. 采用递归 限制递归的次数 //2. redis+定时任务(JDK的timer,或者定时任务框架Quartz) } } }
测试发送失败消息,如下代码,发送到一个不存在的路由,那么消息肯定会失败
public class Sender { public static void main(String[] args) { //1. 创建spring容器 ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); //2. 从容器中获得Rabbit模板对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //3. 发送消息 Map<String, String> map = new HashMap<>(); map.put("name", "吕布"); map.put("email", "666@qq.com"); //将消息发送一个不存在的路由 测试消息发送失败 rabbitTemplate.convertAndSend("lalal","msg.user", map); System.out.println("[消息已发出]"); applicationContext.close(); } }
我们可以在消息确认类中补发消息:
补发 失败的消息,如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用补发
1. 采用递归 限制递归的次数
2. redis+定时任务(JDK的timer,或者定时任务框架Quartz)
消费端限流
在RabbitMQ服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会出现:巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法通知处理这么多数据就会被压垮崩溃。 所以在数据量特别大的时候,我们对生产端限流肯定不行的,因为有时候并发量特别大,有时候特别小,这是用户行为,无法约束的,所以应该对消费端限流,用于保持消费端的稳定。
RabbitMQ提供了一种Qos(Quality of Service 服务质量)服务质量保证功能。
即在非自动确认消息的前提下,如果一定数据的消息未被确认前,不再进行消费新的消息。
生产者循环发出多条消息
for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("msg.user", map); }
运行生产端,积压了10条消息
消费端进行限流处理
消费端配置:
<!-- 5. 配置监听
acknowledge="manual" 手动确认
prefetch="3" 一次性消费的消息数量,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,一但有N个消息 还没有ack
则Consumer将阻塞直到消息被ack
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="3">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
消费端监听类:
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
//jackson提供序列化和反序列化中使用最多的类,转换json的
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 将Message对象转换成json
try {
JsonNode jsonNode = MAPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
String email = jsonNode.get("email").asText();
System.out.println("从队列中获取:" + name + "的邮箱是:" + email);
//手动确认消息
//参数1:RabbitMQ向该channel投递的这条消息的唯一标识id,此id是一个单调递增的正整数
//参数2:为了减少网络流量,手动确认可以被批量处理。当为true时,则可以一次性确认小于等于messageId值的所有消息
//加入messageId=8 则可以确认<=8的所有消息
long messageId = message.getMessageProperties().getDeliveryTag();
System.out.println("messageId:" + messageId);
channel.basicAck(messageId, true);
Thread.sleep(3000);
System.out.println("休息三秒后再继续接收消息");
} catch (IOException e) {
e.printStackTrace();
}
}
}
过期时间TTL
Time To Live:生存时间,单位:毫秒,如果超过了这个时间,则自动删除(被称为dead message 并投入到死信队列,无法消费该消息) RabbitMQ可以对消息和队列设置TTL
- 通过队列设置,队列中所有都有相同的过期时间
- 对消息单独设置,每条消息的TTL可以不同(更颗粒化)
设置队列TTL
在生产端配置:
<!-- 2. 配置队列 -->
<!-- <rabbit:queue name="test_spring_queue_1"/>-->
<rabbit:queue name="test_spring_queue_1" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"/>
</rabbit:queue-arguments>
</rabbit:queue>
等待5S之后:消息自动删除了
设置消息TTL
队列配置
<rabbit:queue name="test_spring_queue_ttl_2"/>
发送消息的设置:
public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//创建消息的配置对象
MessageProperties properties = new MessageProperties();
properties.setExpiration("3000");//设置过期时间三秒
//创建消息
Message message = new Message("测试过期时间".getBytes(), properties);
rabbitTemplate.convertAndSend("msg.user", message);
System.out.println("[消息已发出]");
applicationContext.close();
}
死信队列
DLX 死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时消费而变成死信后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机的队列,称之为:死信队列。
消息没有被及时消费的原因:
- 消息被拒绝(basic.reject/basic.nack)并且不再重新投递 requeue=false
- 消息超时未消费
- 达到最大队列长度
代码示例:
spring/spring-rabbitmq-producer2.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
<rabbit:connection-factory
id="connectionFactory"
host="172.16.150.130"
port="5672"
username="prim"
password="123456"
virtual-host="/edu"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 死信的相关配置 -->
<!-- 定义死信队列 -->
<rabbit:queue name="dlx_queue"/>
<!-- 定向的死信交换机 -->
<rabbit:direct-exchange name="dle_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="dlx_queue"/>
<rabbit:binding key="dlx_max" queue="dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 声明过期时间的消息队列 -->
<rabbit:queue name="test_ttl_queue">
<rabbit:queue-arguments>
<!-- 设置队列的过期时间 -->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!-- 消息如果超时,将消息投递给死信交换机 -->
<entry key="x-dead-letter-exchange" value="dle_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 声明固定长度的消息队列 -->
<rabbit:queue name="test_max_queue">
<rabbit:queue-arguments>
<!-- 设置队列的长度,最多装2个消息 -->
<entry key="x-max-length" value-type="long" value="2"/>
<!-- 消息如果超出长度,将消息投递给死信交换机 -->
<entry key="x-dead-letter-exchange" value="dle_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 声明定向的测试消息的交换机 -->
<rabbit:direct-exchange name="my_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/>
<rabbit:binding key="dlx_max" queue="test_max_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
- 编写发送消息类
public class SenderDLX { public static void main(String[] args) { //1. 创建spring容器 ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml"); //2. 从容器中获得Rabbit模板对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //传递路由键,将消息传递到配置的test_ttl_queue队列 rabbitTemplate.convertAndSend("dlx_ttl", "测试超时".getBytes()); System.out.println("[消息已发出]"); applicationContext.close(); } }
发送消息:在管理端查看如下,在test_ttl_queue
中有一条消息,dlx_queue
中的一条消息是我之前测试的。
等待6S之后: ,在test_ttl_queue
中有0条消息,dlx_queue
中的2条消息。
测试消息超出长度,在配置中我们配置的是两条消息,发送三条消息进行测试
rabbitTemplate.convertAndSend("dlx_max", "测试长度1".getBytes()); rabbitTemplate.convertAndSend("dlx_max", "测试长度2".getBytes()); rabbitTemplate.convertAndSend("dlx_max", "测试长度3".getBytes());
查看管理端:可以看到
test_max_queue
只有2条,超出的那条被挤到了死信队列dlx_queue
消息顺序发送 1 2 3,第一条消息被挤到了dlx_queue
死信队列中了,如下查看死信队列的信息
延迟队列
延迟队列:TTL + 死信队列的合体 死信队列只是一种特殊的队列,里面的消息仍然可以消费。 在电商开发部分中,都会涉及到延时关闭顶点,此时延迟队列正好可以解决这个问题
生产者:沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可。
消费者:监听死信队列
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="3"> <!-- 监听死信队列 --> <rabbit:listener ref="consumerListener" queue-names="dlx_queue"/> </rabbit:listener-container>
@Component public class ConsumerListener extends AbstractAdaptableMessageListener { //jackson提供序列化和反序列化中使用最多的类,转换json的 private static final ObjectMapper MAPPER = new ObjectMapper(); @Override public void onMessage(Message message, Channel channel) throws Exception { // 将Message对象转换成json try { byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // JsonNode jsonNode = MAPPER.readTree(message.getBody()); // String name = jsonNode.get("name").asText(); // String email = jsonNode.get("email").asText(); // System.out.println("从队列中获取:" + name + "的邮箱是:" + email); //手动确认消息 //参数1:RabbitMQ向该channel投递的这条消息的唯一标识id,此id是一个单调递增的正整数 //参数2:为了减少网络流量,手动确认可以被批量处理。当为true时,则可以一次性确认小于等于messageId值的所有消息 //加入messageId=8 则可以确认<=8的所有消息 long messageId = message.getMessageProperties().getDeliveryTag(); // System.out.println("messageId:" + messageId); channel.basicAck(messageId, true); // Thread.sleep(3000); // System.out.println("休息三秒后再继续接收消息"); } catch (Exception e) { e.printStackTrace(); } } }
运行发送端:
消费端监听死信队列:拿到消息ACK确认,消息被消费掉了,所以dlx_queue
队列中消息消费掉了 就没有了