




下面将进行代码示例:
一、SpringBoot环境配置
1.xml
<dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.1.7.RELEASE</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.1.8.RELEASE</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.1.7.RELEASE</version></dependency></dependencies><!--打包jar--><build><finalName>service-common-crm</finalName><plugins><!--spring-boot-maven-plugin--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><!--解决打包出来的jar文件中没有主清单属性问题--><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>
2.rabbitmq.properties配置文件
rabbitmq.host=124.71.7.200rabbitmq.port=5672rabbitmq.username=azhirabbitmq.password=2021888rabbitmq.virtual-host=vh_test
3.Bean容器,生产者spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-confirms="true"publisher-returns="true"/><!--定义管理交换机、队列--><rabbit:admin connection-factory="connectionFactory"/><!--定义rabbitTemplate对象操作可以在代码中方便发送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/><!--####################################消息可靠性投递 开始 ################################--><!--消息可靠性投递(生产端)--><rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue><rabbit:direct-exchange name="test_exchange_confirm"><rabbit:bindings><rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding></rabbit:bindings></rabbit:direct-exchange><!--######################################消息可靠性投递 结束 ##########################################--><!--####################################TTL 开始 ################################--><!--ttl--><rabbit:queue name="test_queue_ttl" id="test_queue_ttl"><!--设置queue的参数--><rabbit:queue-arguments><!--x-message-ttl指队列的过期时间--><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_ttl" ><rabbit:bindings><rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--######################################TTL 结束 ##########################################--><!--#######################################死信队列 开始 #########################################--><!--死信队列:1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)3. 正常队列绑定死信交换机设置两个参数:* x-dead-letter-exchange:死信交换机名称* x-dead-letter-routing-key:发送给死信交换机的routingkey--><!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)--><rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><!--3. 正常队列绑定死信交换机--><rabbit:queue-arguments><!--3.1 x-dead-letter-exchange:死信交换机名称--><entry key="x-dead-letter-exchange" value="exchange_dlx" /><!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.hehe" /><!--4.1 设置队列的过期时间 ttl--><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /><!--4.2 设置队列的长度限制 max-length--><entry key="x-max-length" value="10" value-type="java.lang.Integer" /></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)--><rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue><rabbit:topic-exchange name="exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--######################################死信队列 结束##########################################--><!--########################################延迟队列 开始########################################--><!--延迟队列:1. 定义正常交换机(order_exchange)和队列(order_queue)2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)3. 绑定,设置正常队列过期时间为30分钟--><!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)--><rabbit:queue id="order_queue" name="order_queue"><!--3. 绑定,设置正常队列过期时间为30分钟--><rabbit:queue-arguments><entry key="x-dead-letter-exchange" value="order_exchange_dlx" /><entry key="x-dead-letter-routing-key" value="dlx.order.cancel" /><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="order_exchange"><rabbit:bindings><rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)--><rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue><rabbit:topic-exchange name="order_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--######################################延迟队列 结束 ##########################################--></beans>
4.Bean窗口,消费者spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><context:component-scan base-package="com.baiqi.listener" /><!--定义监听器容器acknowledge="manual":手动签收prefetch="1":每次抓取多少条消息--><!--定义监听器容器 acknowledge="manual" prefetch="1" --><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener><!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>--><!--定义监听器,监听正常队列--><!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>--><!--延迟队列效果实现: 一定要监听的是 死信队列!!!--><rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener></rabbit:listener-container></beans>
二、生产者测试代码
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;//测试 Confirm 模式@Testpublic void testConfirm() {//定义回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关配置信息* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//System.out.println("confirm方法被执行了...."+correlationData.getId());//ack 为 true表示 消息已经到达交换机if (ack) {//接收成功System.out.println("接收成功消息" + cause);} else {//接收失败System.out.println("接收失败消息" + cause);//做一些处理,让消息再次发送。}}});//进行消息发送for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message Confirm...");}//进行睡眠操作try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}//测试 return模式@Testpublic void testReturn() {//设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者rabbitTemplate.setMandatory(true);//定义回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/**** @param message 消息对象* @param replyCode 错误码* @param replyText 错误信息* @param exchange 交换机* @param routingKey 路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("return 执行了....");System.out.println("message:" + message);System.out.println("replyCode:" + replyCode);System.out.println("replyText:" + replyText);System.out.println("exchange:" + exchange);System.out.println("routingKey:" + routingKey);//处理}});//进行消息发送rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm11", "message return...");//进行睡眠操作try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}}//批量发送消息,让消费者每次拉去指定的数量@Testpublic void testQos() {for (int i = 0; i < 10; i++) {// 发送消息rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");}}/*** TTL:过期时间* 1. 队列统一过期* <p>* 2. 消息单独过期* <p>* <p>* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。* 队列过期后,会将队列所有消息全部移除。* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)*/@Testpublic void testTtl() {for (int i = 0; i < 10; i++) {// 发送消息rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.baiqi", "message ttl....");}}/*** 发送测试死信消息:* 1. 过期时间* 2. 长度限制* 3. 消息拒收*/@Testpublic void testDlx() {//1. 测试过期时间,死信消息rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hehe", "死信消息测试1,开始...");//2. 测试长度限制后,消息死信/* for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","死信消息测试2,开始...");}*///3. 测试消息拒收//rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.baiqi","死信消息测试3,开始...");}/** 测试延时消息* */@Testpublic void testDelay() throws InterruptedException {//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息:id=1,time=2020年12月...");//2.打印倒计时10秒for (int i = 10; i > 0; i--) {System.out.println(i + "...");Thread.sleep(1000);}}}
执行testConfirm结果
执行testReturn结果
三、消费者测试代码
1.初始化IOC容器
import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class Test {public static void main(String[] args) {//初始化IOC容器ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq-consumer.xml");}}
2.AckListener类
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;/***/@Componentpublic class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//1、获取消息的idlong deliveryTag = message.getMessageProperties().getDeliveryTag();try {//2、获取消息System.out.println("message:"+new String(message.getBody()));//3、进行业务处理System.out.println("=====进行业务处理====");//模拟出现异常//int i = 5/0;//4、进行消息签收//channel.basicAck(deliveryTag, false);System.out.println("收到了消息:"+deliveryTag);} catch (Exception e) {//拒绝签收/*第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端*/// channel.basicNack(deliveryTag, false, true);}}}
3.DlxListener类
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;@Componentpublic class DlxListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收转换消息System.out.println(new String(message.getBody()));//2. 处理业务逻辑System.out.println("处理业务逻辑...");//int i = 3/0;//出现错误//3. 手动签收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();System.out.println("出现异常,拒绝接受");//4.拒绝签收,不重回队列 requeue=falsechannel.basicNack(deliveryTag,true,false);}}}
4.OrderListener类
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;@Componentpublic class OrderListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收转换消息System.out.println(new String(message.getBody()));//2. 处理业务逻辑System.out.println("处理业务逻辑...");System.out.println("根据订单id查询其状态...");System.out.println("判断状态是否为支付成功");System.out.println("取消订单,回滚库存....");//3. 手动签收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();System.out.println("出现异常,拒绝接受");//4.拒绝签收,不重回队列 requeue=falsechannel.basicNack(deliveryTag,true,false);}}}
5.QosListener类
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;/***/@Componentpublic class QosListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//获取到的消息System.out.println(new String(message.getBody()));Thread.sleep(1000);//处理业务逻辑//进行消息的签收//channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}}
