1. RabbitMQ 整合 Spring AMQP

AMQP 核心组件:

  • RabbitAdmin
  • SpringAMQP 声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter

    1.1 SpringAMQP 用户管理组件:RabbitAdmin

    1.1.1 描述

  • RabbitAdmin 类可以很好的操作 RabbitMQ,在 Spring 中直接进行注入即可:

    1. @Bean
    2. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    3. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    4. rabbitAdmin.setAutoStartup(true);
    5. return rabbitAdmin;
    6. }
  • RabbitMQ 类实现了一些接口,其中的 InitializingBean接口包含 afterPropertiesSet() 抽象方法,作用是在所有 Bean 加载完成后初始化设置;

    1. public interface InitializingBean {
    2. void afterPropertiesSet() throws Exception;
    3. }
    1. @ManagedResource(description = "Admin Tasks")
    2. public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
    3. BeanNameAware, InitializingBean {
    4. @Override
    5. public void afterPropertiesSet() {
    6. synchronized (this.lifecycleMonitor) {
    7. // autoStartup == false 时直接返回
    8. if (this.running || !this.autoStartup) {
    9. return;
    10. }
    11. }
    12. ...
    13. initialize();
    14. ...
    15. }
    16. ...
    17. //Declares all the exchanges, queues and bindings in the enclosing application context,
    18. //if any. It should be safe (but unnecessary) to call this method more than once.
    19. public void initialize() {
    20. ...
    21. }
    22. }

    :::info

  • autoStartup 必须要设置为 true,否则 Spring 容器不会加载 RabbitAdmin 类;

  • RabbitAdmin 底层实现就是从 Spring 容器中获取 Exchange、Binding、RoutingKey 以及 Queue 的 @Bean 声明,然后使用 RabbitTemplate 的 execute 方法执行对应的声明、修改、删除等一系列 RabbitMQ 基础功能操作;
  • RabbitAdmin 经常用来,添加一个交换机、删除一个绑定、清空一个队列里的消息等等; :::

1.1.2 代码演示:

虽然新建的是一个 SpringBoot 项目,但是在整合时主要使用 RabbitMQ 和 Spring。

  • POM 添加依赖

    • amqp-client:RabbitMQ 的核心包
    • spring-boot-starter-amqp:RabbitMQ 与 Spring 整合的包
      1. <dependency>
      2. <groupId>com.rabbitmq</groupId>
      3. <artifactId>amqp-client</artifactId>
      4. <version>5.9.0</version>
      5. </dependency>
      6. <dependency>
      7. <groupId>org.springframework.boot</groupId>
      8. <artifactId>spring-boot-starter-amqp</artifactId>
      9. </dependency>
  • 将 RabbitMQ 注入到 Spring 容器

    • 创建一个 RabbitMQ 配置类:RabbitMQConfig
    • 可以从 Spring 容器获得 ConnectionFactory、RabbitAdmin

      1. @Configuration
      2. @ComponentScan({"com.example.rabbitmqspring.*"})
      3. public class RabbitMQConfig {
      4. @Bean
      5. public ConnectionFactory connectionFactory(){
      6. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      7. connectionFactory.setAddresses("192.168.12.131:5672");
      8. connectionFactory.setUsername("shawn");
      9. connectionFactory.setPassword("123");
      10. connectionFactory.setVirtualHost("/");
      11. return connectionFactory;
      12. }
      13. @Bean
      14. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
      15. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
      16. rabbitAdmin.setAutoStartup(true);
      17. return rabbitAdmin;
      18. }
      19. }
  • 如何使用 RabbitMQAdmin

    • 我们可以在 JunitTest 中使用 RabbitAdmin 来测试使用效果 ```java @SpringBootTest class ApplicationTests {

      @Test void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin;

      @Test public void testAdmin() { rabbitAdmin.declareExchange(new DirectExchange(“test.direct”, false, false));

      rabbitAdmin.declareExchange(new TopicExchange(“test.topic”, false, false));

      rabbitAdmin.declareExchange(new FanoutExchange(“test.fanout”, false, false));

      rabbitAdmin.declareQueue(new Queue(“test.direct.queue”, false));

      rabbitAdmin.declareQueue(new Queue(“test.topic.queue”, false));

      rabbitAdmin.declareQueue(new Queue(“test.fanout.queue”, false));

      rabbitAdmin.declareBinding(new Binding(“test.direct.queue”,

      1. Binding.DestinationType.QUEUE,
      2. "test.direct", "direct", new HashMap<>()));

      rabbitAdmin.declareBinding(

      1. BindingBuilder
      2. .bind(new Queue("test.topic.queue", false)) //直接创建队列
      3. .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
      4. .with("user.#")); //指定路由Key
  1. rabbitAdmin.declareBinding(
  2. BindingBuilder
  3. .bind(new Queue("test.fanout.queue", false))
  4. .to(new FanoutExchange("test.fanout", false, false)));
  5. //清空队列数据
  6. rabbitAdmin.purgeQueue("test.topic.queue", false);
  7. }

}

  1. :::info
  2. - 可以分别声明 ExchangeQueueBinding
  3. - `rabbitAdmin.declareExchange(new FanoutExchange(...));`
  4. - `rabbitAdmin.declareQueue(new Queue(...));`
  5. - `rabbitAdmin.declareBinding(new Binding(...)):`
  6. - 也可以在声明 Binding 时直接创建 ExchangeQueueRouteKey
  7. - rabbitAdmin.declareBinding(
  8. BindingBuilder<br /> .bind(new Queue("test.topic.queue", false)) //直接创建队列<br /> .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系<br /> .with("user.#"));
  9. :::
  10. <a name="pDyQs"></a>
  11. ## 1.2 SpringAMQP 声明
  12. - 在以前我们使用 RabbitMQ,需要在 RabbitMQ 核心 API 里面声明一个 Exchange、一个 Queue、一个 Binding
  13. ```java
  14. channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
  15. channel.queueDeclare("dlx.queue", true, false, false, null);
  16. channel.queueBind("dlx.queue", "dlx.exchange", "#");
  • 在我们整合了 SpringAMQP 之后,可以使用 SpringAMQP 去声明,即声明 @Bean 方式

    • 在配置类 RabbitMQConfig 中使用 @Bean 声明交换机、队列、绑定

      1. /**
      2. * 针对消费者配置
      3. * 1. 设置交换机类型
      4. * 2. 将队列绑定到交换机
      5. FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
      6. HeadersExchange :通过添加属性key-value匹配
      7. DirectExchange:按照routingkey分发到指定队列
      8. TopicExchange:多关键字匹配
      9. */
      10. @Bean
      11. public TopicExchange exchange01() {
      12. return new TopicExchange("topic_exchange01", true, false);
      13. }
      14. @Bean
      15. public Queue queue01() {
      16. return new Queue("queue01", true);
      17. }
      18. @Bean
      19. public Binding binding01() {
      20. return BindingBuilder.bind(queue01()).to(exchange01()).with("topic.*");
      21. }

1.3 SpringAMQP 消息模板组件:RabbitTemplate

1.3.1 描述

  • RabbitTemplate 是我们在与 SpringAQMP 整合的时候进行发送消息的关键类;
  • 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口 ConfirmCallback、返回值确认接口 ReturnCallback 等等。同样我们需要进行注入到 Spring 容器中,然后直接使用。

  • 在与 Spring 整合时需要实例化,但是在与 SpringBoot 整合时,在配置文件里添加配置即可。

1.3.2 代码演示

  • 在配置类 RabbitMQConfig 中声明 RabbitTemplate:

    1. @Bean
    2. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    3. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    4. return rabbitTemplate;
    5. }
  • 在测试类 ApplicationTests 中使用 RabbitTemplate 发送消息: ```java @Autowired private RabbitTemplate rabbitTemplate;

  1. @Test
  2. public void testSendMessage() throws Exception {
  3. //1 创建消息
  4. MessageProperties messageProperties = new MessageProperties();
  5. messageProperties.getHeaders().put("desc", "信息描述..");
  6. messageProperties.getHeaders().put("type", "自定义消息类型..");
  7. Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
  8. rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
  9. @Override
  10. public Message postProcessMessage(Message message) throws AmqpException {
  11. System.err.println("------添加额外的设置---------");
  12. message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
  13. message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
  14. return message;
  15. }
  16. });
  17. }
  18. @Test
  19. public void testSendMessage2() throws Exception {
  20. //1 创建消息
  21. MessageProperties messageProperties = new MessageProperties();
  22. messageProperties.setContentType("text/plain");
  23. Message message = new Message("mq 消息1234".getBytes(), messageProperties);
  24. rabbitTemplate.send("topic001", "spring.abc", message);
  25. rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
  26. rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
  27. }
  1. :::info
  2. - `rabbitTemplate.convertAndSend(exchange, routeKey, messageObject, [MessagePostProcessor]) `
  3. - messageObject 可以是 Message 类,也可以是简单的 String
  4. - MessagePostProcessor 是可选的,可以重写其中的方法给 Message 添加额外的配置;
  5. :::
  6. <a name="MffXu"></a>
  7. ## 1.4 SpringAMQP 消息容器:SimpleMessageListenerContainer
  8. <a name="Ox4A9"></a>
  9. ### 1.4.1 描述
  10. - SimpleMessageListenerContainer,简单消息监听容器;
  11. - 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足;
  12. - 监听队列(多个队列),自动启动、自动声明功能;
  13. - 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等;
  14. - 设置消费者数量、最小最大数量、批量消费;
  15. - 设置消息确认和自动确认模式、是否重回队列、异常捕获 handler 函数;
  16. - 设置消费者标签生成策略、是否独占模式、消费者属性等;
  17. - 设置具体的监听器、消息转换器等等;
  18. :::info
  19. - SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接受消息的模式等;
  20. - 很多基于 RabbitMQ 的自定义化后端滚控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出 SpringAMQP 非常的强大;
  21. :::
  22. **SimpleMessageListenerContainer 为什么可以动态感知配置变更?**
  23. <a name="ncZmz"></a>
  24. ### 1.4.2 代码演示
  25. - 在配置类 **RabbitMQConfig **中声明消息容器 **SimpleMessageListenerContainer**
  26. ```java
  27. @Bean
  28. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
  29. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  30. // 监听多个队列
  31. container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
  32. // 设置消费者最小最大数量
  33. container.setConcurrentConsumers(1);
  34. container.setMaxConcurrentConsumers(5);
  35. // 不开启重回队列
  36. container.setDefaultRequeueRejected(false);
  37. // 设置消息自动确认模式
  38. container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  39. // 暴露监听通道
  40. container.setExposeListenerChannel(true);
  41. // 设置消费者标签生成策略
  42. container.setConsumerTagStrategy(new ConsumerTagStrategy() {
  43. @Override
  44. public String createConsumerTag(String queue) {
  45. return queue + "_" + UUID.randomUUID().toString();
  46. }
  47. });
  48. // 设置具体的监听器、消息转换器
  49. container.setMessageListener(new ChannelAwareMessageListener() {
  50. @Override
  51. public void onMessage(Message message, Channel channel) throws Exception {
  52. String msg = new String(message.getBody());
  53. System.err.println("----------消费者: " + msg);
  54. }
  55. });
  56. return container;
  57. }

1.5 SpringAMQP 消息适配器:MessageListenerAdapter

1.5.1 描述

  • MessageListenerAdapter,消息监听适配器
  • MessageListenerAdapter 类中的变量 defaultListenerMethod表示监听方法名称,默认方法名称是 "handleMessage"
  • 可用 adapter.setDefaultListenerMethod("consumeMessage"); 设置自定义的监听方法名称。
  • MessageDelegate 委托对象:实际真实的委托对象,自定义的类用于处理消息。
  • queueOrTagToMethodName 队列标识与方法名称组成的集合,可以将队列和方法名绑定,指定队列中的消息会被所绑定的方法处理

1.5.2 代码演示

  • 在配置类 RabbitMQConfig 中的消息容器 Bean 中通过消息适配器 adapter 使用自定义的消息监听器,不再是使用上文中 new ChannelAwareMessageListener() 并重写其 onMessage() 方法的做法。

  • MessageListenerAdapter 适配器有两种使用方式:

    • 方式一:**adapter.setDefaultListenerMethod("xxxxx");**

      • “xxxxx” 方法名称要与 MessageDelegate 类中自定义的方法对应;
      • MessageDelegate 类中自定义方法的传入参数默认byte[] 字节数组类型,如果想改变方法的入参,可以设置一个自定义的消息转换器,通过自定义一个 TextMessageConverter 实现 MessageConverter 接口;

        1. @Configuration
        2. @ComponentScan({"com.example.rabbitmqspring"})
        3. public class RabbitMQConfig {
        4. ...
        5. @Bean
        6. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        7. ...
        8. // 适配器使用方式1: 默认是有自己的方法名字的:handleMessage
        9. // 可以自己指定一个方法的名字: consumeMessage
        10. // 也可以添加一个转换器: 从字节数组转换为String
        11. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        12. adapter.setDefaultListenerMethod("consumeMessage");
        13. adapter.setMessageConverter(new TextMessageConverter());
        14. container.setMessageListener(adapter);
        15. return container;
        16. }
        17. }

        ```java public class MessageDelegate { // 使用默认方法名称 public void handleMessage(byte[] messageBody) { System.err.println(“默认方法, 消息内容:” + new String(messageBody)); }

      // 使用自定义的方法名称 public void consumeMessage(byte[] messageBody) { System.err.println(“字节数组方法, 消息内容:” + new String(messageBody)); }

      // 改变自定义方法的入参类型
      public void consumeMessage(String messageBody) { System.err.println(“字符串方法, 消息内容:” + messageBody); } } java public class TextMessageConverter implements MessageConverter {

      // Convert a Java object to a Message. @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { System.out.println(“TextMessageConverter.toMessage…”); return new Message(object.toString().getBytes(), messageProperties); }

      // Convert from a Message to a Java object. // 对接收到的message进行过滤:ContentType: text/plain的message Body类型改变为String类型 @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println(“TextMessageConverter.fromMessage…”); String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains(“text”)) {

      1. return new String(message.getBody());

      } return message.getBody(); }

}

  1. ```java
  2. // 测试类
  3. @SpringBootTest
  4. class ApplicationTests {
  5. ...
  6. @Test
  7. public void testSendMessageForText() throws Exception {
  8. //1 创建消息
  9. MessageProperties messageProperties = new MessageProperties();
  10. messageProperties.setContentType("text/plain");
  11. Message message = new Message("mq 消息1234".getBytes(), messageProperties);
  12. rabbitTemplate.send("topic001", "spring.abc", message);
  13. }
  14. }
  15. // 在测试方法中设置 Message 的ContentType:text/plain,并使用 rabbitTemplate 发送消息
  16. // 运行结果:
  17. 字符串方法, 消息内容:mq 消息1234
  18. TextMessageConverter.fromMessage...
  • 方式二:**adapter.setQueueOrTagToMethodName(Map<String, String> xxxx);**

    • 我们的 Queue 名称 和 Method 名称,也可以进行一一的匹配
    • map.put(“queue01”, “method01”); 可以指定队列中的消息会被所绑定的方法处理
    • “method01” 方法名称要与 MessageDelegate 类中自定义的方法对应;

      1. @Configuration
      2. @ComponentScan({"com.example.rabbitmqspring"})
      3. public class RabbitMQConfig {
      4. ...
      5. @Bean
      6. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
      7. ...
      8. // 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
      9. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
      10. adapter.setMessageConverter(new TextMessageConverter());
      11. Map<String, String> queueOrTagToMethodName = new HashMap<>();
      12. queueOrTagToMethodName.put("queue001", "method1");
      13. queueOrTagToMethodName.put("queue002", "method2");
      14. adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
      15. container.setMessageListener(adapter);
      16. return container;
      17. }
      18. }

      ```java public class MessageDelegate { public void method1(String messageBody) { System.err.println(“method1 收到消息内容:” + new String(messageBody)); }

    public void method2(String messageBody) { System.err.println(“method2 收到消息内容:” + new String(messageBody)); } } java // 测试类 @SpringBootTest class ApplicationTests { … @Test public void testSendMessageForText() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(“text/plain”); Message message = new Message(“mq 消息1234”.getBytes(), messageProperties);

    rabbitTemplate.send(“topic001”, “spring.abc”, message); rabbitTemplate.send(“topic002”, “rabbit.abc”, message); } }

// 在测试方法中设置 Message 的ContentType:text/plain,并使用 rabbitTemplate 发送消息 // 运行结果: TextMessageConverter.fromMessage… method1 收到消息内容:mq 消息1234 TextMessageConverter.fromMessage… method2 收到消息内容:mq 消息1234

  1. <a name="kcs1A"></a>
  2. ## 1.6 SpringAMQP 消息转换器:MessageConverter
  3. <a name="SVKw8"></a>
  4. ### 1.6.1 描述
  5. - MessageConverter:消息转换器;
  6. - 我们在发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter;
  7. - 如何自定义常用转换器:
  8. - 需要实现 MessageConverter 这个接口;
  9. - 需要重写两个方法:
  10. - toMessage:Convert a Java object to a Message;
  11. - fromMessage:Convert from a Message to a Java object;
  12. - 常用几种转换器:
  13. - 上节中使用到的 String 转换器;
  14. - Json 转换器:Jackson2JsonMessageConverter 进行 Java 对象的转换功能;
  15. - DefaultJackson2JavaTypeMapper 映射器:Java 对象的映射关系;
  16. - 自定义二进制转换器:如图片类型、PDF、PPT、流媒体;
  17. <a name="FwqjS"></a>
  18. ### 1.6.2 代码演示
  19. - 为了便于演示,创建了两个实体类:**entity.Order**、**entity.Packaged**;
  20. - **Json转换器(Map作为入参):Jackson2JsonMessageConverter**
  21. - 测试类中将 Java 对象 Order 转换成 Json String,然后封装成 Message 发送;
  22. - **RabbitMQConfig **中在适配器 adapter 中添加 Json 转换器;
  23. - 自定义监听方法,将 Message 消息体作为 Map 类型的入参;
  24. - 最终实现:`**Order Object => Json String => Message => Order 属性/值所对应的 Map**`;
  25. ```java
  26. @Configuration
  27. @ComponentScan({"com.example.rabbitmqspring"})
  28. public class RabbitMQConfig {
  29. ...
  30. @Bean
  31. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
  32. ...
  33. // 1.1 支持json格式的转换器
  34. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  35. adapter.setDefaultListenerMethod("consumeMessage");
  36. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  37. adapter.setMessageConverter(jackson2JsonMessageConverter);
  38. container.setMessageListener(adapter);
  39. return container;
  40. }
  41. }
  1. public class MessageDelegate {
  2. public void consumeMessage(Map messageBody) {
  3. System.err.println("map方法, 消息内容:" + messageBody);
  4. System.err.println("map方法, 消息content:"+ messageBody.get("content"));
  5. }
  6. }
  1. // 测试类
  2. @SpringBootTest
  3. class ApplicationTests {
  4. ...
  5. @Test
  6. public void testSendJsonMessage() throws Exception {
  7. Order order = new Order();
  8. order.setId("001");
  9. order.setName("消息订单");
  10. order.setContent("描述信息");
  11. ObjectMapper mapper = new ObjectMapper();
  12. String json = mapper.writeValueAsString(order);
  13. System.err.println("order 4 json: " + json);
  14. MessageProperties messageProperties = new MessageProperties();
  15. //这里注意一定要修改contentType为 application/json
  16. messageProperties.setContentType("application/json");
  17. Message message = new Message(json.getBytes(), messageProperties);
  18. rabbitTemplate.send("topic001", "spring.order", message);
  19. }
  20. }
  21. // 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息
  22. // 运行结果:
  23. order 4 json: {"id":"001","name":"消息订单","content":"描述信息"}
  24. map方法, 消息内容:{id=001, name=消息订单, content=描述信息}
  25. map方法, 消息content:描述信息
  • 支持 Java 对象的转换(Java Object 作为入参):DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter

    • 测试类中将 Java 对象 Order 转换成 Json String,然后封装成 Message 配置 Header 并发送;
    • RabbitMQConfig 中创建 javaTypeMapper,将 javaTypeMapper 配置到 Json 转换器,将 Json 转换器添加到适配器;
    • 自定义监听方法,将 Message 消息体作为 Java Object 类型的入参;
    • 最终实现:**Order Object => Json String => Message => Order Object**

      1. @Configuration
      2. @ComponentScan({"com.example.rabbitmqspring"})
      3. public class RabbitMQConfig {
      4. ...
      5. @Bean
      6. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
      7. ...
      8. // 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换
      9. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
      10. adapter.setDefaultListenerMethod("consumeMessage");
      11. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
      12. DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
      13. //否则会抛出异常The class '...' is not in the trusted packages: [java.util, java.lang].
      14. // If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
      15. //默认只支持java.util和java.lang包下的类
      16. javaTypeMapper.setTrustedPackages("*");
      17. jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
      18. adapter.setMessageConverter(jackson2JsonMessageConverter);
      19. container.setMessageListener(adapter);
      20. return container;
      21. }
      22. }
      1. public class MessageDelegate {
      2. public void consumeMessage(Order order) {
      3. System.err.println("order对象, 消息内容, id: " + order.getId() +
      4. ", name: " + order.getName() +
      5. ", content: "+ order.getContent());
      6. }
      7. }

      ```java // 测试类 @SpringBootTest class ApplicationTests { … @Test public void testSendJsonMessage() throws Exception {

      Order order = new Order(); order.setId(“001”); order.setName(“消息订单”); order.setContent(“描述信息”); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order);

      System.err.println(“order 4 json: “ + json);

      MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType(“application/json”); Message message = new Message(json.getBytes(), messageProperties);

      rabbitTemplate.send(“topic001”, “spring.order”, message); } }

// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息 // 运行结果: order 4 json: {“id”:”001”,”name”:”消息订单”,”content”:”描述信息”} order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息

  1. - **支持 Java 对象多映射转换:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter**
  2. - 测试类中将 Java 对象 Order/Packaged 转换成 Json String,然后封装成 Message 配置 Header 并发送;
  3. - **RabbitMQConfig **中创建 javaTypeMapper 并配置标签与对应的类路径,将 javaTypeMapper 配置到 Json 转换器,将 Json 转换器添加到适配器;
  4. - 自定义监听方法,将 Message 消息体作为 Java Object 类型的入参;
  5. - 最终实现:`**Order/Packaged Object => Json String => Message => Order/Packaged Object**`
  6. ```java
  7. @Configuration
  8. @ComponentScan({"com.example.rabbitmqspring"})
  9. public class RabbitMQConfig {
  10. ...
  11. @Bean
  12. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
  13. ...
  14. //1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
  15. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  16. adapter.setDefaultListenerMethod("consumeMessage");
  17. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  18. DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
  19. Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
  20. idClassMapping.put("order", Order.class); //转换时根据Message Header中的标签进行对应转换
  21. idClassMapping.put("packaged", Packaged.class);
  22. javaTypeMapper.setIdClassMapping(idClassMapping);
  23. jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
  24. adapter.setMessageConverter(jackson2JsonMessageConverter);
  25. container.setMessageListener(adapter);
  26. return container;
  27. }
  28. }
  1. public class MessageDelegate {
  2. public void consumeMessage(Order order) {
  3. System.err.println("order对象, 消息内容, id: " + order.getId() +
  4. ", name: " + order.getName() +
  5. ", content: "+ order.getContent());
  6. }
  7. public void consumeMessage(Packaged pack) {
  8. System.err.println("package对象, 消息内容, id: " + pack.getId() +
  9. ", name: " + pack.getName() +
  10. ", content: "+ pack.getDescription());
  11. }
  12. }
  1. // 测试类
  2. @SpringBootTest
  3. class ApplicationTests {
  4. ...
  5. @Test
  6. public void testSendMappingMessage() throws Exception {
  7. ObjectMapper mapper = new ObjectMapper();
  8. Order order = new Order();
  9. order.setId("001");
  10. order.setName("订单消息");
  11. order.setContent("订单描述信息");
  12. String json1 = mapper.writeValueAsString(order);
  13. System.err.println("order 4 json: " + json1);
  14. MessageProperties messageProperties1 = new MessageProperties();
  15. //这里注意一定要修改contentType为 application/json
  16. messageProperties1.setContentType("application/json");
  17. messageProperties1.getHeaders().put("__TypeId__", "order"); //对应标签,不再是类路径
  18. Message message1 = new Message(json1.getBytes(), messageProperties1);
  19. rabbitTemplate.send("topic001", "spring.order", message1);
  20. Packaged pack = new Packaged();
  21. pack.setId("002");
  22. pack.setName("包裹消息");
  23. pack.setDescription("包裹描述信息");
  24. String json2 = mapper.writeValueAsString(pack);
  25. System.err.println("pack 4 json: " + json2);
  26. MessageProperties messageProperties2 = new MessageProperties();
  27. //这里注意一定要修改contentType为 application/json
  28. messageProperties2.setContentType("application/json");
  29. messageProperties2.getHeaders().put("__TypeId__", "packaged"); //对应标签,不再是类路径
  30. Message message2 = new Message(json2.getBytes(), messageProperties2);
  31. rabbitTemplate.send("topic001", "spring.pack", message2);
  32. }
  33. }
  34. // 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息
  35. // 运行结果:
  36. order 4 json: {"id":"001","name":"订单消息","content":"订单描述信息"}
  37. pack 4 json: {"id":"002","name":"包裹消息","description":"包裹描述信息"}
  38. order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
  39. package对象, 消息内容, id: 002, name: 包裹消息, content: 包裹描述信息
  • 全局的转换器:ContentTypeDelegatingMessageConverter

    • RabbitMQConfig 中创建全局的转换器,可以创建很多小的转换器添加到全局转换器,然后将全局转换器添加到适配器 adapter; ```java @Configuration @ComponentScan({“com.example.rabbitmqspring”}) public class RabbitMQConfig { … @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { … //1.4 ext convert

      MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“consumeMessage”);

      //全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

      TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate(“text”, textConvert); convert.addDelegate(“html/text”, textConvert); convert.addDelegate(“xml/text”, textConvert); convert.addDelegate(“text/plain”, textConvert);

      Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate(“json”, jsonConvert); convert.addDelegate(“application/json”, jsonConvert);

      ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate(“image/png”, imageConverter); convert.addDelegate(“image”, imageConverter);

      PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate(“application/pdf”, pdfConverter);

  1. adapter.setMessageConverter(convert);
  2. container.setMessageListener(adapter);
  3. return container;
  4. }

}

  1. - **自定义二进制转换器,图片转换器:**
  2. - **RabbitMQConfig **中将自定义图片转换器添加到全局转换器,并将标签与子转换器相对应,标签值要与 Message ContentType 类型保持一致;
  3. - 测试类中,从本地读取照片存储为 byte[] 类型,设置 Message ContentType Header,发送 Message
  4. - 图片转换器需要自定义,与文本转换器一样,实现 MessageConverter 接口,重写两个方法,fromMessage 方法中将接收的 Message byte[] 转换成 File 存在本地作为图片文件;
  5. - 接收到的 Message 转换成 FIle 后,委托类 MessageDelegate 自定义消息监听方法会被调用,File 类型作为入参;
  6. - 最终实现:图片 File => Byte[] => Message =>Byte[] => File =》图片
  7. ```java
  8. public class ImageMessageConverter implements MessageConverter {
  9. @Override
  10. public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  11. throw new MessageConversionException(" convert error ! ");
  12. }
  13. @Override
  14. public Object fromMessage(Message message) throws MessageConversionException {
  15. System.err.println("-----------Image MessageConverter----------");
  16. Object _extName = message.getMessageProperties().getHeaders().get("extName");
  17. String extName = _extName == null ? "png" : _extName.toString();
  18. byte[] body = message.getBody();
  19. String fileName = UUID.randomUUID().toString();
  20. String path = "d:/010_test/" + fileName + "." + extName;
  21. File f = new File(path);
  22. try {
  23. Files.copy(new ByteArrayInputStream(body), f.toPath());
  24. } catch (IOException e) {
  25. e.printStackTrace();
  26. }
  27. return f;
  28. }
  29. }
  1. public class MessageDelegate {
  2. public void consumeMessage(File file) {
  3. System.err.println("文件对象 方法, 消息内容:" + file.getName());
  4. }
  5. }
  1. // 测试类
  2. @SpringBootTest
  3. class ApplicationTests {
  4. ...
  5. @Test
  6. public void testSendExtConverterMessage() throws Exception {
  7. byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
  8. MessageProperties messageProperties = new MessageProperties();
  9. messageProperties.setContentType("image/png");
  10. messageProperties.getHeaders().put("extName", "png");
  11. Message message = new Message(body, messageProperties);
  12. rabbitTemplate.send("", "image_queue", message);
  13. // byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
  14. // MessageProperties messageProperties = new MessageProperties();
  15. // messageProperties.setContentType("application/pdf");
  16. // Message message = new Message(body, messageProperties);
  17. // rabbitTemplate.send("", "pdf_queue", message);
  18. }
  19. }
  20. // 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息
  21. // 运行结果:
  22. order 4 json: {"id":"001","name":"订单消息","content":"订单描述信息"}
  23. pack 4 json: {"id":"002","name":"包裹消息","description":"包裹描述信息"}
  24. order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
  25. package对象, 消息内容, id: 002, name: 包裹消息, content: 包裹描述信息
  • 自定义二进制转换器,PDF 转换器: ```java public class PDFMessageConverter implements MessageConverter {

    @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {

    1. throw new MessageConversionException(" convert error ! ");

    }

    @Override public Object fromMessage(Message message) throws MessageConversionException {

    1. System.err.println("-----------PDF MessageConverter----------");
    2. byte[] body = message.getBody();
    3. String fileName = UUID.randomUUID().toString();
    4. String path = "d:/010_test/" + fileName + ".pdf";
    5. File f = new File(path);
    6. try {
    7. Files.copy(new ByteArrayInputStream(body), f.toPath());
    8. } catch (IOException e) {
    9. e.printStackTrace();
    10. }
    11. return f;

    }

}

  1. <a name="BUTM3"></a>
  2. # 2. RabbitMQ 与 SpringBoot2.x 整合
  3. <a name="eaSSg"></a>
  4. ## 2.1 基本配置
  5. - Pom 添加依赖
  6. ```xml
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-amqp</artifactId>
  10. </dependency>

2.2 生产端整合

  • 创建生产端项目 rabbitmq-springboot-producer

    2.2.1 配置文件

  • application.properties 文件

    • publisher-confirms,需要实现一个监听器用于监听 Broker 端给我们返回的确认消息:RabbitTemplate.ConfirmCallback
    • publisher-returns,保证消息对 Broker 端是可达的,如果出现路由键不可达的情况,则需要实现监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback
    • 注意,在发送消息的时候对 template 进行配置 **mandatory=true** 保证监听有效。默认 false,当 Broker 收到路由不可达的消息,会自动删除该消息,此时上面的监听器就无法生效;
    • 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等;
      1. spring:
      2. rabbitmq:
      3. addresses: 192.168.12.132:5672
      4. username: shawn
      5. password: 123
      6. virtual-host: /
      7. connection-timeout: 15000
      8. #开启Publisher Confirm机制
      9. #publisher-confirms: true 新版本此配置已过时,使用下面配置替换
      10. publisher-confirm-type: correlated
      11. #开启Publisher Return机制
      12. publisher-returns: true
      13. #启用强制消息,设置为false收不到Publisher Return机制返回的消息
      14. template:
      15. mandatory: true

      2.2.2 RabbitMQConfig(可选)

      可以参考与 Spring 整合中的配置类,就是以 @Bean 的方式申明交换机、队列与绑定关系。
      1. @Configuration
      2. @ComponentScan({"com.example.rabbitmq"})
      3. public class RabbitMQConfig {
      4. ...
      5. }

      2.2.3 发送消息

      为了便于测试,将发送消息的逻辑封装成一个 Service 类,这样可以在测试类中调用:
  • 由于没有在 RabbitMQConfig 配置交换机、队列、绑定,我们可以直接在控制台创建交换机、队列、绑定:

    • 交换机:exchange-1、exchange-2
    • 队列:queue-1、queue-2
    • RouteKey:springboot.#
  • 实现了 ConfirmCallback,来完善 Broker 收到消息反馈给生产端的消息确认机制;
  • 实现了 ReturnCallback,完善了 Broker 收到路由不到队列的消息时的后续处理,Return 机制确保消息可被路由;
  • 如果需要自定义消息体属性 Properties,可以在 Service 类中自己定义一个 send 方法,接收 Object 类型消息内容和 Map 类型的 message properties 作为入参,封装成 springframework.messaging 的 Message(非 SpringAMQP 的 Message) 并进行发送消息;
  • 如果不需要自定义消息属性,直接使用默认属性就满足,那么可以在自己定义一个 sendOrder 方法,接收 Java 对象 Order,然后使用 rabbitTemplate.convertAndSend() 方法直接发送对象作为消息;

    • 注意,convertAndSend() 只能直接发送 String 对象,想发送自定义的 Order 对象需要对象实现序列化接口; ```java @Component public class RabbitSender {

      //自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate;

      //回调函数: confirm确认 final ConfirmCallback confirmCallback = new ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {

      1. System.err.println("correlationData: "+ correlationData);
      2. System.err.println("ack: "+ ack);
      3. if(!ack){
      4. System.err.println("异常处理...");
      5. }

      } };

      //回调函数: return返回 final ReturnCallback returnCallback = new ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,

      1. String exchange, String routingKey) {
      2. System.err.println("return exchange: " + exchange + ", routingKey: "
      3. + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);

      } };

      //发送消息方法调用: 构建Message消息 public void send(Object message, Map properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一,作为 deliveryTag 唯一标识消息,这里简单的使用一个long数字 CorrelationData correlationData = new CorrelationData(“1234567890”); rabbitTemplate.convertAndSend(“exchange-1”, “springboot.abc”, msg, correlationData); }

      //发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) throws Exception { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData(“0987654321”); rabbitTemplate.convertAndSend(“exchange-2”, “springboot.def”, order, correlationData); }

}

  1. ```java
  2. //测试类
  3. @SpringBootTest
  4. class ApplicationTests {
  5. @Test
  6. void contextLoads() {
  7. }
  8. @Autowired
  9. private RabbitSender rabbitSender;
  10. private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  11. @Test
  12. public void testSender1() throws Exception {
  13. Map<String, Object> properties = new HashMap<>();
  14. properties.put("number", "12345");
  15. properties.put("send_time", simpleDateFormat.format(new Date()));
  16. rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
  17. }
  18. @Test
  19. public void testSender2() throws Exception {
  20. Order order = new Order("001", "第一个订单");
  21. rabbitSender.sendOrder(order);
  22. }
  23. }
  24. # 测试结果:
  25. ############### testSender1() ###################
  26. # 消息可到达队列
  27. correlationData: CorrelationData [id=1234567890]
  28. ack: true
  29. #将RabbitSender.send()中RouteKey改为spring.abc,消息不可到达队列
  30. return exchange: exchange-1, routingKey: spring.abc, replyCode: 312, replyText: NO_ROUTE
  31. correlationData: CorrelationData [id=1234567890]
  32. ack: true
  33. ############### testSender2() ###################
  34. # 消息可到达队列
  35. correlationData: CorrelationData [id=0987654321]
  36. ack: true
  1. public class Order implements Serializable {
  2. private String id;
  3. private String name;
  4. ...
  5. }

2.3 消费端整合

  • 创建生产端项目 rabbitmq-springboot-consumer

    2.3.1 配置文件

  • application.yaml 文件

    • 首先配置手工确认模式,用于 ACK 的手工确认,这样可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理;
    • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况;
    • 消费端监听使用 @RabbitListener 注解,@RabbitListener 是一个组合注解,里面可以注解配置;@Queue@QueueBinding@Exchange 直接通过这个组合注解一次性解决消费端交换机、队列、绑定、路由并且配置监听功能等;
      • 建议:由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
        1. spring:
        2. rabbitmq:
        3. host: 192.168.12.132
        4. port: 5672
        5. username: shawn
        6. password: 123
        7. virtual-host: /
        8. connection-timeout: 15000
        9. #上面跟生产端基本保持一致
        10. listener:
        11. simple:
        12. acknowledge-mode: manual # 手动ack
        13. concurrency: 1 # 监听消息的个数
        14. max-concurrency: 5
        15. # 自定义mq配置 用于声明交换机、队列、绑定路由的参数
        16. order:
        17. queue:
        18. name: queue-2
        19. durable: true
        20. exchange:
        21. name: exchange-2
        22. durable: true
        23. type: topic
        24. ignoreDeclarationExceptions: true
        25. key: springboot.*

        2.3.2 监听消息

  • 在测试前,先把之前在控制台创建的交换机、队列删除,因为这里先启动消费端服务,由消费端创建交换机、队列、绑定;

  • RabbitReceiver Service 中 onMessage、onOrderMessage 方法是自己定义的方法,方法名随便取,关键是 @RabbitListener 声明交换机、队列、绑定等配置,@RabbitHandler 声明自定义的方法监听消息并处理消息;
  • onMessage 接收 springframework.messaging 的 Message(非 SpringAMQP 的 Message);
  • onOrderMessage 接收 Java 对象 Order,与生产者的 Order 对象一样;

    1. @Component
    2. public class RabbitReceiver {
    3. @RabbitListener(bindings = @QueueBinding(
    4. value = @Queue(value = "queue-1",
    5. durable="true"),
    6. exchange = @Exchange(value = "exchange-1",
    7. durable="true",
    8. type= "topic",
    9. ignoreDeclarationExceptions = "true"),
    10. key = "springboot.*"
    11. )
    12. )
    13. @RabbitHandler
    14. public void onMessage(Message message, Channel channel) throws Exception {
    15. System.err.println("--------------------------------------");
    16. System.err.println("消费端Payload: " + message.getPayload());
    17. Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    18. //手工ACK
    19. channel.basicAck(deliveryTag, false);
    20. }
    21. * @param order
    22. * @param channel
    23. * @param headers
    24. * @throws Exception
    25. */
    26. @RabbitListener(bindings = @QueueBinding(
    27. value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
    28. durable="${spring.rabbitmq.listener.order.queue.durable}"),
    29. exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
    30. durable="${spring.rabbitmq.listener.order.exchange.durable}",
    31. type= "${spring.rabbitmq.listener.order.exchange.type}",
    32. ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
    33. key = "${spring.rabbitmq.listener.order.key}"
    34. )
    35. )
    36. @RabbitHandler
    37. public void onOrderMessage(@Payload Order order,
    38. Channel channel,
    39. @Headers Map<String, Object> headers) throws Exception {
    40. System.err.println("--------------------------------------");
    41. System.err.println("消费端order: " + order.getId());
    42. Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
    43. //手工ACK
    44. channel.basicAck(deliveryTag, false);
    45. }
    46. }

    ```

    测试结果:

    ######### testSender1()

消费端Payload: Hello RabbitMQ For Spring Boot!

######### testSender2()

消费端order: 001

  1. <a name="mArPm"></a>
  2. # 3. RabbitMQ 与 SpringCloudStream 整合
  3. SpringCloudStream 不能保证消息的100%可靠性,用于和 Kafka 兼顾,目的是高性能的消息通信。
  4. <a name="T9iH2"></a>
  5. ## 3.1 SpringCloudStream 整体架构
  6. ![](https://cdn.nlark.com/yuque/0/2021/png/1471554/1621330479030-23b7325f-b91f-4a1f-9cef-a1836db0b174.png#clientId=uc6cd2f8e-917d-4&from=paste&height=334&id=uad87721d&margin=%5Bobject%20Object%5D&originHeight=668&originWidth=1450&originalType=url&status=done&style=none&taskId=udd750071-b44c-4228-88d9-3f83168086b&width=725)<br />使用 SpringCloudStream,消息的生产端和消费端可以是两种不同的消息队列。<br />![](https://cdn.nlark.com/yuque/0/2021/png/1471554/1621330552614-3c72918b-27ec-4559-9a2b-07b1fcd61163.png#clientId=uc6cd2f8e-917d-4&from=paste&id=ua9b55521&margin=%5Bobject%20Object%5D&originHeight=389&originWidth=460&originalType=url&status=done&style=none&taskId=ud0e8b0f6-8ee7-42da-bb25-f73eb80fee0)<br />Spring Cloud Stream 通过定义绑定器 Binder 作为中间层,完美地实现了应用程序与消息中间件之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或是更换其他消息中间件产品时,只需要更换对应的 Binder 绑定器而不需要修改任何的应用逻辑。
  7. ![](https://cdn.nlark.com/yuque/0/2021/png/1471554/1621330645210-97a264bc-fe3c-407f-b222-c7430c9b5b32.png#clientId=uc6cd2f8e-917d-4&from=paste&height=302&id=u91b599cd&margin=%5Bobject%20Object%5D&originHeight=403&originWidth=744&originalType=url&status=done&style=none&taskId=u09d6e37c-0103-4e7a-99ca-fbbdbe8cd3a&width=558)<br />上图中黄色的为 RabbitMQ 的部分,绿色的部分为 Spring Cloud Stream 在生产者和消费者添加了一层中间件。
  8. <a name="g1KAw"></a>
  9. ## 3.2 Barista 接口
  10. Barista 接口:定义作为后面类的参数,定义通道类型(决定通道是用于发送还是接收消息)和通道名称(作为配置用)。
  11. - @EnableBinding:value 参数指定用于定义绑定消息通道的接口,在应用启动时实现对定义消息通道的绑定;
  12. - @Output:输出注解,用于定义发送消息接口;
  13. - @Input:输入注解,用于定义消息的消费者接口;
  14. - @StreamListener:用于定义监听方法的注解;
  15. <a name="bfz9y"></a>
  16. ## 3.3 整合
  17. <a name="b7tJH"></a>
  18. ### 3.3.1 添加依赖
  19. ```xml
  20. <dependency>
  21. <groupId>org.springframework.cloud</groupId>
  22. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  23. </dependency>

3.3.2 生产端

Barista 接口:定义输出通道,添加绑定关系

  1. public interface Barista {
  2. String OUTPUT_CHANNEL = "output_channel";
  3. // 注解@Output声明了它是一个输出类型的通道,名字是output_channel。
  4. // 这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道
  5. // 类型是output,发布的主题名为mydest。
  6. @Output(Barista.OUTPUT_CHANNEL)
  7. MessageChannel logoutput();
  8. }

配置:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. output_channel:
  6. destination: exchange-3
  7. group: queue-3
  8. binder: rabbit_cluster
  9. binders:
  10. rabbit_cluster:
  11. type: rabbit
  12. enviroment:
  13. spring:
  14. rabbitmq:
  15. address: 192.168.58.129:5672
  16. username: orcas
  17. password: 1224
  18. virtual-host: /
  1. @EnableBinding(Barista.class)
  2. @Service
  3. public class RabbitmqSender {
  4. @Autowired
  5. private Barista barista;
  6. // 发送消息
  7. public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
  8. try{
  9. MessageHeaders mhs = new MessageHeaders(properties);
  10. Message msg = MessageBuilder.createMessage(message, mhs);
  11. boolean sendStatus = barista.logoutput().send(msg);
  12. System.err.println("--------------sending -------------------");
  13. System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
  14. }catch (Exception e){
  15. System.err.println("-------------error-------------");
  16. e.printStackTrace();
  17. throw new RuntimeException(e.getMessage());
  18. }
  19. return null;
  20. }
  21. }

3.3.3 消费端

Barista

  1. public interface Barista {
  2. String INPUT_CHANNEL = "input_channel";
  3. // 注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。
  4. // 这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,
  5. // 它的类型是input,订阅的主题是position2处声明的mydest这个主题
  6. @Input(Barista.INPUT_CHANNEL)
  7. SubscribableChannel loginput();
  8. }

配置:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. input_channel:
  6. destination: exchange-3
  7. group: queue-3
  8. binder: rabbit_cluster
  9. consumer:
  10. concurrency: 1
  11. rabbit:
  12. bindings:
  13. input_channle:
  14. consumer:
  15. requeue-rejected: false
  16. acknowledge-mode: MANUAL
  17. recovery-interval: 3000
  18. durable-subscription: true
  19. max-concurrency: 5
  20. binders:
  21. rabbit_cluster:
  22. type: rabbit
  23. enviroment:
  24. spring:
  25. rabbitmq:
  26. address: 192.168.58.129:5672
  27. username: orcas
  28. password: 1224
  29. virtual-host: /

接收:

  1. @EnableBinding(Barista.class)
  2. @Service
  3. public class RabbitmqReceiver {
  4. @StreamListener(Barista.INPUT_CHANNEL)
  5. public void receiver(Message message) throws Exception {
  6. Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
  7. Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  8. System.out.println("Input Stream 1 接受数据:" + message);
  9. System.out.println("消费完毕------------");
  10. channel.basicAck(deliveryTag, false);
  11. }
  12. }