创建生产者工程

7.png8.png

配置pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.cdq</groupId>
  7. <artifactId>rabbitmq-producer-spring</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <dependencies>
  10. <dependency>
  11. <groupId>org.springframework</groupId>
  12. <artifactId>spring-context</artifactId>
  13. <version>5.1.7.RELEASE</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.amqp</groupId>
  17. <artifactId>spring-rabbit</artifactId>
  18. <version>2.1.8.RELEASE</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>junit</groupId>
  22. <artifactId>junit</artifactId>
  23. <version>4.12</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework</groupId>
  27. <artifactId>spring-test</artifactId>
  28. <version>5.1.7.RELEASE</version>
  29. </dependency>
  30. </dependencies>
  31. <build>
  32. <plugins>
  33. <plugin>
  34. <groupId>org.apache.maven.plugins</groupId>
  35. <artifactId>maven-compiler-plugin</artifactId>
  36. <version>3.8.0</version>
  37. <configuration>
  38. <source>1.8</source>
  39. <target>1.8</target>
  40. </configuration>
  41. </plugin>
  42. </plugins>
  43. </build>
  44. </project>

配置文件 rabbitmq.properties

在resource 文件夹下面添加
rabbitmq.host=localhost //连接路径
rabbitmq.port=5672 //连接地址
rabbitmq.username=guest //网址登录名
rabbitmq.password=guest //网址登录密码
rabbitmq.virtual-host=/ //网址连接符号

  1. rabbitmq.host=localhost
  2. rabbitmq.port=5672
  3. rabbitmq.username=guest
  4. rabbitmq.password=guest
  5. rabbitmq.virtual-host=/

配置文件 spring-rabbitmq-producer.xml

在resource 文件夹下面添加

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.springframework.org/schema/context
  9. https://www.springframework.org/schema/context/spring-context.xsd
  10. http://www.springframework.org/schema/rabbit
  11. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  12. <!--加载配置文件-->
  13. <!--定义rabbitmq connectionFactory (工厂模式)
  14. 确认模式开启:publisher-confirms="true"-->
  15. <!--定义交换机队列-->
  16. <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  17. <!--消息可靠性投递(生产端)-->
  18. <!--加载配置文件-->
  19. <context:property-placeholder location="classpath:rabbitmq.properties"></context:property-placeholder>
  20. <!--定义rabbitmq connectionFactory (工厂模式)
  21. 确认模式开启:publisher-confirms="true"-->
  22. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  23. port="${rabbitmq.port}"
  24. username="${rabbitmq.username}"
  25. password="${rabbitmq.password}"
  26. virtual-host="${rabbitmq.virtual-host}"
  27. publisher-confirms="true"
  28. publisher-returns="true"
  29. />
  30. <!--定义交换机队列-->
  31. <rabbit:admin connection-factory="connectionFactory"/>
  32. <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  33. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
  34. <!--消息可靠性投递(生产端)-->
  35. <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
  36. <rabbit:direct-exchange name="test_exchange_confirm">
  37. <rabbit:bindings>
  38. <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
  39. </rabbit:bindings>
  40. </rabbit:direct-exchange>
  41. </beans>

创建测试类 , 添加确认模式

@RunWith(SpringJUnit4ClassRunner.class) 开启Test方法
@ContextConfiguration 是引用配置文件或者配置类的注解
@ContextConfiguration这个注解通常与@RunWith(SpringJUnit4ClassRunner.class)联合使用用来测试

@ContextConfiguration使用方式

  • classpath:只会到你的class路径中查找找文件。
  • classpath*:不仅包含class路径,还包括jar文件中(class路径)进行查找。
    ```yaml package com.cdq;

import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.junit4.SpringRunner;

/**

  • @program: one
  • @description:
  • @author: cdq
  • @create: 2021-03-29 09:31 / @RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = “classpath:spring-rabbitmq-producer.xml”) public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /

    • 确认模式:
    • 步骤:
      1. 确认模式开启:ConnectionFactory中开启publisher-confirms=”true”
      1. 在rabbitTemplate定义ConfirmCallBack回调函数 */ @Test public void testConfirm() { //2. 定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

        @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        1. if (ack){
        2. //接收成功
        3. System.out.println("接收成功消息" + cause);
        4. }else {
        5. //接收失败
        6. System.out.println("接收失败消息" + cause);
        7. //做一些处理,让消息再次发送。
        8. }

        } });

        /*

        • 发送消息
        • 第一个参数:交换机名字
        • 第二个参数:路由规则
        • 第三个参数:表示消息体
        • */ rabbitTemplate.convertAndSend(“test_exchange_confirm000”, “confirm”, “message confirm….”); }

}

  1. <a name="RK68m"></a>
  2. ## 运行结果
  3. <a name="nV2eL"></a>
  4. ### 接收失败消息
  5. channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test_exchange_confirm000' in vhost '/', class-id=60, method-id=40)<br />Disconnected from the target VM, address: '127.0.0.1:5380', transport: 'socket'<br />这时由于:_发送消息队列名字和xml中的名字对应不上_<br />test_exchange_confirm000 改变 test_exchange_confirm<br />![9.png](https://cdn.nlark.com/yuque/0/2021/png/12916683/1616983112695-b52885b4-5bfa-4f6c-bff7-26b6107b4b0b.png#align=left&display=inline&height=424&margin=%5Bobject%20Object%5D&name=9.png&originHeight=424&originWidth=1532&size=34214&status=done&style=none&width=1532)
  6. <a name="6lh9v"></a>
  7. ### 修改后
  8. ![10.png](https://cdn.nlark.com/yuque/0/2021/png/12916683/1616983212409-9a0ac36d-1e33-4718-9eb3-f142f40f5c20.png#align=left&display=inline&height=380&margin=%5Bobject%20Object%5D&name=10.png&originHeight=380&originWidth=997&size=27439&status=done&style=none&width=997)
  9. <a name="HvYWs"></a>
  10. ### 访问http://[http://localhost:15672](http://localhost:15672/)
  11. ![11.png](https://cdn.nlark.com/yuque/0/2021/png/12916683/1616983869966-ab085b69-a0ab-46a7-a3b4-4491bafc2296.png#align=left&display=inline&height=554&margin=%5Bobject%20Object%5D&name=11.png&originHeight=554&originWidth=1010&size=34138&status=done&style=none&width=1010)
  12. <a name="Dm0Tq"></a>
  13. ### 接收到的消息
  14. ![12.png](https://cdn.nlark.com/yuque/0/2021/png/12916683/1616983900670-712b88a0-c8cc-4806-9272-f75db53ebcac.png#align=left&display=inline&height=509&margin=%5Bobject%20Object%5D&name=12.png&originHeight=509&originWidth=684&size=18133&status=done&style=none&width=684)
  15. <a name="ScE6U"></a>
  16. ## 添加回退
  17. 在ProducerTest中添加
  18. ```yaml
  19. /**
  20. * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
  21. * 步骤:
  22. * 1. 开启回退模式:publisher-returns="true"
  23. * 2. 设置ReturnCallBack
  24. * 3. 设置Exchange处理消息的模式:
  25. * 1. 如果消息没有路由到Queue,则丢弃消息(默认)
  26. * 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
  27. */
  28. @Test
  29. public void testReturn() {
  30. //交换机设置强制处理失败消息的模式
  31. rabbitTemplate.setMandatory(true);
  32. //2.设置ReturnCallBack
  33. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  34. /**
  35. *
  36. * @param message 消息对象
  37. * @param replyCode 错误码
  38. * @param replyText 错误信息
  39. * @param exchange 交换机
  40. * @param routingKey 路由键
  41. */
  42. @Override
  43. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  44. System.out.println("return 执行了....");
  45. System.out.println(message);
  46. System.out.println(replyCode);
  47. System.out.println(replyText);
  48. System.out.println(exchange);
  49. System.out.println(routingKey);
  50. //处理
  51. }
  52. });
  53. /*
  54. * 发送消息
  55. * 第一个参数:交换机名字
  56. * 第二个参数:路由规则
  57. * 第三个参数:表示消息体
  58. *
  59. * */
  60. rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
  61. }

目前rabbitMQ中有一条消息

22.png

远行结果

23.png

rabbitMQ中有两条消息

24.png

路由规则书写错误

会返回错误的信息
25.png

rabbitMQ中的消息并不会更新

26.png