原理:
交换机种类
RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种类型
(1)fanout:它会把所有的交换器上的消息路由到所有与该交换器邦定的队列中,不需要BindingKey生效
(2)direct:它会把消息路由到BindingKey与RoutingKey完全匹配的队列中。比如在发送消息的时候,设置Label中RoutingKey为warning,则消息会路由到Queue1与Queue2上(请看下图)。
(3)topic:是direct上的扩展,同样是利用RoutingKey与BindingKey相匹配,但是匹配规则不一样,支持模糊匹配。有如下的规则
- RoutingKey为一个点号“.”分隔的字符串,每个被隔开的独立字符串即为一个单词,是匹配的单位;- BindingKey和RoutingKey一样,也是"."分割的字符串;- 但不同的是BindingKey,可以用“#”,“*”进行类似于占位符的模糊匹配,“#”表示一个单词,"*"表示多个单词(也可以是零个)
比如:
RoutingKey为com.hidden.client的消息只会到队列Queue2中:因为只有Queue2的BindingKey=..client匹配com.hidden.client
RoutingKey为com.rabbitmq.client的消息会到队列Queue1-2中: 因为Queue1的BindingKey=.rabbitmq..匹配com.rabbitmq.client,Queue2的BindingKey=..client匹配com.rabbitmq.client
RoutingKey为java.rabbitmq.demo的消息只会到队列Queue1中:因为只有Queue1的BindingKey=.rabbitmq..匹配java.rabbitmq.demo
RoutingKey为java.util.concurrent的消息会被丢弃或者返回给生产者(需要设置)
(4)headers:依赖发送消息内容中的hearders属性进行匹配,在绑定队列和交换器时指定一组键值对,这里的也就是headers,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,通过比较会路由到相关队列中,这种交换器性能会很差,一般不会使用。
应答模式
- AcknowledgeMode.NONE:不确认
- AcknowledgeMode.AUTO:自动确认
- AcknowledgeMode.MANUAL:手动确认
全局配置
spring:rabbitmq:listener:simple:acknowledge-mode: manual
在特定接受者上配置
```java @RabbitListener(queues =”topic.message”) public void process1(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel)throws Exception {
}//定义队列的消费者Consumer consumer = new DefaultConsumer(channel);//监听队列,设置手动返回完成,第二个参数 false表示需要手动确认返回channel.basicConsume("topic.message",false,consumer);//手动确认返回-不成功就用basicNackchannel.basicAck(deliveryTag,false);System.out.print("这里是手动返回确认信息 ");
<a name="qN1Mp"></a># pom配置```xml<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springcloud-demo</artifactId><groupId>demo</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>demo-stream-rabbit-provider</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--sentinel--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId></dependency><!--健康监控--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!-- mybatis 和SpringBoot 整合--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId></dependency><!-- MySQL 驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId></dependency><!-- jdbc --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency></dependencies></project>
yaml配置
server:port: 8003spring:application:name: demo-stream-rabbit-providerdatasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://192.168.2.20:30569/test?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: rootcloud:nacos:server-addr: 192.168.2.20:30010username: nacospassword: nacossentinel:transport:dashboard: 192.168.2.20:31596 # 控制台的安装位置# port: 8719 # 与sentinel单独连接的端口# client-ip: 192.168.2.6 # 本机的ip,如果sentinel装在虚拟机,必须配这个# port: 30195 # 与sentinel单独连接的端口# client-ip: 192.168.2.20 # 本机的ip,如果sentinel装在虚拟机,必须配这个stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型# environment: # 设置rabbitmq的相关的环境配置# spring:# rabbitmq:# host: 192.168.2.20# port: 31672# username: user# password: ze2tgb2WGCbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置group: default-groupinput: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义binder: defaultRabbit # 设置要绑定的消息服务的具体设置group: default-grouprabbitmq:host: 192.168.2.20port: 31672username: userpassword: ze2tgb2WGC# 暴露应用信息management:endpoints:web:exposure:include: '*'
示例:controller
package com.test.rabbit.consumer.controller;import com.test.rabbit.consumer.service.MessageProviderService;import lombok.RequiredArgsConstructor;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @author jia* @since 2022-04-05 11:39*/@RestController@RequestMapping("/rabbit")@RequiredArgsConstructorpublic class SendMessageController {protected final MessageProviderService messageProviderService;@GetMapping("/sendMessage")public String sendMessage() {return messageProviderService.send();}}
示例:service
package com.test.rabbit.consumer.serviceimpl;import com.test.rabbit.consumer.service.MessageProviderService;import lombok.extern.slf4j.Slf4j;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import javax.annotation.Resource;import java.util.UUID;/*** @author jia* @since 2022-04-05 11:42*/@EnableBinding(Source.class)@Slf4jpublic class MessageProviderServiceImpl implements MessageProviderService {@ResourceMessageChannel output;@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info("***** serial: {}", serial);return null;}}
参考
https://github.com/spring-cloud/spring-cloud-stream-samples/
https://www.jianshu.com/p/33cbe3132e63
