原理:

交换机种类

RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种类型
(1)fanout它会把所有的交换器上的消息路由到所有与该交换器邦定的队列中,不需要BindingKey生效
(2)direct:它会把消息路由到BindingKey与RoutingKey完全匹配的队列中。比如在发送消息的时候,设置Label中RoutingKey为warning,则消息会路由到Queue1与Queue2上(请看下图)。
demo-rabbitmq-developer - 图1
(3)topic:是direct上的扩展,同样是利用RoutingKey与BindingKey相匹配,但是匹配规则不一样,支持模糊匹配。有如下的规则

  1. - RoutingKey为一个点号“.”分隔的字符串,每个被隔开的独立字符串即为一个单词,是匹配的单位;
  2. - BindingKeyRoutingKey一样,也是"."分割的字符串;
  3. - 但不同的是BindingKey,可以用“#”,“*”进行类似于占位符的模糊匹配,“#”表示一个单词,"*"表示多个单词(也可以是零个)

比如:
RoutingKey为com.hidden.client的消息只会到队列Queue2中:因为只有Queue2的BindingKey=..client匹配com.hidden.client
RoutingKey为com.rabbitmq.client的消息会到队列Queue1-2中: 因为Queue1的BindingKey=.rabbitmq..匹配com.rabbitmq.client,Queue2的BindingKey=..client匹配com.rabbitmq.client
RoutingKey为java.rabbitmq.demo的消息只会到队列Queue1中:因为只有Queue1的BindingKey=.rabbitmq..匹配java.rabbitmq.demo
RoutingKey为java.util.concurrent的消息会被丢弃或者返回给生产者(需要设置)
demo-rabbitmq-developer - 图2

(4)headers:依赖发送消息内容中的hearders属性进行匹配,在绑定队列和交换器时指定一组键值对,这里的也就是headers,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,通过比较会路由到相关队列中,这种交换器性能会很差,一般不会使用。

应答模式

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

    全局配置

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. acknowledge-mode: manual

    在特定接受者上配置

    ```java @RabbitListener(queues =”topic.message”) public void process1(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel)throws Exception {
    1. //定义队列的消费者
    2. Consumer consumer = new DefaultConsumer(channel);
    3. //监听队列,设置手动返回完成,第二个参数 false表示需要手动确认返回
    4. channel.basicConsume("topic.message",false,consumer);
    5. //手动确认返回-不成功就用basicNack
    6. channel.basicAck(deliveryTag,false);
    7. System.out.print("这里是手动返回确认信息 ");
    }
  1. <a name="qN1Mp"></a>
  2. # pom配置
  3. ```xml
  4. <?xml version="1.0" encoding="UTF-8"?>
  5. <project xmlns="http://maven.apache.org/POM/4.0.0"
  6. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  7. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  8. <parent>
  9. <artifactId>springcloud-demo</artifactId>
  10. <groupId>demo</groupId>
  11. <version>1.0-SNAPSHOT</version>
  12. </parent>
  13. <modelVersion>4.0.0</modelVersion>
  14. <artifactId>demo-stream-rabbit-provider</artifactId>
  15. <dependencies>
  16. <dependency>
  17. <groupId>org.springframework.boot</groupId>
  18. <artifactId>spring-boot-starter-web</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework.cloud</groupId>
  22. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  23. </dependency>
  24. <!--sentinel-->
  25. <dependency>
  26. <groupId>com.alibaba.cloud</groupId>
  27. <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  28. </dependency>
  29. <!--健康监控-->
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-starter-actuator</artifactId>
  33. </dependency>
  34. <dependency>
  35. <groupId>com.alibaba.cloud</groupId>
  36. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>com.alibaba.cloud</groupId>
  40. <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  41. </dependency>
  42. <!-- mybatis 和SpringBoot 整合-->
  43. <dependency>
  44. <groupId>org.mybatis.spring.boot</groupId>
  45. <artifactId>mybatis-spring-boot-starter</artifactId>
  46. </dependency>
  47. <!-- MySQL 驱动 -->
  48. <dependency>
  49. <groupId>mysql</groupId>
  50. <artifactId>mysql-connector-java</artifactId>
  51. </dependency>
  52. <dependency>
  53. <groupId>com.alibaba</groupId>
  54. <artifactId>druid</artifactId>
  55. </dependency>
  56. <!-- jdbc -->
  57. <dependency>
  58. <groupId>org.springframework.boot</groupId>
  59. <artifactId>spring-boot-starter-jdbc</artifactId>
  60. </dependency>
  61. <dependency>
  62. <groupId>org.springframework.boot</groupId>
  63. <artifactId>spring-boot-devtools</artifactId>
  64. <scope>runtime</scope>
  65. <optional>true</optional>
  66. </dependency>
  67. <dependency>
  68. <groupId>org.projectlombok</groupId>
  69. <artifactId>lombok</artifactId>
  70. </dependency>
  71. <dependency>
  72. <groupId>org.springframework.boot</groupId>
  73. <artifactId>spring-boot-starter-test</artifactId>
  74. <scope>test</scope>
  75. </dependency>
  76. <dependency>
  77. <groupId>junit</groupId>
  78. <artifactId>junit</artifactId>
  79. </dependency>
  80. </dependencies>
  81. </project>

yaml配置

  1. server:
  2. port: 8003
  3. spring:
  4. application:
  5. name: demo-stream-rabbit-provider
  6. datasource:
  7. type: com.alibaba.druid.pool.DruidDataSource
  8. driver-class-name: com.mysql.jdbc.Driver
  9. url: jdbc:mysql://192.168.2.20:30569/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
  10. username: root
  11. password: root
  12. cloud:
  13. nacos:
  14. server-addr: 192.168.2.20:30010
  15. username: nacos
  16. password: nacos
  17. sentinel:
  18. transport:
  19. dashboard: 192.168.2.20:31596 # 控制台的安装位置
  20. # port: 8719 # 与sentinel单独连接的端口
  21. # client-ip: 192.168.2.6 # 本机的ip,如果sentinel装在虚拟机,必须配这个
  22. # port: 30195 # 与sentinel单独连接的端口
  23. # client-ip: 192.168.2.20 # 本机的ip,如果sentinel装在虚拟机,必须配这个
  24. stream:
  25. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  26. defaultRabbit: # 表示定义的名称,用于于binding整合
  27. type: rabbit # 消息组件类型
  28. # environment: # 设置rabbitmq的相关的环境配置
  29. # spring:
  30. # rabbitmq:
  31. # host: 192.168.2.20
  32. # port: 31672
  33. # username: user
  34. # password: ze2tgb2WGC
  35. bindings: # 服务的整合处理
  36. output: # 这个名字是一个通道的名称
  37. destination: studyExchange # 表示要使用的Exchange名称定义
  38. content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
  39. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  40. group: default-group
  41. input: # 这个名字是一个通道的名称
  42. destination: studyExchange # 表示要使用的Exchange名称定义
  43. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  44. group: default-group
  45. rabbitmq:
  46. host: 192.168.2.20
  47. port: 31672
  48. username: user
  49. password: ze2tgb2WGC
  50. # 暴露应用信息
  51. management:
  52. endpoints:
  53. web:
  54. exposure:
  55. include: '*'

示例:controller

  1. package com.test.rabbit.consumer.controller;
  2. import com.test.rabbit.consumer.service.MessageProviderService;
  3. import lombok.RequiredArgsConstructor;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. /**
  8. * @author jia
  9. * @since 2022-04-05 11:39
  10. */
  11. @RestController
  12. @RequestMapping("/rabbit")
  13. @RequiredArgsConstructor
  14. public class SendMessageController {
  15. protected final MessageProviderService messageProviderService;
  16. @GetMapping("/sendMessage")
  17. public String sendMessage() {
  18. return messageProviderService.send();
  19. }
  20. }

示例:service

  1. package com.test.rabbit.consumer.serviceimpl;
  2. import com.test.rabbit.consumer.service.MessageProviderService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.cloud.stream.annotation.EnableBinding;
  5. import org.springframework.cloud.stream.messaging.Source;
  6. import org.springframework.messaging.MessageChannel;
  7. import org.springframework.messaging.support.MessageBuilder;
  8. import javax.annotation.Resource;
  9. import java.util.UUID;
  10. /**
  11. * @author jia
  12. * @since 2022-04-05 11:42
  13. */
  14. @EnableBinding(Source.class)
  15. @Slf4j
  16. public class MessageProviderServiceImpl implements MessageProviderService {
  17. @Resource
  18. MessageChannel output;
  19. @Override
  20. public String send() {
  21. String serial = UUID.randomUUID().toString();
  22. output.send(MessageBuilder.withPayload(serial).build());
  23. log.info("***** serial: {}", serial);
  24. return null;
  25. }
  26. }

参考

https://github.com/spring-cloud/spring-cloud-stream-samples/
https://www.jianshu.com/p/33cbe3132e63